Compare commits
	
		
			2 Commits
		
	
	
		
			adf13560a8
			...
			5c96aa8c92
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 
						
						
							
						
						5c96aa8c92
	
				 | 
					
					
						|||
| 
						
						
							
						
						f713506656
	
				 | 
					
					
						
							
								
								
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							@@ -1 +1,2 @@
 | 
				
			|||||||
/target
 | 
					/target
 | 
				
			||||||
 | 
					test
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										1
									
								
								src/lib.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										1
									
								
								src/lib.rs
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1 @@
 | 
				
			|||||||
 | 
					pub mod transforms;
 | 
				
			||||||
							
								
								
									
										309
									
								
								src/main.rs
									
									
									
									
									
								
							
							
						
						
									
										309
									
								
								src/main.rs
									
									
									
									
									
								
							@@ -1,46 +1,68 @@
 | 
				
			|||||||
use backoff::{backoff::Backoff, ExponentialBackoff};
 | 
					use backoff::backoff::Backoff;
 | 
				
			||||||
use config::Config;
 | 
					use config::Config;
 | 
				
			||||||
use core::str;
 | 
					use core::str;
 | 
				
			||||||
use itertools::Itertools;
 | 
					 | 
				
			||||||
use json::JsonValue;
 | 
					use json::JsonValue;
 | 
				
			||||||
use log::{debug, error, info, warn};
 | 
					use log::{debug, error, info};
 | 
				
			||||||
use metrics::gauge;
 | 
					use metrics::gauge;
 | 
				
			||||||
use metrics_exporter_prometheus::PrometheusBuilder;
 | 
					use metrics_exporter_prometheus::PrometheusBuilder;
 | 
				
			||||||
use metrics_util::{
 | 
					use metrics_util::MetricKindMask;
 | 
				
			||||||
    registry::{AtomicStorage, GenerationalAtomicStorage, Registry},
 | 
					use mqtt_exporter::transforms::*;
 | 
				
			||||||
    MetricKindMask,
 | 
					 | 
				
			||||||
};
 | 
					 | 
				
			||||||
use regex::Regex;
 | 
					use regex::Regex;
 | 
				
			||||||
