From 38006566aff8b3477c1478fc814a187f63cb9727 Mon Sep 17 00:00:00 2001 From: Keenan Tims Date: Sun, 9 Mar 2025 01:04:42 -0800 Subject: [PATCH] initial deployment --- Cargo.toml | 3 +++ src/main.rs | 65 ++++++++++++++++++++++++++++++++++++++--------------- 2 files changed, 50 insertions(+), 18 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5939ace..64b4aab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,3 +20,6 @@ serde = { version = "1.0.218", features = ["derive"] } serde_regex = "1.1.0" tokio = { version = "1.44.0", features = ["net", "rt-multi-thread"] } tokio-util = { version = "0.7.13", features = ["codec"] } + +[profile.release] +lto = "fat" \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 5fa0625..c68db98 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,25 +2,24 @@ use backoff::backoff::Backoff; use config::Config; use core::str; use json::JsonValue; -use log::{debug, error, info}; +use log::{debug, error, info, warn}; use metrics::gauge; use metrics_exporter_prometheus::PrometheusBuilder; use metrics_util::MetricKindMask; -use mqtt_exporter::transforms::*; use regex::Regex; use rumqttc::{ - AsyncClient, + AsyncClient, ConnAck, Event::{self}, - MqttOptions, Packet, Publish, QoS, + MqttOptions, Packet, Publish, QoS, SubscribeFilter, }; use serde::Deserialize; use std::{ net::{IpAddr, Ipv6Addr}, process::exit, - time::Duration, + time::{Duration, SystemTime, UNIX_EPOCH}, }; -use std::time::{SystemTime, UNIX_EPOCH}; +use mqtt_exporter::transforms::*; struct ConfigDefaults; impl ConfigDefaults { @@ -178,7 +177,7 @@ impl App { let metric_s = metric_path.join("_"); let mut labels = labels.clone(); labels.push((self.config.topic_label.to_owned(), topic.to_owned())); - debug!("metric: {metric_s} value: {v} labels: {labels:?}"); + debug!(target: "mqtt-exporter::prom", "metric: {metric_s} value: {v} labels: {labels:?}"); if self.config.expose_last_seen { let metric_ts = format!("{}_ts", metric_s); @@ -213,15 +212,30 @@ impl App { fn handle_event(&self, event: &Event) { match event { Event::Incoming(Packet::Publish(notification)) => self.handle_publish(notification), + Event::Incoming(Packet::ConnAck(ack)) => self.handle_connack(ack), + // Event::Incoming(Packet::SubAck(ack)) => self.handle_suback(ack), Event::Outgoing(_) => {} - e => debug!("Unhandled event: {:?}", e), + e => debug!(target: "mqtt-exporter::mqtt", "Unhandled event: {:?}", e), } } fn handle_publish(&self, event: &Publish) { match Self::deserialize_payload(&event.payload) { Ok(parsed) => self.update_metrics(&event.topic, parsed), - Err(e) => error!("Error deserializing JSON payload: {:?}", e), + Err(e) => { + error!(target: "mqtt-exporter::mqtt", "Error deserializing JSON payload: {:?}", e) + } + } + } + + fn handle_connack(&self, ack: &ConnAck) { + match ack.code { + rumqttc::ConnectReturnCode::Success => { + info!(target: "mqtt-exporter::mqtt", "Successfully connected to MQTT broker") + } + code => { + warn!(target: "mqtt-exporter::mqtt", "Non success connection return code {:?}", code) + } } } @@ -231,7 +245,7 @@ impl App { if !&self.topic_filter(raw_topic) { return; } - debug!("topic {} payload {}", raw_topic, data.pretty(2)); + debug!(target: "mqtt-exporter::mqtt", "topic {} payload {}", raw_topic, data.pretty(2)); let mut topic = raw_topic.to_owned(); (topic, data) = self @@ -249,6 +263,23 @@ impl App { self.parse_metrics(data, &topic, &topic, metric_path, &labels); } + async fn subscribe(&self, client: &AsyncClient) { + match client + .subscribe_many(self.config.mqtt_topic.iter().map(|topic| SubscribeFilter { + path: topic.to_owned(), + qos: QoS::AtLeastOnce, + })) + .await + { + Ok(_) => { + info!(target: "mqtt-exporter::mqtt", "Successfully subscribed to topics `{}`", &self.config.mqtt_topic.join(" ")) + } + Err(e) => { + error!(target: "mqtt-exporter::mqtt", "Failed to subscribe to topics `{}` - {}", &self.config.mqtt_topic.join(" "), e) + } + } + } + async fn mqtt_task(self) { let mut opts = MqttOptions::new( &self.config.mqtt_client_id, @@ -266,9 +297,7 @@ impl App { let (client, mut eventloop) = AsyncClient::new(opts, 10); - for topic in &self.config.mqtt_topic { - client.subscribe(topic, QoS::AtLeastOnce).await.unwrap(); - } + self.subscribe(&client).await; let mut bo = backoff::ExponentialBackoff::default(); @@ -279,15 +308,15 @@ impl App { self.handle_event(&event) } Err(e) => { - error!("Connection error: {}", e); + error!(target: "mqtt-exporter::mqtt", "Connection error: {}", e); let delay = bo.next_backoff(); match delay { Some(delay) => { - debug!("Backing off for {}s", delay.as_secs_f32()); + debug!(target: "mqtt-exporter::mqtt", "Backing off for {}s", delay.as_secs_f32()); tokio::time::sleep(delay).await } None => { - error!("Connection timed out, giving up"); + error!(target: "mqtt-exporter::mqtt", "Connection timed out, giving up"); exit(-1) } } @@ -325,7 +354,7 @@ impl App { if config.zigbee2mqtt_availability { transformers.push(Box::new(Zigbee2MqttAvailability::default())) } - debug!("Transformers enabled: {:?}", transformers); + debug!(target: "mqtt-exporter::config", "Transformers enabled: {:?}", transformers); Self { config, transformers, @@ -352,7 +381,7 @@ async fn main() { .build() .unwrap(); let app_config = config.try_deserialize().unwrap(); - info!("Loaded config: {app_config:?}"); + info!(target: "mqtt-exporter::config", "Loaded config: {app_config:?}"); let app = App::new(app_config); app.prom_task().await;