From 5c96aa8c92211c940a3fd654c7054875fdae59a2 Mon Sep 17 00:00:00 2001 From: Keenan Tims Date: Sat, 8 Mar 2025 18:36:14 -0800 Subject: [PATCH] Mostly feautre partiy with the python version Not well tested --- src/lib.rs | 1 + src/main.rs | 309 ++++++++++++++++++++++++++-------------------- src/transforms.rs | 189 ++++++++++++++++++++++++++++ 3 files changed, 366 insertions(+), 133 deletions(-) create mode 100644 src/lib.rs create mode 100644 src/transforms.rs diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..3276530 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1 @@ +pub mod transforms; diff --git a/src/main.rs b/src/main.rs index 991dbb0..5fa0625 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,46 +1,68 @@ -use backoff::{backoff::Backoff, ExponentialBackoff}; +use backoff::backoff::Backoff; use config::Config; use core::str; -use itertools::Itertools; use json::JsonValue; -use log::{debug, error, info, warn}; +use log::{debug, error, info}; use metrics::gauge; use metrics_exporter_prometheus::PrometheusBuilder; -use metrics_util::{ - registry::{AtomicStorage, GenerationalAtomicStorage, Registry}, - MetricKindMask, -}; +use metrics_util::MetricKindMask; +use mqtt_exporter::transforms::*; use regex::Regex; use rumqttc::{ - tokio_rustls::client, AsyncClient, - Event::{self, Incoming}, + Event::{self}, MqttOptions, Packet, Publish, QoS, }; use serde::Deserialize; -use std::{collections::HashMap, error::Error, iter::repeat, process::exit, time::Duration}; -use tokio::{join, net::TcpStream}; +use std::{ + net::{IpAddr, Ipv6Addr}, + process::exit, + time::Duration, +}; + +use std::time::{SystemTime, UNIX_EPOCH}; struct ConfigDefaults; impl ConfigDefaults { - fn address() -> String { + fn mqtt_address() -> String { "localhost".to_owned() } - fn port() -> u16 { + fn mqtt_port() -> u16 { 1883 } - fn topic() -> Vec { + fn mqtt_topic() -> Vec { vec!["#".to_owned()] } - fn keepalive() -> u64 { + fn mqtt_keepalive() -> u64 { 60 } - fn client_id() -> String { + fn mqtt_client_id() -> String { "mqtt-exporter-rs".to_owned() } - fn expose_client_id() -> bool { + fn mqtt_expose_client_id() -> bool { true } + fn expose_last_seen() -> bool { + false + } + fn prometheus_address() -> IpAddr { + IpAddr::V6(Ipv6Addr::from_bits(0)) + } + fn prometheus_port() -> u16 { + 9000 + } + fn prometheus_prefix() -> String { + "mqtt".to_owned() + } + fn topic_label() -> String { + "topic".to_owned() + } + fn hubitat_topic_prefixes() -> Vec { + vec!["hubitat".to_owned()] + } + fn zigbee2mqtt_availability() -> bool { + false + } fn zwave_topic_prefix() -> String { "zwave/".to_owned() } @@ -48,52 +70,69 @@ impl ConfigDefaults { #[derive(Debug, Deserialize)] struct AppConfig { - #[serde(default = "ConfigDefaults::address")] - address: String, - #[serde(default = "ConfigDefaults::port")] - port: u16, - #[serde(default = "ConfigDefaults::topic")] - topic: Vec, - #[serde(default = "ConfigDefaults::keepalive")] - keepalive: u64, + #[serde(default = "ConfigDefaults::mqtt_address")] + mqtt_address: String, + #[serde(default = "ConfigDefaults::mqtt_port")] + mqtt_port: u16, + #[serde(default = "ConfigDefaults::mqtt_topic")] + mqtt_topic: Vec, + #[serde(default = "ConfigDefaults::mqtt_keepalive")] + mqtt_keepalive: u64, #[serde(default)] - username: Option, + mqtt_username: Option, #[serde(default)] - password: Option, - #[serde(default = "ConfigDefaults::client_id")] - client_id: String, - #[serde(default = "ConfigDefaults::expose_client_id")] - expose_client_id: bool, + mqtt_password: Option, + #[serde(default = "ConfigDefaults::mqtt_client_id")] + mqtt_client_id: String, + #[serde(default = "ConfigDefaults::mqtt_expose_client_id")] + mqtt_expose_client_id: bool, + #[serde(default = "ConfigDefaults::expose_last_seen")] + expose_last_seen: bool, #[serde(default)] - ignored_topics: Vec, + mqtt_ignored_topics: Vec, #[serde(with = "serde_regex", default)] - ignored_topics_re: Option, + mqtt_ignored_topics_re: Option, + #[serde(default = "ConfigDefaults::prometheus_address")] + prometheus_address: IpAddr, + #[serde(default = "ConfigDefaults::prometheus_port")] + prometheus_port: u16, + #[serde(default = "ConfigDefaults::prometheus_prefix")] + prometheus_prefix: String, + #[serde(default = "ConfigDefaults::topic_label")] + topic_label: String, + #[serde(default)] + esphome_topic_prefixes: Vec, + #[serde(default = "ConfigDefaults::hubitat_topic_prefixes")] + hubitat_topic_prefixes: Vec, + #[serde(default = "ConfigDefaults::zigbee2mqtt_availability")] + zigbee2mqtt_availability: bool, #[serde(default = "ConfigDefaults::zwave_topic_prefix")] zwave_topic_prefix: String, } struct App { config: AppConfig, + transformers: Vec>, +} + +fn parse_number(v: &str) -> Option { + if let Ok(num) = v.parse::() { + Some(num) + } else if v.eq_ignore_ascii_case("on") + || v.eq_ignore_ascii_case("true") + || v.eq_ignore_ascii_case("online") + { + Some(1.0) + } else if v.eq_ignore_ascii_case("off") + || v.eq_ignore_ascii_case("false") + || v.eq_ignore_ascii_case("offline") + { + Some(0.0) + } else { + None + } } impl App { - fn parse_number(v: &str) -> Option { - if let Ok(num) = v.parse::() { - Some(num) - } else if v.eq_ignore_ascii_case("on") - || v.eq_ignore_ascii_case("true") - || v.eq_ignore_ascii_case("online") - { - Some(1.0) - } else if v.eq_ignore_ascii_case("off") - || v.eq_ignore_ascii_case("false") - || v.eq_ignore_ascii_case("offline") - { - Some(0.0) - } else { - None - } - } - fn parse_metric(value: &JsonValue) -> Option { match value { JsonValue::Null => None, @@ -105,8 +144,8 @@ impl App { } } JsonValue::Number(v) => Some((*v).into()), - JsonValue::Short(v) => Self::parse_number(v.as_str()), - JsonValue::String(v) => Self::parse_number(v.as_str()), + JsonValue::Short(v) => parse_number(v.as_str()), + JsonValue::String(v) => parse_number(v.as_str()), _ => None, } } @@ -115,45 +154,39 @@ impl App { &self, mut data: JsonValue, topic: &str, - original_topic: &str, - prefix: &str, - labels: &Option>, + _original_topic: &str, + metric_path: Vec, + labels: &Vec<(String, String)>, ) { match data { JsonValue::Array(_) => { for (metric, child) in data.members_mut().enumerate() { - let metric = metric.to_string(); - self.parse_metrics( - child.take(), - topic, - original_topic, - &format!("{prefix}{metric}_"), - labels, - ) + let mut metric_path = metric_path.clone(); + metric_path.push(metric.to_string()); + self.parse_metrics(child.take(), topic, _original_topic, metric_path, labels) } } JsonValue::Object(_) => { for (metric, child) in data.entries_mut() { - self.parse_metrics( - child.take(), - topic, - original_topic, - &format!("{prefix}_{metric}"), - labels, - ) + let mut metric_path = metric_path.clone(); + metric_path.push(metric.to_string()); + self.parse_metrics(child.take(), topic, _original_topic, metric_path, labels) } } _ => { if let Some(v) = Self::parse_metric(&data) { - debug!("prometheus metric: {prefix} value: {v}"); - let mut labels = if let Some(labels) = labels { - labels.clone() - } else { - Vec::new() - }; - labels.push(("topic".to_owned(), topic.to_owned())); - let owned_pfx = prefix.to_owned(); - let gauge = gauge!(owned_pfx, &labels); + let metric_s = metric_path.join("_"); + let mut labels = labels.clone(); + labels.push((self.config.topic_label.to_owned(), topic.to_owned())); + debug!("metric: {metric_s} value: {v} labels: {labels:?}"); + + if self.config.expose_last_seen { + let metric_ts = format!("{}_ts", metric_s); + let ts_gauge = gauge!(metric_ts, &labels); + let ts = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); + ts_gauge.set(ts) + } + let gauge = gauge!(metric_s, &labels); gauge.set(v); } } @@ -163,12 +196,12 @@ impl App { fn topic_filter(&self, topic: &str) -> bool { !self .config - .ignored_topics + .mqtt_ignored_topics .iter() .any(|pfx| topic.starts_with(pfx)) && !self .config - .ignored_topics_re + .mqtt_ignored_topics_re .as_ref() .is_some_and(|re| re.is_match(topic)) } @@ -193,66 +226,47 @@ impl App { } fn update_metrics(&self, raw_topic: &str, mut data: JsonValue) { + let mut labels = Vec::new(); + if !&self.topic_filter(raw_topic) { return; } - let mut topic = raw_topic.replace('/', "_"); + debug!("topic {} payload {}", raw_topic, data.pretty(2)); + let mut topic = raw_topic.to_owned(); - if topic.starts_with(&self.config.zwave_topic_prefix) { - (topic, data) = Self::normalize_zwave2mqtt(&topic, data); - } - - self.parse_metrics(data, &topic, &topic, "mqtt", &None); - } - - fn normalize_zwave2mqtt(topic: &str, mut data: JsonValue) -> (String, JsonValue) { - if topic.contains("node_info") || !topic.contains("endpoint_") { - return (topic.to_owned(), JsonValue::Null); - } - if !data.is_object() || !data.contains("value") { - return (topic.to_owned(), JsonValue::Null); - } - let parts = topic.split('/').collect_vec(); - if let Some((properties_index, _)) = parts + (topic, data) = self + .transformers .iter() - .enumerate() - .find(|(_, v)| v.starts_with("endpoint_")) - { - let parts = topic.split('/').collect_vec(); - let topic = parts[..properties_index] - .iter() - .map(|s| s.to_lowercase()) - .join("/"); - let properties = parts[properties_index..] - .iter() - .map(|s| s.to_lowercase()) - .join("_"); - let mut value = JsonValue::new_object(); - value.insert(&properties, data.remove("value")).unwrap(); + .filter(|txfm| txfm.applies_to(raw_topic)) + .fold((topic, data), |(topic, data), txfm| { + txfm.transform(&topic, data, &mut labels) + }); - return (topic, value); - } else { - error!("Unable to find propreties index topic: {topic:?} value: {data:?}"); - return (topic.to_owned(), JsonValue::Null); - } + // Should be a transform itself? + // let topic = topic.replace('/', "_"); + + let metric_path = vec![self.config.prometheus_prefix.clone()]; + self.parse_metrics(data, &topic, &topic, metric_path, &labels); } - async fn connection_task(self) { + async fn mqtt_task(self) { let mut opts = MqttOptions::new( - &self.config.client_id, - &self.config.address, - self.config.port, + &self.config.mqtt_client_id, + &self.config.mqtt_address, + self.config.mqtt_port, ); - opts.set_keep_alive(Duration::from_secs(self.config.keepalive)) + opts.set_keep_alive(Duration::from_secs(self.config.mqtt_keepalive)) .set_max_packet_size(150000, 150000); - if let (Some(username), Some(password)) = (&self.config.username, &self.config.password) { + if let (Some(username), Some(password)) = + (&self.config.mqtt_username, &self.config.mqtt_password) + { opts.set_credentials(username, password); } let (client, mut eventloop) = AsyncClient::new(opts, 10); - for topic in &self.config.topic { + for topic in &self.config.mqtt_topic { client.subscribe(topic, QoS::AtLeastOnce).await.unwrap(); } @@ -283,14 +297,40 @@ impl App { } async fn prom_task(&self) { let builder = PrometheusBuilder::new() + .with_http_listener((self.config.prometheus_address, self.config.prometheus_port)) .idle_timeout(MetricKindMask::ALL, Some(Duration::from_secs(300))); - let builder = if self.config.expose_client_id { - builder.add_global_label("client_id", &self.config.client_id) + let builder = if self.config.mqtt_expose_client_id { + builder.add_global_label("client_id", &self.config.mqtt_client_id) } else { builder }; builder.install().unwrap(); } + + fn new(config: AppConfig) -> Self { + let mut transformers: Vec> = Vec::new(); + if !config.zwave_topic_prefix.is_empty() { + transformers.push(Box::new(ZwaveNormalize::new(&config.zwave_topic_prefix))) + } + if !config.esphome_topic_prefixes.is_empty() { + transformers.push(Box::new(EsphomeNormalize::new( + &config.esphome_topic_prefixes, + ))) + } + if !config.hubitat_topic_prefixes.is_empty() { + transformers.push(Box::new(HubitatNormalize::new( + &config.hubitat_topic_prefixes, + ))) + } + if config.zigbee2mqtt_availability { + transformers.push(Box::new(Zigbee2MqttAvailability::default())) + } + debug!("Transformers enabled: {:?}", transformers); + Self { + config, + transformers, + } + } } #[tokio::main] @@ -299,19 +339,22 @@ async fn main() { let config = Config::builder() .add_source( - config::Environment::with_prefix("MQTT") + config::Environment::with_prefix("") + .prefix_separator("") .try_parsing(true) + .separator("__") .list_separator(",") - .with_list_parse_key("topic") - .with_list_parse_key("ignored_topics"), + .with_list_parse_key("mqtt_topic") + .with_list_parse_key("mqtt_ignored_topics") + .with_list_parse_key("esphome_topic_prefixes") + .with_list_parse_key("hubitat_topic_prefixes"), ) .build() .unwrap(); let app_config = config.try_deserialize().unwrap(); info!("Loaded config: {app_config:?}"); - let app = App { config: app_config }; + let app = App::new(app_config); app.prom_task().await; - let task = tokio::spawn(app.connection_task()); - task.await.unwrap(); + app.mqtt_task().await; } diff --git a/src/transforms.rs b/src/transforms.rs new file mode 100644 index 0000000..1a4658b --- /dev/null +++ b/src/transforms.rs @@ -0,0 +1,189 @@ +use itertools::Itertools; +use json::JsonValue; +use log::{debug, error}; +use std::fmt::Debug; + +pub trait Transform: Debug { + fn applies_to(&self, topic: &str) -> bool; + fn transform( + &self, + topic: &str, + data: JsonValue, + labels: &mut Vec<(String, String)>, + ) -> (String, JsonValue); +} + +#[derive(Debug)] +pub struct ZwaveNormalize { + topic_prefix: String, +} + +impl ZwaveNormalize { + pub fn new(topic_prefix: &str) -> Self { + Self { + topic_prefix: topic_prefix.to_owned(), + } + } +} + +impl Transform for ZwaveNormalize { + fn applies_to(&self, topic: &str) -> bool { + topic.starts_with(&self.topic_prefix) + } + fn transform( + &self, + topic: &str, + mut data: JsonValue, + labels: &mut Vec<(String, String)>, + ) -> (String, JsonValue) { + if topic.contains("nodeinfo") || !topic.contains("endpoint_") { + debug!("zwave topic isn't relevant"); + return (topic.to_owned(), JsonValue::Null); + } + if !data.is_object() || !data.has_key("value") { + debug!("zwave payload doesn't include value"); + return (topic.to_owned(), JsonValue::Null); + } + debug!("Normalizing zwave2mqtt payload"); + let parts = topic.split('/').collect_vec(); + if let Some((properties_index, _)) = parts + .iter() + .enumerate() + .find(|(_, v)| v.starts_with("endpoint_")) + { + let parts = topic.split('/').collect_vec(); + let topic = parts[..properties_index] + .iter() + .map(|s| s.to_lowercase()) + .join("/"); + + if let Ok(endpoint) = parts[properties_index] + .split_at("endpoint_".len()) + .1 + .parse::() + { + labels.push(("endpoint".to_owned(), endpoint.to_string())); + } else { + labels.push(("endpoint".to_owned(), parts[properties_index].to_owned())) + } + + let properties = parts[properties_index + 1..] + .iter() + .map(|s| s.to_lowercase()) + .join("_"); + let mut value = JsonValue::new_object(); + value.insert(&properties, data.remove("value")).unwrap(); + + debug!("Normalized topic: {topic} value: {value}"); + + (topic, value) + } else { + error!(target: "ZwaveNormalize", "Unable to find propreties index topic: {topic:?} value: {data:?}"); + (topic.to_owned(), JsonValue::Null) + } + } +} + +#[derive(Debug)] +pub struct EsphomeNormalize { + topic_prefixes: Vec, +} +impl EsphomeNormalize { + pub fn new(topic_prefixes: &[String]) -> Self { + Self { + topic_prefixes: topic_prefixes.to_vec(), + } + } +} +// Not sure why you'd want this, it de-normalizes the values since it moves them to separate topics per esphome node, but okay... +impl Transform for EsphomeNormalize { + fn applies_to(&self, topic: &str) -> bool { + self.topic_prefixes.iter().any(|pfx| topic.starts_with(pfx)) + } + fn transform( + &self, + topic: &str, + data: JsonValue, + labels: &mut Vec<(String, String)>, + ) -> (String, JsonValue) { + // ///state + let parts = topic.split('/').collect_vec(); + + labels.push(("esphome_type".to_owned(), parts[1].to_owned())); + labels.push(("esphome_name".to_owned(), parts[2].to_owned())); + + let topic = format!("{}/{}", parts[0].to_lowercase(), parts[1].to_lowercase()); + let mut value = JsonValue::new_object(); + value.insert(parts[parts.len() - 2], data).unwrap(); + (topic, value) + } +} + +#[derive(Debug)] +pub struct HubitatNormalize { + topic_prefixes: Vec, +} + +impl HubitatNormalize { + pub fn new(topic_prefixes: &[String]) -> Self { + Self { + topic_prefixes: topic_prefixes.to_vec(), + } + } +} + +impl Transform for HubitatNormalize { + fn applies_to(&self, topic: &str) -> bool { + self.topic_prefixes.iter().any(|pfx| topic.starts_with(pfx)) + } + fn transform( + &self, + topic: &str, + data: JsonValue, + labels: &mut Vec<(String, String)>, + ) -> (String, JsonValue) { + // hubitat/hub1/some room/temperature/value + let parts = topic.split('/').collect_vec(); + if parts.len() < 3 { + return (topic.to_owned(), data); + } + + let topic = parts[0..3].iter().map(|s| s.to_lowercase()).join("_"); + labels.push(("hubitat_hub".to_owned(), parts[1].to_owned())); + labels.push(("hubitat_room".to_owned(), parts[2].to_owned())); + + let mut value = JsonValue::new_object(); + value.insert(parts[parts.len() - 2], data).unwrap(); + (topic, value) + } +} + +const ZIGBEE2MQTT_AVAILABILITY_SUFFIX: &str = "availability"; + +#[derive(Debug, Default)] +pub struct Zigbee2MqttAvailability {} + +impl Transform for Zigbee2MqttAvailability { + fn applies_to(&self, topic: &str) -> bool { + topic.ends_with(ZIGBEE2MQTT_AVAILABILITY_SUFFIX) + } + fn transform( + &self, + topic: &str, + mut data: JsonValue, + _labels: &mut Vec<(String, String)>, + ) -> (String, JsonValue) { + if data.has_key("state") { + let topic = topic + .split_at(topic.len() - ZIGBEE2MQTT_AVAILABILITY_SUFFIX.len() - 1) + .0; + let mut value = JsonValue::new_object(); + value + .insert("zigbee_availability", data.remove("state")) + .unwrap(); + (topic.to_owned(), value) + } else { + (topic.to_owned(), data) + } + } +}