Compare commits
No commits in common. "main" and "0.1.0" have entirely different histories.
2
Cargo.lock
generated
2
Cargo.lock
generated
@ -983,7 +983,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "mqtt-exporter"
|
||||
version = "0.1.2"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"backoff",
|
||||
"bytes",
|
||||
|
@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "mqtt-exporter"
|
||||
version = "0.1.2"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
|
@ -1,5 +0,0 @@
|
||||
# mqtt-exporter-rs
|
||||
|
||||
A reimplementation of [mqtt-exporter](https://github.com/kpetremann/mqtt-exporter) in Rust. That's pretty much the gist of it. I was having issues with memory leaks in the Python implementation so I spun this together. It works for my purposes but is far from tested.
|
||||
|
||||
I slightly modified the logic, since I don't understand why some of the topic munging is done and would prefer to keep the topics as-is.
|
67
src/main.rs
67
src/main.rs
@ -10,7 +10,7 @@ use regex::Regex;
|
||||
use rumqttc::{
|
||||
AsyncClient, ConnAck,
|
||||
Event::{self},
|
||||
EventLoop, MqttOptions, Packet, Publish, QoS, SubscribeFilter,
|
||||
MqttOptions, Packet, Publish, QoS, SubscribeFilter,
|
||||
};
|
||||
use serde::Deserialize;
|
||||
use std::{
|
||||
@ -110,7 +110,6 @@ struct AppConfig {
|
||||
}
|
||||
struct App {
|
||||
config: AppConfig,
|
||||
client: AsyncClient,
|
||||
transformers: Vec<Box<dyn Transform>>,
|
||||
}
|
||||
|
||||
@ -210,10 +209,10 @@ impl App {
|
||||
Ok(json::parse(str::from_utf8(payload)?)?)
|
||||
}
|
||||
|
||||
async fn handle_event(&self, event: &Event) {
|
||||
fn handle_event(&self, event: &Event) {
|
||||
match event {
|
||||
Event::Incoming(Packet::Publish(notification)) => self.handle_publish(notification),
|
||||
Event::Incoming(Packet::ConnAck(ack)) => self.handle_connack(ack).await,
|
||||
Event::Incoming(Packet::ConnAck(ack)) => self.handle_connack(ack),
|
||||
// Event::Incoming(Packet::SubAck(ack)) => self.handle_suback(ack),
|
||||
Event::Outgoing(_) => {}
|
||||
e => debug!(target: "mqtt-exporter::mqtt", "Unhandled event: {:?}", e),
|
||||
@ -229,11 +228,10 @@ impl App {
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_connack(&self, ack: &ConnAck) {
|
||||
fn handle_connack(&self, ack: &ConnAck) {
|
||||
match ack.code {
|
||||
rumqttc::ConnectReturnCode::Success => {
|
||||
info!(target: "mqtt-exporter::mqtt", "Successfully connected to MQTT broker");
|
||||
self.subscribe().await;
|
||||
info!(target: "mqtt-exporter::mqtt", "Successfully connected to MQTT broker")
|
||||
}
|
||||
code => {
|
||||
warn!(target: "mqtt-exporter::mqtt", "Non success connection return code {:?}", code)
|
||||
@ -265,9 +263,8 @@ impl App {
|
||||
self.parse_metrics(data, &topic, &topic, metric_path, &labels);
|
||||
}
|
||||
|
||||
async fn subscribe(&self) {
|
||||
match self
|
||||
.client
|
||||
async fn subscribe(&self, client: &AsyncClient) {
|
||||
match client
|
||||
.subscribe_many(self.config.mqtt_topic.iter().map(|topic| SubscribeFilter {
|
||||
path: topic.to_owned(),
|
||||
qos: QoS::AtLeastOnce,
|
||||
@ -283,14 +280,32 @@ impl App {
|
||||
}
|
||||
}
|
||||
|
||||
async fn mqtt_task(self, mut eventloop: EventLoop) {
|
||||
async fn mqtt_task(self) {
|
||||
let mut opts = MqttOptions::new(
|
||||
&self.config.mqtt_client_id,
|
||||
&self.config.mqtt_address,
|
||||
self.config.mqtt_port,
|
||||
);
|
||||
opts.set_keep_alive(Duration::from_secs(self.config.mqtt_keepalive))
|
||||
.set_max_packet_size(150000, 150000);
|
||||
|
||||
if let (Some(username), Some(password)) =
|
||||
(&self.config.mqtt_username, &self.config.mqtt_password)
|
||||
{
|
||||
opts.set_credentials(username, password);
|
||||
}
|
||||
|
||||
let (client, mut eventloop) = AsyncClient::new(opts, 10);
|
||||
|
||||
self.subscribe(&client).await;
|
||||
|
||||
let mut bo = backoff::ExponentialBackoff::default();
|
||||
|
||||
loop {
|
||||
match eventloop.poll().await {
|
||||
Ok(event) => {
|
||||
bo.reset();
|
||||
self.handle_event(&event).await
|
||||
self.handle_event(&event)
|
||||
}
|
||||
Err(e) => {
|
||||
error!(target: "mqtt-exporter::mqtt", "Connection error: {}", e);
|
||||
@ -321,7 +336,7 @@ impl App {
|
||||
builder.install().unwrap();
|
||||
}
|
||||
|
||||
fn new(config: AppConfig, client: AsyncClient) -> Self {
|
||||
fn new(config: AppConfig) -> Self {
|
||||
let mut transformers: Vec<Box<dyn Transform>> = Vec::new();
|
||||
if !config.zwave_topic_prefix.is_empty() {
|
||||
transformers.push(Box::new(ZwaveNormalize::new(&config.zwave_topic_prefix)))
|
||||
@ -331,8 +346,7 @@ impl App {
|
||||
&config.esphome_topic_prefixes,
|
||||
)))
|
||||
}
|
||||
if !config.hubitat_topic_prefixes.is_empty() && !config.hubitat_topic_prefixes[0].is_empty()
|
||||
{
|
||||
if !config.hubitat_topic_prefixes.is_empty() {
|
||||
transformers.push(Box::new(HubitatNormalize::new(
|
||||
&config.hubitat_topic_prefixes,
|
||||
)))
|
||||
@ -341,10 +355,8 @@ impl App {
|
||||
transformers.push(Box::new(Zigbee2MqttAvailability::default()))
|
||||
}
|
||||
debug!(target: "mqtt-exporter::config", "Transformers enabled: {:?}", transformers);
|
||||
|
||||
Self {
|
||||
config,
|
||||
client,
|
||||
transformers,
|
||||
}
|
||||
}
|
||||
@ -368,25 +380,10 @@ async fn main() {
|
||||
)
|
||||
.build()
|
||||
.unwrap();
|
||||
let app_config: AppConfig = config.try_deserialize().unwrap();
|
||||
let app_config = config.try_deserialize().unwrap();
|
||||
info!(target: "mqtt-exporter::config", "Loaded config: {app_config:?}");
|
||||
|
||||
let mut opts = MqttOptions::new(
|
||||
&app_config.mqtt_client_id,
|
||||
&app_config.mqtt_address,
|
||||
app_config.mqtt_port,
|
||||
);
|
||||
opts.set_keep_alive(Duration::from_secs(app_config.mqtt_keepalive))
|
||||
.set_max_packet_size(150000, 150000);
|
||||
|
||||
if let (Some(username), Some(password)) = (&app_config.mqtt_username, &app_config.mqtt_password)
|
||||
{
|
||||
opts.set_credentials(username, password);
|
||||
}
|
||||
|
||||
let (client, mut eventloop) = AsyncClient::new(opts, 10);
|
||||
|
||||
let app = App::new(app_config, client);
|
||||
let app = App::new(app_config);
|
||||
app.prom_task().await;
|
||||
app.mqtt_task(eventloop).await;
|
||||
app.mqtt_task().await;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user