use rumqttc::{
 | 
					use rumqttc::{
 | 
				
			||||||
    tokio_rustls::client,
 | 
					 | 
				
			||||||
    AsyncClient,
 | 
					    AsyncClient,
 | 
				
			||||||
    Event::{self, Incoming},
 | 
					    Event::{self},
 | 
				
			||||||
    MqttOptions, Packet, Publish, QoS,
 | 
					    MqttOptions, Packet, Publish, QoS,
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
use serde::Deserialize;
 | 
					use serde::Deserialize;
 | 
				
			||||||
use std::{collections::HashMap, error::Error, iter::repeat, process::exit, time::Duration};
 | 
					use std::{
 | 
				
			||||||
use tokio::{join, net::TcpStream};
 | 
					    net::{IpAddr, Ipv6Addr},
 | 
				
			||||||
 | 
					    process::exit,
 | 
				
			||||||
 | 
					    time::Duration,
 | 
				
			||||||
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					use std::time::{SystemTime, UNIX_EPOCH};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
struct ConfigDefaults;
 | 
					struct ConfigDefaults;
 | 
				
			||||||
impl ConfigDefaults {
 | 
					impl ConfigDefaults {
 | 
				
			||||||
    fn address() -> String {
 | 
					    fn mqtt_address() -> String {
 | 
				
			||||||
        "localhost".to_owned()
 | 
					        "localhost".to_owned()
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    fn port() -> u16 {
 | 
					    fn mqtt_port() -> u16 {
 | 
				
			||||||
        1883
 | 
					        1883
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    fn topic() -> Vec<String> {
 | 
					    fn mqtt_topic() -> Vec<String> {
 | 
				
			||||||
        vec!["#".to_owned()]
 | 
					        vec!["#".to_owned()]
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    fn keepalive() -> u64 {
 | 
					    fn mqtt_keepalive() -> u64 {
 | 
				
			||||||
        60
 | 
					        60
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    fn client_id() -> String {
 | 
					    fn mqtt_client_id() -> String {
 | 
				
			||||||
        "mqtt-exporter-rs".to_owned()
 | 
					        "mqtt-exporter-rs".to_owned()
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    fn expose_client_id() -> bool {
 | 
					    fn mqtt_expose_client_id() -> bool {
 | 
				
			||||||
        true
 | 
					        true
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					    fn expose_last_seen() -> bool {
 | 
				
			||||||
 | 
					        false
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    fn prometheus_address() -> IpAddr {
 | 
				
			||||||
 | 
					        IpAddr::V6(Ipv6Addr::from_bits(0))
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    fn prometheus_port() -> u16 {
 | 
				
			||||||
 | 
					        9000
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    fn prometheus_prefix() -> String {
 | 
				
			||||||
 | 
					        "mqtt".to_owned()
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    fn topic_label() -> String {
 | 
				
			||||||
 | 
					        "topic".to_owned()
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    fn hubitat_topic_prefixes() -> Vec<String> {
 | 
				
			||||||
 | 
					        vec!["hubitat".to_owned()]
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    fn zigbee2mqtt_availability() -> bool {
 | 
				
			||||||
 | 
					        false
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
    fn zwave_topic_prefix() -> String {
 | 
					    fn zwave_topic_prefix() -> String {
 | 
				
			||||||
        "zwave/".to_owned()
 | 
					        "zwave/".to_owned()
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
@@ -48,52 +70,69 @@ impl ConfigDefaults {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
#[derive(Debug, Deserialize)]
 | 
					#[derive(Debug, Deserialize)]
 | 
				
			||||||
struct AppConfig {
 | 
					struct AppConfig {
 | 
				
			||||||
    #[serde(default = "ConfigDefaults::address")]
 | 
					    #[serde(default = "ConfigDefaults::mqtt_address")]
 | 
				
			||||||
    address: String,
 | 
					    mqtt_address: String,
 | 
				
			||||||
    #[serde(default = "ConfigDefaults::port")]
 | 
					    #[serde(default = "ConfigDefaults::mqtt_port")]
 | 
				
			||||||
    port: u16,
 | 
					    mqtt_port: u16,
 | 
				
			||||||
    #[serde(default = "ConfigDefaults::topic")]
 | 
					    #[serde(default = "ConfigDefaults::mqtt_topic")]
 | 
				
			||||||
    topic: Vec<String>,
 | 
					    mqtt_topic: Vec<String>,
 | 
				
			||||||
    #[serde(default = "ConfigDefaults::keepalive")]
 | 
					    #[serde(default = "ConfigDefaults::mqtt_keepalive")]
 | 
				
			||||||
    keepalive: u64,
 | 
					    mqtt_keepalive: u64,
 | 
				
			||||||
    #[serde(default)]
 | 
					    #[serde(default)]
 | 
				
			||||||
    username: Option<String>,
 | 
					    mqtt_username: Option<String>,
 | 
				
			||||||
    #[serde(default)]
 | 
					    #[serde(default)]
 | 
				
			||||||
    password: Option<String>,
 | 
					    mqtt_password: Option<String>,
 | 
				
			||||||
    #[serde(default = "ConfigDefaults::client_id")]
 | 
					    #[serde(default = "ConfigDefaults::mqtt_client_id")]
 | 
				
			||||||
    client_id: String,
 | 
					    mqtt_client_id: String,
 | 
				
			||||||
    #[serde(default = "ConfigDefaults::expose_client_id")]
 | 
					    #[serde(default = "ConfigDefaults::mqtt_expose_client_id")]
 | 
				
			||||||
    expose_client_id: bool,
 | 
					    mqtt_expose_client_id: bool,
 | 
				
			||||||
 | 
					    #[serde(default = "ConfigDefaults::expose_last_seen")]
 | 
				
			||||||
 | 
					    expose_last_seen: bool,
 | 
				
			||||||
    #[serde(default)]
 | 
					    #[serde(default)]
 | 
				
			||||||
    ignored_topics: Vec<String>,
 | 
					    mqtt_ignored_topics: Vec<String>,
 | 
				
			||||||
    #[serde(with = "serde_regex", default)]
 | 
					    #[serde(with = "serde_regex", default)]
 | 
				
			||||||
    ignored_topics_re: Option<Regex>,
 | 
					    mqtt_ignored_topics_re: Option<Regex>,
 | 
				
			||||||
 | 
					    #[serde(default = "ConfigDefaults::prometheus_address")]
 | 
				
			||||||
 | 
					    prometheus_address: IpAddr,
 | 
				
			||||||
 | 
					    #[serde(default = "ConfigDefaults::prometheus_port")]
 | 
				
			||||||
 | 
					    prometheus_port: u16,
 | 
				
			||||||
 | 
					    #[serde(default = "ConfigDefaults::prometheus_prefix")]
 | 
				
			||||||
 | 
					    prometheus_prefix: String,
 | 
				
			||||||
 | 
					    #[serde(default = "ConfigDefaults::topic_label")]
 | 
				
			||||||
 | 
					    topic_label: String,
 | 
				
			||||||
 | 
					    #[serde(default)]
 | 
				
			||||||
 | 
					    esphome_topic_prefixes: Vec<String>,
 | 
				
			||||||
 | 
					    #[serde(default = "ConfigDefaults::hubitat_topic_prefixes")]
 | 
				
			||||||
 | 
					    hubitat_topic_prefixes: Vec<String>,
 | 
				
			||||||
 | 
					    #[serde(default = "ConfigDefaults::zigbee2mqtt_availability")]
 | 
				
			||||||
 | 
					    zigbee2mqtt_availability: bool,
 | 
				
			||||||
    #[serde(default = "ConfigDefaults::zwave_topic_prefix")]
 | 
					    #[serde(default = "ConfigDefaults::zwave_topic_prefix")]
 | 
				
			||||||
    zwave_topic_prefix: String,
 | 
					    zwave_topic_prefix: String,
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
struct App {
 | 
					struct App {
 | 
				
			||||||
    config: AppConfig,
 | 
					    config: AppConfig,
 | 
				
			||||||
 | 
					    transformers: Vec<Box<dyn Transform>>,
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					fn parse_number(v: &str) -> Option<f64> {
 | 
				
			||||||
 | 
					    if let Ok(num) = v.parse::<f64>() {
 | 
				
			||||||
 | 
					        Some(num)
 | 
				
			||||||
 | 
					    } else if v.eq_ignore_ascii_case("on")
 | 
				
			||||||
 | 
					        || v.eq_ignore_ascii_case("true")
 | 
				
			||||||
 | 
					        || v.eq_ignore_ascii_case("online")
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        Some(1.0)
 | 
				
			||||||
 | 
					    } else if v.eq_ignore_ascii_case("off")
 | 
				
			||||||
 | 
					        || v.eq_ignore_ascii_case("false")
 | 
				
			||||||
 | 
					        || v.eq_ignore_ascii_case("offline")
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        Some(0.0)
 | 
				
			||||||
 | 
					    } else {
 | 
				
			||||||
 | 
					        None
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
impl App {
 | 
					impl App {
 | 
				
			||||||
    fn parse_number(v: &str) -> Option<f64> {
 | 
					 | 
				
			||||||
        if let Ok(num) = v.parse::<f64>() {
 | 
					 | 
				
			||||||
            Some(num)
 | 
					 | 
				
			||||||
        } else if v.eq_ignore_ascii_case("on")
 | 
					 | 
				
			||||||
            || v.eq_ignore_ascii_case("true")
 | 
					 | 
				
			||||||
            || v.eq_ignore_ascii_case("online")
 | 
					 | 
				
			||||||
        {
 | 
					 | 
				
			||||||
            Some(1.0)
 | 
					 | 
				
			||||||
        } else if v.eq_ignore_ascii_case("off")
 | 
					 | 
				
			||||||
            || v.eq_ignore_ascii_case("false")
 | 
					 | 
				
			||||||
            || v.eq_ignore_ascii_case("offline")
 | 
					 | 
				
			||||||
        {
 | 
					 | 
				
			||||||
            Some(0.0)
 | 
					 | 
				
			||||||
        } else {
 | 
					 | 
				
			||||||
            None
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    fn parse_metric(value: &JsonValue) -> Option<f64> {
 | 
					    fn parse_metric(value: &JsonValue) -> Option<f64> {
 | 
				
			||||||
        match value {
 | 
					        match value {
 | 
				
			||||||
            JsonValue::Null => None,
 | 
					            JsonValue::Null => None,
 | 
				
			||||||
@@ -105,8 +144,8 @@ impl App {
 | 
				
			|||||||
                }
 | 
					                }
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
            JsonValue::Number(v) => Some((*v).into()),
 | 
					            JsonValue::Number(v) => Some((*v).into()),
 | 
				
			||||||
            JsonValue::Short(v) => Self::parse_number(v.as_str()),
 | 
					            JsonValue::Short(v) => parse_number(v.as_str()),
 | 
				
			||||||
            JsonValue::String(v) => Self::parse_number(v.as_str()),
 | 
					            JsonValue::String(v) => parse_number(v.as_str()),
 | 
				
			||||||
            _ => None,
 | 
					            _ => None,
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
@@ -115,45 +154,39 @@ impl App {
 | 
				
			|||||||
        &self,
 | 
					        &self,
 | 
				
			||||||
        mut data: JsonValue,
 | 
					        mut data: JsonValue,
 | 
				
			||||||
        topic: &str,
 | 
					        topic: &str,
 | 
				
			||||||
        original_topic: &str,
 | 
					        _original_topic: &str,
 | 
				
			||||||
        prefix: &str,
 | 
					        metric_path: Vec<String>,
 | 
				
			||||||
        labels: &Option<Vec<(String, String)>>,
 | 
					        labels: &Vec<(String, String)>,
 | 
				
			||||||
    ) {
 | 
					    ) {
 | 
				
			||||||
        match data {
 | 
					        match data {
 | 
				
			||||||
            JsonValue::Array(_) => {
 | 
					            JsonValue::Array(_) => {
 | 
				
			||||||
                for (metric, child) in data.members_mut().enumerate() {
 | 
					                for (metric, child) in data.members_mut().enumerate() {
 | 
				
			||||||
                    let metric = metric.to_string();
 | 
					                    let mut metric_path = metric_path.clone();
 | 
				
			||||||
                    self.parse_metrics(
 | 
					                    metric_path.push(metric.to_string());
 | 
				
			||||||
                        child.take(),
 | 
					                    self.parse_metrics(child.take(), topic, _original_topic, metric_path, labels)
 | 
				
			||||||
                        topic,
 | 
					 | 
				
			||||||
                        original_topic,
 | 
					 | 
				
			||||||
                        &format!("{prefix}{metric}_"),
 | 
					 | 
				
			||||||
                        labels,
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
            JsonValue::Object(_) => {
 | 
					            JsonValue::Object(_) => {
 | 
				
			||||||
                for (metric, child) in data.entries_mut() {
 | 
					                for (metric, child) in data.entries_mut() {
 | 
				
			||||||
                    self.parse_metrics(
 | 
					                    let mut metric_path = metric_path.clone();
 | 
				
			||||||
                        child.take(),
 | 
					                    metric_path.push(metric.to_string());
 | 
				
			||||||
                        topic,
 | 
					                    self.parse_metrics(child.take(), topic, _original_topic, metric_path, labels)
 | 
				
			||||||
                        original_topic,
 | 
					 | 
				
			||||||
                        &format!("{prefix}_{metric}"),
 | 
					 | 
				
			||||||
                        labels,
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
            _ => {
 | 
					            _ => {
 | 
				
			||||||
                if let Some(v) = Self::parse_metric(&data) {
 | 
					                if let Some(v) = Self::parse_metric(&data) {
 | 
				
			||||||
                    debug!("prometheus metric: {prefix} value: {v}");
 | 
					                    let metric_s = metric_path.join("_");
 | 
				
			||||||
                    let mut labels = if let Some(labels) = labels {
 | 
					                    let mut labels = labels.clone();
 | 
				
			||||||
                        labels.clone()
 | 
					                    labels.push((self.config.topic_label.to_owned(), topic.to_owned()));
 | 
				
			||||||
                    } else {
 | 
					                    debug!("metric: {metric_s} value: {v} labels: {labels:?}");
 | 
				
			||||||
                        Vec::new()
 | 
					
 | 
				
			||||||
                    };
 | 
					                    if self.config.expose_last_seen {
 | 
				
			||||||
                    labels.push(("topic".to_owned(), topic.to_owned()));
 | 
					                        let metric_ts = format!("{}_ts", metric_s);
 | 
				
			||||||
                    let owned_pfx = prefix.to_owned();
 | 
					                        let ts_gauge = gauge!(metric_ts, &labels);
 | 
				
			||||||
                    let gauge = gauge!(owned_pfx, &labels);
 | 
					                        let ts = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
 | 
				
			||||||
 | 
					                        ts_gauge.set(ts)
 | 
				
			||||||
 | 
					                    }
 | 
				
			||||||
 | 
					                    let gauge = gauge!(metric_s, &labels);
 | 
				
			||||||
                    gauge.set(v);
 | 
					                    gauge.set(v);
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
@@ -163,12 +196,12 @@ impl App {
 | 
				
			|||||||
    fn topic_filter(&self, topic: &str) -> bool {
 | 
					    fn topic_filter(&self, topic: &str) -> bool {
 | 
				
			||||||
        !self
 | 
					        !self
 | 
				
			||||||
            .config
 | 
					            .config
 | 
				
			||||||
            .ignored_topics
 | 
					            .mqtt_ignored_topics
 | 
				
			||||||
            .iter()
 | 
					            .iter()
 | 
				
			||||||
            .any(|pfx| topic.starts_with(pfx))
 | 
					            .any(|pfx| topic.starts_with(pfx))
 | 
				
			||||||
            && !self
 | 
					            && !self
 | 
				
			||||||
                .config
 | 
					                .config
 | 
				
			||||||
                .ignored_topics_re
 | 
					                .mqtt_ignored_topics_re
 | 
				
			||||||
                .as_ref()
 | 
					                .as_ref()
 | 
				
			||||||
                .is_some_and(|re| re.is_match(topic))
 | 
					                .is_some_and(|re| re.is_match(topic))
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
@@ -193,66 +226,47 @@ impl App {
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    fn update_metrics(&self, raw_topic: &str, mut data: JsonValue) {
 | 
					    fn update_metrics(&self, raw_topic: &str, mut data: JsonValue) {
 | 
				
			||||||
 | 
					        let mut labels = Vec::new();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if !&self.topic_filter(raw_topic) {
 | 
					        if !&self.topic_filter(raw_topic) {
 | 
				
			||||||
            return;
 | 
					            return;
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        let mut topic = raw_topic.replace('/', "_");
 | 
					        debug!("topic {} payload {}", raw_topic, data.pretty(2));
 | 
				
			||||||
 | 
					        let mut topic = raw_topic.to_owned();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if topic.starts_with(&self.config.zwave_topic_prefix) {
 | 
					        (topic, data) = self
 | 
				
			||||||
            (topic, data) = Self::normalize_zwave2mqtt(&topic, data);
 | 
					            .transformers
 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        self.parse_metrics(data, &topic, &topic, "mqtt", &None);
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    fn normalize_zwave2mqtt(topic: &str, mut data: JsonValue) -> (String, JsonValue) {
 | 
					 | 
				
			||||||
        if topic.contains("node_info") || !topic.contains("endpoint_") {
 | 
					 | 
				
			||||||
            return (topic.to_owned(), JsonValue::Null);
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
        if !data.is_object() || !data.contains("value") {
 | 
					 | 
				
			||||||
            return (topic.to_owned(), JsonValue::Null);
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
        let parts = topic.split('/').collect_vec();
 | 
					 | 
				
			||||||
        if let Some((properties_index, _)) = parts
 | 
					 | 
				
			||||||
            .iter()
 | 
					            .iter()
 | 
				
			||||||
            .enumerate()
 | 
					            .filter(|txfm| txfm.applies_to(raw_topic))
 | 
				
			||||||
            .find(|(_, v)| v.starts_with("endpoint_"))
 | 
					            .fold((topic, data), |(topic, data), txfm| {
 | 
				
			||||||
        {
 | 
					                txfm.transform(&topic, data, &mut labels)
 | 
				
			||||||
            let parts = topic.split('/').collect_vec();
 | 
					            });
 | 
				
			||||||
            let topic = parts[..properties_index]
 | 
					 | 
				
			||||||
                .iter()
 | 
					 | 
				
			||||||
                .map(|s| s.to_lowercase())
 | 
					 | 
				
			||||||
                .join("/");
 | 
					 | 
				
			||||||
            let properties = parts[properties_index..]
 | 
					 | 
				
			||||||
                .iter()
 | 
					 | 
				
			||||||
                .map(|s| s.to_lowercase())
 | 
					 | 
				
			||||||
                .join("_");
 | 
					 | 
				
			||||||
            let mut value = JsonValue::new_object();
 | 
					 | 
				
			||||||
            value.insert(&properties, data.remove("value")).unwrap();
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
            return (topic, value);
 | 
					        // Should be a transform itself?
 | 
				
			||||||
        } else {
 | 
					        // let topic = topic.replace('/', "_");
 | 
				
			||||||
            error!("Unable to find propreties index topic: {topic:?} value: {data:?}");
 | 
					
 | 
				
			||||||
            return (topic.to_owned(), JsonValue::Null);
 | 
					        let metric_path = vec![self.config.prometheus_prefix.clone()];
 | 
				
			||||||
        }
 | 
					        self.parse_metrics(data, &topic, &topic, metric_path, &labels);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    async fn connection_task(self) {
 | 
					    async fn mqtt_task(self) {
 | 
				
			||||||
        let mut opts = MqttOptions::new(
 | 
					        let mut opts = MqttOptions::new(
 | 
				
			||||||
            &self.config.client_id,
 | 
					            &self.config.mqtt_client_id,
 | 
				
			||||||
            &self.config.address,
 | 
					            &self.config.mqtt_address,
 | 
				
			||||||
            self.config.port,
 | 
					            self.config.mqtt_port,
 | 
				
			||||||
        );
 | 
					        );
 | 
				
			||||||
        opts.set_keep_alive(Duration::from_secs(self.config.keepalive))
 | 
					        opts.set_keep_alive(Duration::from_secs(self.config.mqtt_keepalive))
 | 
				
			||||||
            .set_max_packet_size(150000, 150000);
 | 
					            .set_max_packet_size(150000, 150000);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if let (Some(username), Some(password)) = (&self.config.username, &self.config.password) {
 | 
					        if let (Some(username), Some(password)) =
 | 
				
			||||||
 | 
					            (&self.config.mqtt_username, &self.config.mqtt_password)
 | 
				
			||||||
 | 
					        {
 | 
				
			||||||
            opts.set_credentials(username, password);
 | 
					            opts.set_credentials(username, password);
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        let (client, mut eventloop) = AsyncClient::new(opts, 10);
 | 
					        let (client, mut eventloop) = AsyncClient::new(opts, 10);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        for topic in &self.config.topic {
 | 
					        for topic in &self.config.mqtt_topic {
 | 
				
			||||||
            client.subscribe(topic, QoS::AtLeastOnce).await.unwrap();
 | 
					            client.subscribe(topic, QoS::AtLeastOnce).await.unwrap();
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -283,14 +297,40 @@ impl App {
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
    async fn prom_task(&self) {
 | 
					    async fn prom_task(&self) {
 | 
				
			||||||
        let builder = PrometheusBuilder::new()
 | 
					        let builder = PrometheusBuilder::new()
 | 
				
			||||||
 | 
					            .with_http_listener((self.config.prometheus_address, self.config.prometheus_port))
 | 
				
			||||||
            .idle_timeout(MetricKindMask::ALL, Some(Duration::from_secs(300)));
 | 
					            .idle_timeout(MetricKindMask::ALL, Some(Duration::from_secs(300)));
 | 
				
			||||||
        let builder = if self.config.expose_client_id {
 | 
					        let builder = if self.config.mqtt_expose_client_id {
 | 
				
			||||||
            builder.add_global_label("client_id", &self.config.client_id)
 | 
					            builder.add_global_label("client_id", &self.config.mqtt_client_id)
 | 
				
			||||||
        } else {
 | 
					        } else {
 | 
				
			||||||
            builder
 | 
					            builder
 | 
				
			||||||
        };
 | 
					        };
 | 
				
			||||||
        builder.install().unwrap();
 | 
					        builder.install().unwrap();
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    fn new(config: AppConfig) -> Self {
 | 
				
			||||||
 | 
					        let mut transformers: Vec<Box<dyn Transform>> = Vec::new();
 | 
				
			||||||
 | 
					        if !config.zwave_topic_prefix.is_empty() {
 | 
				
			||||||
 | 
					            transformers.push(Box::new(ZwaveNormalize::new(&config.zwave_topic_prefix)))
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        if !config.esphome_topic_prefixes.is_empty() {
 | 
				
			||||||
 | 
					            transformers.push(Box::new(EsphomeNormalize::new(
 | 
				
			||||||
 | 
					                &config.esphome_topic_prefixes,
 | 
				
			||||||
 | 
					            )))
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        if !config.hubitat_topic_prefixes.is_empty() {
 | 
				
			||||||
 | 
					            transformers.push(Box::new(HubitatNormalize::new(
 | 
				
			||||||
 | 
					                &config.hubitat_topic_prefixes,
 | 
				
			||||||
 | 
					            )))
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        if config.zigbee2mqtt_availability {
 | 
				
			||||||
 | 
					            transformers.push(Box::new(Zigbee2MqttAvailability::default()))
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        debug!("Transformers enabled: {:?}", transformers);
 | 
				
			||||||
 | 
					        Self {
 | 
				
			||||||
 | 
					            config,
 | 
				
			||||||
 | 
					            transformers,
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#[tokio::main]
 | 
					#[tokio::main]
 | 
				
			||||||
@@ -299,19 +339,22 @@ async fn main() {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    let config = Config::builder()
 | 
					    let config = Config::builder()
 | 
				
			||||||
        .add_source(
 | 
					        .add_source(
 | 
				
			||||||
            config::Environment::with_prefix("MQTT")
 | 
					            config::Environment::with_prefix("")
 | 
				
			||||||
 | 
					                .prefix_separator("")
 | 
				
			||||||
                .try_parsing(true)
 | 
					                .try_parsing(true)
 | 
				
			||||||
 | 
					                .separator("__")
 | 
				
			||||||
                .list_separator(",")
 | 
					                .list_separator(",")
 | 
				
			||||||
                .with_list_parse_key("topic")
 | 
					                .with_list_parse_key("mqtt_topic")
 | 
				
			||||||
                .with_list_parse_key("ignored_topics"),
 | 
					                .with_list_parse_key("mqtt_ignored_topics")
 | 
				
			||||||
 | 
					                .with_list_parse_key("esphome_topic_prefixes")
 | 
				
			||||||
 | 
					                .with_list_parse_key("hubitat_topic_prefixes"),
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
        .build()
 | 
					        .build()
 | 
				
			||||||
        .unwrap();
 | 
					        .unwrap();
 | 
				
			||||||
    let app_config = config.try_deserialize().unwrap();
 | 
					    let app_config = config.try_deserialize().unwrap();
 | 
				
			||||||
    info!("Loaded config: {app_config:?}");
 | 
					    info!("Loaded config: {app_config:?}");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    let app = App { config: app_config };
 | 
					    let app = App::new(app_config);
 | 
				
			||||||
    app.prom_task().await;
 | 
					    app.prom_task().await;
 | 
				
			||||||
    let task = tokio::spawn(app.connection_task());
 | 
					    app.mqtt_task().await;
 | 
				
			||||||
    task.await.unwrap();
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										189
									
								
								src/transforms.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										189
									
								
								src/transforms.rs
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,189 @@
 | 
				
			|||||||
 | 
					use itertools::Itertools;
 | 
				
			||||||
 | 
					use json::JsonValue;
 | 
				
			||||||
 | 
					use log::{debug, error};
 | 
				
			||||||
 | 
					use std::fmt::Debug;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					pub trait Transform: Debug {
 | 
				
			||||||
 | 
					    fn applies_to(&self, topic: &str) -> bool;
 | 
				
			||||||
 | 
					    fn transform(
 | 
				
			||||||
 | 
					        &self,
 | 
				
			||||||
 | 
					        topic: &str,
 | 
				
			||||||
 | 
					        data: JsonValue,
 | 
				
			||||||
 | 
					        labels: &mut Vec<(String, String)>,
 | 
				
			||||||
 | 
					    ) -> (String, JsonValue);
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#[derive(Debug)]
 | 
				
			||||||
 | 
					pub struct ZwaveNormalize {
 | 
				
			||||||
 | 
					    topic_prefix: String,
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					impl ZwaveNormalize {
 | 
				
			||||||
 | 
					    pub fn new(topic_prefix: &str) -> Self {
 | 
				
			||||||
 | 
					        Self {
 | 
				
			||||||
 | 
					            topic_prefix: topic_prefix.to_owned(),
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					impl Transform for ZwaveNormalize {
 | 
				
			||||||
 | 
					    fn applies_to(&self, topic: &str) -> bool {
 | 
				
			||||||
 | 
					        topic.starts_with(&self.topic_prefix)
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    fn transform(
 | 
				
			||||||
 | 
					        &self,
 | 
				
			||||||
 | 
					        topic: &str,
 | 
				
			||||||
 | 
					        mut data: JsonValue,
 | 
				
			||||||
 | 
					        labels: &mut Vec<(String, String)>,
 | 
				
			||||||
 | 
					    ) -> (String, JsonValue) {
 | 
				
			||||||
 | 
					        if topic.contains("nodeinfo") || !topic.contains("endpoint_") {
 | 
				
			||||||
 | 
					            debug!("zwave topic isn't relevant");
 | 
				
			||||||
 | 
					            return (topic.to_owned(), JsonValue::Null);
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        if !data.is_object() || !data.has_key("value") {
 | 
				
			||||||
 | 
					            debug!("zwave payload doesn't include value");
 | 
				
			||||||
 | 
					            return (topic.to_owned(), JsonValue::Null);
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        debug!("Normalizing zwave2mqtt payload");
 | 
				
			||||||
 | 
					        let parts = topic.split('/').collect_vec();
 | 
				
			||||||
 | 
					        if let Some((properties_index, _)) = parts
 | 
				
			||||||
 | 
					            .iter()
 | 
				
			||||||
 | 
					            .enumerate()
 | 
				
			||||||
 | 
					            .find(|(_, v)| v.starts_with("endpoint_"))
 | 
				
			||||||
 | 
					        {
 | 
				
			||||||
 | 
					            let parts = topic.split('/').collect_vec();
 | 
				
			||||||
 | 
					            let topic = parts[..properties_index]
 | 
				
			||||||
 | 
					                .iter()
 | 
				
			||||||
 | 
					                .map(|s| s.to_lowercase())
 | 
				
			||||||
 | 
					                .join("/");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            if let Ok(endpoint) = parts[properties_index]
 | 
				
			||||||
 | 
					                .split_at("endpoint_".len())
 | 
				
			||||||
 | 
					                .1
 | 
				
			||||||
 | 
					                .parse::<u64>()
 | 
				
			||||||
 | 
					            {
 | 
				
			||||||
 | 
					                labels.push(("endpoint".to_owned(), endpoint.to_string()));
 | 
				
			||||||
 | 
					            } else {
 | 
				
			||||||
 | 
					                labels.push(("endpoint".to_owned(), parts[properties_index].to_owned()))
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            let properties = parts[properties_index + 1..]
 | 
				
			||||||
 | 
					                .iter()
 | 
				
			||||||
 | 
					                .map(|s| s.to_lowercase())
 | 
				
			||||||
 | 
					                .join("_");
 | 
				
			||||||
 | 
					            let mut value = JsonValue::new_object();
 | 
				
			||||||
 | 
					            value.insert(&properties, data.remove("value")).unwrap();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            debug!("Normalized topic: {topic} value: {value}");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            (topic, value)
 | 
				
			||||||
 | 
					        } else {
 | 
				
			||||||
 | 
					            error!(target: "ZwaveNormalize", "Unable to find propreties index topic: {topic:?} value: {data:?}");
 | 
				
			||||||
 | 
					            (topic.to_owned(), JsonValue::Null)
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#[derive(Debug)]
 | 
				
			||||||
 | 
					pub struct EsphomeNormalize {
 | 
				
			||||||
 | 
					    topic_prefixes: Vec<String>,
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					impl EsphomeNormalize {
 | 
				
			||||||
 | 
					    pub fn new(topic_prefixes: &[String]) -> Self {
 | 
				
			||||||
 | 
					        Self {
 | 
				
			||||||
 | 
					            topic_prefixes: topic_prefixes.to_vec(),
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					// Not sure why you'd want this, it de-normalizes the values since it moves them to separate topics per esphome node, but okay...
 | 
				
			||||||
 | 
					impl Transform for EsphomeNormalize {
 | 
				
			||||||
 | 
					    fn applies_to(&self, topic: &str) -> bool {
 | 
				
			||||||
 | 
					        self.topic_prefixes.iter().any(|pfx| topic.starts_with(pfx))
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    fn transform(
 | 
				
			||||||
 | 
					        &self,
 | 
				
			||||||
 | 
					        topic: &str,
 | 
				
			||||||
 | 
					        data: JsonValue,
 | 
				
			||||||
 | 
					        labels: &mut Vec<(String, String)>,
 | 
				
			||||||
 | 
					    ) -> (String, JsonValue) {
 | 
				
			||||||
 | 
					        // <TOPIC_PREFIX>/<COMPONENT_TYPE>/<COMPONENT_NAME>/state
 | 
				
			||||||
 | 
					        let parts = topic.split('/').collect_vec();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        labels.push(("esphome_type".to_owned(), parts[1].to_owned()));
 | 
				
			||||||
 | 
					        labels.push(("esphome_name".to_owned(), parts[2].to_owned()));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        let topic = format!("{}/{}", parts[0].to_lowercase(), parts[1].to_lowercase());
 | 
				
			||||||
 | 
					        let mut value = JsonValue::new_object();
 | 
				
			||||||
 | 
					        value.insert(parts[parts.len() - 2], data).unwrap();
 | 
				
			||||||
 | 
					        (topic, value)
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#[derive(Debug)]
 | 
				
			||||||
 | 
					pub struct HubitatNormalize {
 | 
				
			||||||
 | 
					    topic_prefixes: Vec<String>,
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					impl HubitatNormalize {
 | 
				
			||||||
 | 
					    pub fn new(topic_prefixes: &[String]) -> Self {
 | 
				
			||||||
 | 
					        Self {
 | 
				
			||||||
 | 
					            topic_prefixes: topic_prefixes.to_vec(),
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					impl Transform for HubitatNormalize {
 | 
				
			||||||
 | 
					    fn applies_to(&self, topic: &str) -> bool {
 | 
				
			||||||
 | 
					        self.topic_prefixes.iter().any(|pfx| topic.starts_with(pfx))
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    fn transform(
 | 
				
			||||||
 | 
					        &self,
 | 
				
			||||||
 | 
					        topic: &str,
 | 
				
			||||||
 | 
					        data: JsonValue,
 | 
				
			||||||
 | 
					        labels: &mut Vec<(String, String)>,
 | 
				
			||||||
 | 
					    ) -> (String, JsonValue) {
 | 
				
			||||||
 | 
					        // hubitat/hub1/some room/temperature/value
 | 
				
			||||||
 | 
					        let parts = topic.split('/').collect_vec();
 | 
				
			||||||
 | 
					        if parts.len() < 3 {
 | 
				
			||||||
 | 
					            return (topic.to_owned(), data);
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        let topic = parts[0..3].iter().map(|s| s.to_lowercase()).join("_");
 | 
				
			||||||
 | 
					        labels.push(("hubitat_hub".to_owned(), parts[1].to_owned()));
 | 
				
			||||||
 | 
					        labels.push(("hubitat_room".to_owned(), parts[2].to_owned()));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        let mut value = JsonValue::new_object();
 | 
				
			||||||
 | 
					        value.insert(parts[parts.len() - 2], data).unwrap();
 | 
				
			||||||
 | 
					        (topic, value)
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					const ZIGBEE2MQTT_AVAILABILITY_SUFFIX: &str = "availability";
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#[derive(Debug, Default)]
 | 
				
			||||||
 | 
					pub struct Zigbee2MqttAvailability {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					impl Transform for Zigbee2MqttAvailability {
 | 
				
			||||||
 | 
					    fn applies_to(&self, topic: &str) -> bool {
 | 
				
			||||||
 | 
					        topic.ends_with(ZIGBEE2MQTT_AVAILABILITY_SUFFIX)
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    fn transform(
 | 
				
			||||||
 | 
					        &self,
 | 
				
			||||||
 | 
					        topic: &str,
 | 
				
			||||||
 | 
					        mut data: JsonValue,
 | 
				
			||||||
 | 
					        _labels: &mut Vec<(String, String)>,
 | 
				
			||||||
 | 
					    ) -> (String, JsonValue) {
 | 
				
			||||||
 | 
					        if data.has_key("state") {
 | 
				
			||||||
 | 
					            let topic = topic
 | 
				
			||||||
 | 
					                .split_at(topic.len() - ZIGBEE2MQTT_AVAILABILITY_SUFFIX.len() - 1)
 | 
				
			||||||
 | 
					                .0;
 | 
				
			||||||
 | 
					            let mut value = JsonValue::new_object();
 | 
				
			||||||
 | 
					            value
 | 
				
			||||||
 | 
					                .insert("zigbee_availability", data.remove("state"))
 | 
				
			||||||
 | 
					                .unwrap();
 | 
				
			||||||
 | 
					            (topic.to_owned(), value)
 | 
				
			||||||
 | 
					        } else {
 | 
				
			||||||
 | 
					            (topic.to_owned(), data)
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
		Reference in New Issue
	
	Block a user