diff --git a/Cargo.lock b/Cargo.lock index 6583999..3cfaf45 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -983,7 +983,7 @@ dependencies = [ [[package]] name = "mqtt-exporter" -version = "0.1.1" +version = "0.1.2" dependencies = [ "backoff", "bytes", diff --git a/Cargo.toml b/Cargo.toml index ff0e3ad..f4bbed9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mqtt-exporter" -version = "0.1.1" +version = "0.1.2" edition = "2021" [dependencies] diff --git a/src/main.rs b/src/main.rs index ed79bad..32c042f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,7 +10,7 @@ use regex::Regex; use rumqttc::{ AsyncClient, ConnAck, Event::{self}, - MqttOptions, Packet, Publish, QoS, SubscribeFilter, + EventLoop, MqttOptions, Packet, Publish, QoS, SubscribeFilter, }; use serde::Deserialize; use std::{ @@ -110,6 +110,7 @@ struct AppConfig { } struct App { config: AppConfig, + client: AsyncClient, transformers: Vec>, } @@ -209,10 +210,10 @@ impl App { Ok(json::parse(str::from_utf8(payload)?)?) } - fn handle_event(&self, event: &Event) { + async fn handle_event(&self, event: &Event) { match event { Event::Incoming(Packet::Publish(notification)) => self.handle_publish(notification), - Event::Incoming(Packet::ConnAck(ack)) => self.handle_connack(ack), + Event::Incoming(Packet::ConnAck(ack)) => self.handle_connack(ack).await, // Event::Incoming(Packet::SubAck(ack)) => self.handle_suback(ack), Event::Outgoing(_) => {} e => debug!(target: "mqtt-exporter::mqtt", "Unhandled event: {:?}", e), @@ -228,10 +229,11 @@ impl App { } } - fn handle_connack(&self, ack: &ConnAck) { + async fn handle_connack(&self, ack: &ConnAck) { match ack.code { rumqttc::ConnectReturnCode::Success => { - info!(target: "mqtt-exporter::mqtt", "Successfully connected to MQTT broker") + info!(target: "mqtt-exporter::mqtt", "Successfully connected to MQTT broker"); + self.subscribe().await; } code => { warn!(target: "mqtt-exporter::mqtt", "Non success connection return code {:?}", code) @@ -263,8 +265,9 @@ impl App { self.parse_metrics(data, &topic, &topic, metric_path, &labels); } - async fn subscribe(&self, client: &AsyncClient) { - match client + async fn subscribe(&self) { + match self + .client .subscribe_many(self.config.mqtt_topic.iter().map(|topic| SubscribeFilter { path: topic.to_owned(), qos: QoS::AtLeastOnce, @@ -280,32 +283,14 @@ impl App { } } - async fn mqtt_task(self) { - let mut opts = MqttOptions::new( - &self.config.mqtt_client_id, - &self.config.mqtt_address, - self.config.mqtt_port, - ); - opts.set_keep_alive(Duration::from_secs(self.config.mqtt_keepalive)) - .set_max_packet_size(150000, 150000); - - if let (Some(username), Some(password)) = - (&self.config.mqtt_username, &self.config.mqtt_password) - { - opts.set_credentials(username, password); - } - - let (client, mut eventloop) = AsyncClient::new(opts, 10); - - self.subscribe(&client).await; - + async fn mqtt_task(self, mut eventloop: EventLoop) { let mut bo = backoff::ExponentialBackoff::default(); loop { match eventloop.poll().await { Ok(event) => { bo.reset(); - self.handle_event(&event) + self.handle_event(&event).await } Err(e) => { error!(target: "mqtt-exporter::mqtt", "Connection error: {}", e); @@ -336,7 +321,7 @@ impl App { builder.install().unwrap(); } - fn new(config: AppConfig) -> Self { + fn new(config: AppConfig, client: AsyncClient) -> Self { let mut transformers: Vec> = Vec::new(); if !config.zwave_topic_prefix.is_empty() { transformers.push(Box::new(ZwaveNormalize::new(&config.zwave_topic_prefix))) @@ -346,7 +331,8 @@ impl App { &config.esphome_topic_prefixes, ))) } - if !config.hubitat_topic_prefixes.is_empty() && !config.hubitat_topic_prefixes[0].is_empty() { + if !config.hubitat_topic_prefixes.is_empty() && !config.hubitat_topic_prefixes[0].is_empty() + { transformers.push(Box::new(HubitatNormalize::new( &config.hubitat_topic_prefixes, ))) @@ -355,8 +341,10 @@ impl App { transformers.push(Box::new(Zigbee2MqttAvailability::default())) } debug!(target: "mqtt-exporter::config", "Transformers enabled: {:?}", transformers); + Self { config, + client, transformers, } } @@ -380,10 +368,25 @@ async fn main() { ) .build() .unwrap(); - let app_config = config.try_deserialize().unwrap(); + let app_config: AppConfig = config.try_deserialize().unwrap(); info!(target: "mqtt-exporter::config", "Loaded config: {app_config:?}"); - let app = App::new(app_config); + let mut opts = MqttOptions::new( + &app_config.mqtt_client_id, + &app_config.mqtt_address, + app_config.mqtt_port, + ); + opts.set_keep_alive(Duration::from_secs(app_config.mqtt_keepalive)) + .set_max_packet_size(150000, 150000); + + if let (Some(username), Some(password)) = (&app_config.mqtt_username, &app_config.mqtt_password) + { + opts.set_credentials(username, password); + } + + let (client, mut eventloop) = AsyncClient::new(opts, 10); + + let app = App::new(app_config, client); app.prom_task().await; - app.mqtt_task().await; + app.mqtt_task(eventloop).await; }