Mostly feautre partiy with the python version
Not well tested
This commit is contained in:
parent
f713506656
commit
5c96aa8c92
1
src/lib.rs
Normal file
1
src/lib.rs
Normal file
@ -0,0 +1 @@
|
|||||||
|
pub mod transforms;
|
309
src/main.rs
309
src/main.rs
@ -1,46 +1,68 @@
|
|||||||
use backoff::{backoff::Backoff, ExponentialBackoff};
|
use backoff::backoff::Backoff;
|
||||||
use config::Config;
|
use config::Config;
|
||||||
use core::str;
|
use core::str;
|
||||||
use itertools::Itertools;
|
|
||||||
use json::JsonValue;
|
use json::JsonValue;
|
||||||
use log::{debug, error, info, warn};
|
use log::{debug, error, info};
|
||||||
use metrics::gauge;
|
use metrics::gauge;
|
||||||
use metrics_exporter_prometheus::PrometheusBuilder;
|
use metrics_exporter_prometheus::PrometheusBuilder;
|
||||||
use metrics_util::{
|
use metrics_util::MetricKindMask;
|
||||||
registry::{AtomicStorage, GenerationalAtomicStorage, Registry},
|
use mqtt_exporter::transforms::*;
|
||||||
MetricKindMask,
|
|
||||||
};
|
|
||||||
use regex::Regex;
|
use regex::Regex;
|
||||||
use rumqttc::{
|
use rumqttc::{
|
||||||
tokio_rustls::client,
|
|
||||||
AsyncClient,
|
AsyncClient,
|
||||||
Event::{self, Incoming},
|
Event::{self},
|
||||||
MqttOptions, Packet, Publish, QoS,
|
MqttOptions, Packet, Publish, QoS,
|
||||||
};
|
};
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use std::{collections::HashMap, error::Error, iter::repeat, process::exit, time::Duration};
|
use std::{
|
||||||
use tokio::{join, net::TcpStream};
|
net::{IpAddr, Ipv6Addr},
|
||||||
|
process::exit,
|
||||||
|
time::Duration,
|
||||||
|
};
|
||||||
|
|
||||||
|
use std::time::{SystemTime, UNIX_EPOCH};
|
||||||
|
|
||||||
struct ConfigDefaults;
|
struct ConfigDefaults;
|
||||||
impl ConfigDefaults {
|
impl ConfigDefaults {
|
||||||
fn address() -> String {
|
fn mqtt_address() -> String {
|
||||||
"localhost".to_owned()
|
"localhost".to_owned()
|
||||||
}
|
}
|
||||||
fn port() -> u16 {
|
fn mqtt_port() -> u16 {
|
||||||
1883
|
1883
|
||||||
}
|
}
|
||||||
fn topic() -> Vec<String> {
|
fn mqtt_topic() -> Vec<String> {
|
||||||
vec!["#".to_owned()]
|
vec!["#".to_owned()]
|
||||||
}
|
}
|
||||||
fn keepalive() -> u64 {
|
fn mqtt_keepalive() -> u64 {
|
||||||
60
|
60
|
||||||
}
|
}
|
||||||
fn client_id() -> String {
|
fn mqtt_client_id() -> String {
|
||||||
"mqtt-exporter-rs".to_owned()
|
"mqtt-exporter-rs".to_owned()
|
||||||
}
|
}
|
||||||
fn expose_client_id() -> bool {
|
fn mqtt_expose_client_id() -> bool {
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
|
fn expose_last_seen() -> bool {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
fn prometheus_address() -> IpAddr {
|
||||||
|
IpAddr::V6(Ipv6Addr::from_bits(0))
|
||||||
|
}
|
||||||
|
fn prometheus_port() -> u16 {
|
||||||
|
9000
|
||||||
|
}
|
||||||
|
fn prometheus_prefix() -> String {
|
||||||
|
"mqtt".to_owned()
|
||||||
|
}
|
||||||
|
fn topic_label() -> String {
|
||||||
|
"topic".to_owned()
|
||||||
|
}
|
||||||
|
fn hubitat_topic_prefixes() -> Vec<String> {
|
||||||
|
vec!["hubitat".to_owned()]
|
||||||
|
}
|
||||||
|
fn zigbee2mqtt_availability() -> bool {
|
||||||
|
false
|
||||||
|
}
|
||||||
fn zwave_topic_prefix() -> String {
|
fn zwave_topic_prefix() -> String {
|
||||||
"zwave/".to_owned()
|
"zwave/".to_owned()
|
||||||
}
|
}
|
||||||
@ -48,52 +70,69 @@ impl ConfigDefaults {
|
|||||||
|
|
||||||
#[derive(Debug, Deserialize)]
|
#[derive(Debug, Deserialize)]
|
||||||
struct AppConfig {
|
struct AppConfig {
|
||||||
#[serde(default = "ConfigDefaults::address")]
|
#[serde(default = "ConfigDefaults::mqtt_address")]
|
||||||
address: String,
|
mqtt_address: String,
|
||||||
#[serde(default = "ConfigDefaults::port")]
|
#[serde(default = "ConfigDefaults::mqtt_port")]
|
||||||
port: u16,
|
mqtt_port: u16,
|
||||||
#[serde(default = "ConfigDefaults::topic")]
|
#[serde(default = "ConfigDefaults::mqtt_topic")]
|
||||||
topic: Vec<String>,
|
mqtt_topic: Vec<String>,
|
||||||
#[serde(default = "ConfigDefaults::keepalive")]
|
#[serde(default = "ConfigDefaults::mqtt_keepalive")]
|
||||||
keepalive: u64,
|
mqtt_keepalive: u64,
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
username: Option<String>,
|
mqtt_username: Option<String>,
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
password: Option<String>,
|
mqtt_password: Option<String>,
|
||||||
#[serde(default = "ConfigDefaults::client_id")]
|
#[serde(default = "ConfigDefaults::mqtt_client_id")]
|
||||||
client_id: String,
|
mqtt_client_id: String,
|
||||||
#[serde(default = "ConfigDefaults::expose_client_id")]
|
#[serde(default = "ConfigDefaults::mqtt_expose_client_id")]
|
||||||
expose_client_id: bool,
|
mqtt_expose_client_id: bool,
|
||||||
|
#[serde(default = "ConfigDefaults::expose_last_seen")]
|
||||||
|
expose_last_seen: bool,
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
ignored_topics: Vec<String>,
|
mqtt_ignored_topics: Vec<String>,
|
||||||
#[serde(with = "serde_regex", default)]
|
#[serde(with = "serde_regex", default)]
|
||||||
ignored_topics_re: Option<Regex>,
|
mqtt_ignored_topics_re: Option<Regex>,
|
||||||
|
#[serde(default = "ConfigDefaults::prometheus_address")]
|
||||||
|
prometheus_address: IpAddr,
|
||||||
|
#[serde(default = "ConfigDefaults::prometheus_port")]
|
||||||
|
prometheus_port: u16,
|
||||||
|
#[serde(default = "ConfigDefaults::prometheus_prefix")]
|
||||||
|
prometheus_prefix: String,
|
||||||
|
#[serde(default = "ConfigDefaults::topic_label")]
|
||||||
|
topic_label: String,
|
||||||
|
#[serde(default)]
|
||||||
|
esphome_topic_prefixes: Vec<String>,
|
||||||
|
#[serde(default = "ConfigDefaults::hubitat_topic_prefixes")]
|
||||||
|
hubitat_topic_prefixes: Vec<String>,
|
||||||
|
#[serde(default = "ConfigDefaults::zigbee2mqtt_availability")]
|
||||||
|
zigbee2mqtt_availability: bool,
|
||||||
#[serde(default = "ConfigDefaults::zwave_topic_prefix")]
|
#[serde(default = "ConfigDefaults::zwave_topic_prefix")]
|
||||||
zwave_topic_prefix: String,
|
zwave_topic_prefix: String,
|
||||||
}
|
}
|
||||||
struct App {
|
struct App {
|
||||||
config: AppConfig,
|
config: AppConfig,
|
||||||
|
transformers: Vec<Box<dyn Transform>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn parse_number(v: &str) -> Option<f64> {
|
||||||
|
if let Ok(num) = v.parse::<f64>() {
|
||||||
|
Some(num)
|
||||||
|
} else if v.eq_ignore_ascii_case("on")
|
||||||
|
|| v.eq_ignore_ascii_case("true")
|
||||||
|
|| v.eq_ignore_ascii_case("online")
|
||||||
|
{
|
||||||
|
Some(1.0)
|
||||||
|
} else if v.eq_ignore_ascii_case("off")
|
||||||
|
|| v.eq_ignore_ascii_case("false")
|
||||||
|
|| v.eq_ignore_ascii_case("offline")
|
||||||
|
{
|
||||||
|
Some(0.0)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl App {
|
impl App {
|
||||||
fn parse_number(v: &str) -> Option<f64> {
|
|
||||||
if let Ok(num) = v.parse::<f64>() {
|
|
||||||
Some(num)
|
|
||||||
} else if v.eq_ignore_ascii_case("on")
|
|
||||||
|| v.eq_ignore_ascii_case("true")
|
|
||||||
|| v.eq_ignore_ascii_case("online")
|
|
||||||
{
|
|
||||||
Some(1.0)
|
|
||||||
} else if v.eq_ignore_ascii_case("off")
|
|
||||||
|| v.eq_ignore_ascii_case("false")
|
|
||||||
|| v.eq_ignore_ascii_case("offline")
|
|
||||||
{
|
|
||||||
Some(0.0)
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn parse_metric(value: &JsonValue) -> Option<f64> {
|
fn parse_metric(value: &JsonValue) -> Option<f64> {
|
||||||
match value {
|
match value {
|
||||||
JsonValue::Null => None,
|
JsonValue::Null => None,
|
||||||
@ -105,8 +144,8 @@ impl App {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
JsonValue::Number(v) => Some((*v).into()),
|
JsonValue::Number(v) => Some((*v).into()),
|
||||||
JsonValue::Short(v) => Self::parse_number(v.as_str()),
|
JsonValue::Short(v) => parse_number(v.as_str()),
|
||||||
JsonValue::String(v) => Self::parse_number(v.as_str()),
|
JsonValue::String(v) => parse_number(v.as_str()),
|
||||||
_ => None,
|
_ => None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -115,45 +154,39 @@ impl App {
|
|||||||
&self,
|
&self,
|
||||||
mut data: JsonValue,
|
mut data: JsonValue,
|
||||||
topic: &str,
|
topic: &str,
|
||||||
original_topic: &str,
|
_original_topic: &str,
|
||||||
prefix: &str,
|
metric_path: Vec<String>,
|
||||||
labels: &Option<Vec<(String, String)>>,
|
labels: &Vec<(String, String)>,
|
||||||
) {
|
) {
|
||||||
match data {
|
match data {
|
||||||
JsonValue::Array(_) => {
|
JsonValue::Array(_) => {
|
||||||
for (metric, child) in data.members_mut().enumerate() {
|
for (metric, child) in data.members_mut().enumerate() {
|
||||||
let metric = metric.to_string();
|
let mut metric_path = metric_path.clone();
|
||||||
self.parse_metrics(
|
metric_path.push(metric.to_string());
|
||||||
child.take(),
|
self.parse_metrics(child.take(), topic, _original_topic, metric_path, labels)
|
||||||
topic,
|
|
||||||
original_topic,
|
|
||||||
&format!("{prefix}{metric}_"),
|
|
||||||
labels,
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
JsonValue::Object(_) => {
|
JsonValue::Object(_) => {
|
||||||
for (metric, child) in data.entries_mut() {
|
for (metric, child) in data.entries_mut() {
|
||||||
self.parse_metrics(
|
let mut metric_path = metric_path.clone();
|
||||||
child.take(),
|
metric_path.push(metric.to_string());
|
||||||
topic,
|
self.parse_metrics(child.take(), topic, _original_topic, metric_path, labels)
|
||||||
original_topic,
|
|
||||||
&format!("{prefix}_{metric}"),
|
|
||||||
labels,
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_ => {
|
_ => {
|
||||||
if let Some(v) = Self::parse_metric(&data) {
|
if let Some(v) = Self::parse_metric(&data) {
|
||||||
debug!("prometheus metric: {prefix} value: {v}");
|
let metric_s = metric_path.join("_");
|
||||||
let mut labels = if let Some(labels) = labels {
|
let mut labels = labels.clone();
|
||||||
labels.clone()
|
labels.push((self.config.topic_label.to_owned(), topic.to_owned()));
|
||||||
} else {
|
debug!("metric: {metric_s} value: {v} labels: {labels:?}");
|
||||||
Vec::new()
|
|
||||||
};
|
if self.config.expose_last_seen {
|
||||||
labels.push(("topic".to_owned(), topic.to_owned()));
|
let metric_ts = format!("{}_ts", metric_s);
|
||||||
let owned_pfx = prefix.to_owned();
|
let ts_gauge = gauge!(metric_ts, &labels);
|
||||||
let gauge = gauge!(owned_pfx, &labels);
|
let ts = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
|
||||||
|
ts_gauge.set(ts)
|
||||||
|
}
|
||||||
|
let gauge = gauge!(metric_s, &labels);
|
||||||
gauge.set(v);
|
gauge.set(v);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -163,12 +196,12 @@ impl App {
|
|||||||
fn topic_filter(&self, topic: &str) -> bool {
|
fn topic_filter(&self, topic: &str) -> bool {
|
||||||
!self
|
!self
|
||||||
.config
|
.config
|
||||||
.ignored_topics
|
.mqtt_ignored_topics
|
||||||
.iter()
|
.iter()
|
||||||
.any(|pfx| topic.starts_with(pfx))
|
.any(|pfx| topic.starts_with(pfx))
|
||||||
&& !self
|
&& !self
|
||||||
.config
|
.config
|
||||||
.ignored_topics_re
|
.mqtt_ignored_topics_re
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.is_some_and(|re| re.is_match(topic))
|
.is_some_and(|re| re.is_match(topic))
|
||||||
}
|
}
|
||||||
@ -193,66 +226,47 @@ impl App {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn update_metrics(&self, raw_topic: &str, mut data: JsonValue) {
|
fn update_metrics(&self, raw_topic: &str, mut data: JsonValue) {
|
||||||
|
let mut labels = Vec::new();
|
||||||
|
|
||||||
if !&self.topic_filter(raw_topic) {
|
if !&self.topic_filter(raw_topic) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
let mut topic = raw_topic.replace('/', "_");
|
debug!("topic {} payload {}", raw_topic, data.pretty(2));
|
||||||
|
let mut topic = raw_topic.to_owned();
|
||||||
|
|
||||||
if topic.starts_with(&self.config.zwave_topic_prefix) {
|
(topic, data) = self
|
||||||
(topic, data) = Self::normalize_zwave2mqtt(&topic, data);
|
.transformers
|
||||||
}
|
|
||||||
|
|
||||||
self.parse_metrics(data, &topic, &topic, "mqtt", &None);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn normalize_zwave2mqtt(topic: &str, mut data: JsonValue) -> (String, JsonValue) {
|
|
||||||
if topic.contains("node_info") || !topic.contains("endpoint_") {
|
|
||||||
return (topic.to_owned(), JsonValue::Null);
|
|
||||||
}
|
|
||||||
if !data.is_object() || !data.contains("value") {
|
|
||||||
return (topic.to_owned(), JsonValue::Null);
|
|
||||||
}
|
|
||||||
let parts = topic.split('/').collect_vec();
|
|
||||||
if let Some((properties_index, _)) = parts
|
|
||||||
.iter()
|
.iter()
|
||||||
.enumerate()
|
.filter(|txfm| txfm.applies_to(raw_topic))
|
||||||
.find(|(_, v)| v.starts_with("endpoint_"))
|
.fold((topic, data), |(topic, data), txfm| {
|
||||||
{
|
txfm.transform(&topic, data, &mut labels)
|
||||||
let parts = topic.split('/').collect_vec();
|
});
|
||||||
let topic = parts[..properties_index]
|
|
||||||
.iter()
|
|
||||||
.map(|s| s.to_lowercase())
|
|
||||||
.join("/");
|
|
||||||
let properties = parts[properties_index..]
|
|
||||||
.iter()
|
|
||||||
.map(|s| s.to_lowercase())
|
|
||||||
.join("_");
|
|
||||||
let mut value = JsonValue::new_object();
|
|
||||||
value.insert(&properties, data.remove("value")).unwrap();
|
|
||||||
|
|
||||||
return (topic, value);
|
// Should be a transform itself?
|
||||||
} else {
|
// let topic = topic.replace('/', "_");
|
||||||
error!("Unable to find propreties index topic: {topic:?} value: {data:?}");
|
|
||||||
return (topic.to_owned(), JsonValue::Null);
|
let metric_path = vec![self.config.prometheus_prefix.clone()];
|
||||||
}
|
self.parse_metrics(data, &topic, &topic, metric_path, &labels);
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn connection_task(self) {
|
async fn mqtt_task(self) {
|
||||||
let mut opts = MqttOptions::new(
|
let mut opts = MqttOptions::new(
|
||||||
&self.config.client_id,
|
&self.config.mqtt_client_id,
|
||||||
&self.config.address,
|
&self.config.mqtt_address,
|
||||||
self.config.port,
|
self.config.mqtt_port,
|
||||||
);
|
);
|
||||||
opts.set_keep_alive(Duration::from_secs(self.config.keepalive))
|
opts.set_keep_alive(Duration::from_secs(self.config.mqtt_keepalive))
|
||||||
.set_max_packet_size(150000, 150000);
|
.set_max_packet_size(150000, 150000);
|
||||||
|
|
||||||
if let (Some(username), Some(password)) = (&self.config.username, &self.config.password) {
|
if let (Some(username), Some(password)) =
|
||||||
|
(&self.config.mqtt_username, &self.config.mqtt_password)
|
||||||
|
{
|
||||||
opts.set_credentials(username, password);
|
opts.set_credentials(username, password);
|
||||||
}
|
}
|
||||||
|
|
||||||
let (client, mut eventloop) = AsyncClient::new(opts, 10);
|
let (client, mut eventloop) = AsyncClient::new(opts, 10);
|
||||||
|
|
||||||
for topic in &self.config.topic {
|
for topic in &self.config.mqtt_topic {
|
||||||
client.subscribe(topic, QoS::AtLeastOnce).await.unwrap();
|
client.subscribe(topic, QoS::AtLeastOnce).await.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -283,14 +297,40 @@ impl App {
|
|||||||
}
|
}
|
||||||
async fn prom_task(&self) {
|
async fn prom_task(&self) {
|
||||||
let builder = PrometheusBuilder::new()
|
let builder = PrometheusBuilder::new()
|
||||||
|
.with_http_listener((self.config.prometheus_address, self.config.prometheus_port))
|
||||||
.idle_timeout(MetricKindMask::ALL, Some(Duration::from_secs(300)));
|
.idle_timeout(MetricKindMask::ALL, Some(Duration::from_secs(300)));
|
||||||
let builder = if self.config.expose_client_id {
|
let builder = if self.config.mqtt_expose_client_id {
|
||||||
builder.add_global_label("client_id", &self.config.client_id)
|
builder.add_global_label("client_id", &self.config.mqtt_client_id)
|
||||||
} else {
|
} else {
|
||||||
builder
|
builder
|
||||||
};
|
};
|
||||||
builder.install().unwrap();
|
builder.install().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn new(config: AppConfig) -> Self {
|
||||||
|
let mut transformers: Vec<Box<dyn Transform>> = Vec::new();
|
||||||
|
if !config.zwave_topic_prefix.is_empty() {
|
||||||
|
transformers.push(Box::new(ZwaveNormalize::new(&config.zwave_topic_prefix)))
|
||||||
|
}
|
||||||
|
if !config.esphome_topic_prefixes.is_empty() {
|
||||||
|
transformers.push(Box::new(EsphomeNormalize::new(
|
||||||
|
&config.esphome_topic_prefixes,
|
||||||
|
)))
|
||||||
|
}
|
||||||
|
if !config.hubitat_topic_prefixes.is_empty() {
|
||||||
|
transformers.push(Box::new(HubitatNormalize::new(
|
||||||
|
&config.hubitat_topic_prefixes,
|
||||||
|
)))
|
||||||
|
}
|
||||||
|
if config.zigbee2mqtt_availability {
|
||||||
|
transformers.push(Box::new(Zigbee2MqttAvailability::default()))
|
||||||
|
}
|
||||||
|
debug!("Transformers enabled: {:?}", transformers);
|
||||||
|
Self {
|
||||||
|
config,
|
||||||
|
transformers,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
@ -299,19 +339,22 @@ async fn main() {
|
|||||||
|
|
||||||
let config = Config::builder()
|
let config = Config::builder()
|
||||||
.add_source(
|
.add_source(
|
||||||
config::Environment::with_prefix("MQTT")
|
config::Environment::with_prefix("")
|
||||||
|
.prefix_separator("")
|
||||||
.try_parsing(true)
|
.try_parsing(true)
|
||||||
|
.separator("__")
|
||||||
.list_separator(",")
|
.list_separator(",")
|
||||||
.with_list_parse_key("topic")
|
.with_list_parse_key("mqtt_topic")
|
||||||
.with_list_parse_key("ignored_topics"),
|
.with_list_parse_key("mqtt_ignored_topics")
|
||||||
|
.with_list_parse_key("esphome_topic_prefixes")
|
||||||
|
.with_list_parse_key("hubitat_topic_prefixes"),
|
||||||
)
|
)
|
||||||
.build()
|
.build()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let app_config = config.try_deserialize().unwrap();
|
let app_config = config.try_deserialize().unwrap();
|
||||||
info!("Loaded config: {app_config:?}");
|
info!("Loaded config: {app_config:?}");
|
||||||
|
|
||||||
let app = App { config: app_config };
|
let app = App::new(app_config);
|
||||||
app.prom_task().await;
|
app.prom_task().await;
|
||||||
let task = tokio::spawn(app.connection_task());
|
app.mqtt_task().await;
|
||||||
task.await.unwrap();
|
|
||||||
}
|
}
|
||||||
|
189
src/transforms.rs
Normal file
189
src/transforms.rs
Normal file
@ -0,0 +1,189 @@
|
|||||||
|
use itertools::Itertools;
|
||||||
|
use json::JsonValue;
|
||||||
|
use log::{debug, error};
|
||||||
|
use std::fmt::Debug;
|
||||||
|
|
||||||
|
pub trait Transform: Debug {
|
||||||
|
fn applies_to(&self, topic: &str) -> bool;
|
||||||
|
fn transform(
|
||||||
|
&self,
|
||||||
|
topic: &str,
|
||||||
|
data: JsonValue,
|
||||||
|
labels: &mut Vec<(String, String)>,
|
||||||
|
) -> (String, JsonValue);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct ZwaveNormalize {
|
||||||
|
topic_prefix: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ZwaveNormalize {
|
||||||
|
pub fn new(topic_prefix: &str) -> Self {
|
||||||
|
Self {
|
||||||
|
topic_prefix: topic_prefix.to_owned(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Transform for ZwaveNormalize {
|
||||||
|
fn applies_to(&self, topic: &str) -> bool {
|
||||||
|
topic.starts_with(&self.topic_prefix)
|
||||||
|
}
|
||||||
|
fn transform(
|
||||||
|
&self,
|
||||||
|
topic: &str,
|
||||||
|
mut data: JsonValue,
|
||||||
|
labels: &mut Vec<(String, String)>,
|
||||||
|
) -> (String, JsonValue) {
|
||||||
|
if topic.contains("nodeinfo") || !topic.contains("endpoint_") {
|
||||||
|
debug!("zwave topic isn't relevant");
|
||||||
|
return (topic.to_owned(), JsonValue::Null);
|
||||||
|
}
|
||||||
|
if !data.is_object() || !data.has_key("value") {
|
||||||
|
debug!("zwave payload doesn't include value");
|
||||||
|
return (topic.to_owned(), JsonValue::Null);
|
||||||
|
}
|
||||||
|
debug!("Normalizing zwave2mqtt payload");
|
||||||
|
let parts = topic.split('/').collect_vec();
|
||||||
|
if let Some((properties_index, _)) = parts
|
||||||
|
.iter()
|
||||||
|
.enumerate()
|
||||||
|
.find(|(_, v)| v.starts_with("endpoint_"))
|
||||||
|
{
|
||||||
|
let parts = topic.split('/').collect_vec();
|
||||||
|
let topic = parts[..properties_index]
|
||||||
|
.iter()
|
||||||
|
.map(|s| s.to_lowercase())
|
||||||
|
.join("/");
|
||||||
|
|
||||||
|
if let Ok(endpoint) = parts[properties_index]
|
||||||
|
.split_at("endpoint_".len())
|
||||||
|
.1
|
||||||
|
.parse::<u64>()
|
||||||
|
{
|
||||||
|
labels.push(("endpoint".to_owned(), endpoint.to_string()));
|
||||||
|
} else {
|
||||||
|
labels.push(("endpoint".to_owned(), parts[properties_index].to_owned()))
|
||||||
|
}
|
||||||
|
|
||||||
|
let properties = parts[properties_index + 1..]
|
||||||
|
.iter()
|
||||||
|
.map(|s| s.to_lowercase())
|
||||||
|
.join("_");
|
||||||
|
let mut value = JsonValue::new_object();
|
||||||
|
value.insert(&properties, data.remove("value")).unwrap();
|
||||||
|
|
||||||
|
debug!("Normalized topic: {topic} value: {value}");
|
||||||
|
|
||||||
|
(topic, value)
|
||||||
|
} else {
|
||||||
|
error!(target: "ZwaveNormalize", "Unable to find propreties index topic: {topic:?} value: {data:?}");
|
||||||
|
(topic.to_owned(), JsonValue::Null)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct EsphomeNormalize {
|
||||||
|
topic_prefixes: Vec<String>,
|
||||||
|
}
|
||||||
|
impl EsphomeNormalize {
|
||||||
|
pub fn new(topic_prefixes: &[String]) -> Self {
|
||||||
|
Self {
|
||||||
|
topic_prefixes: topic_prefixes.to_vec(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Not sure why you'd want this, it de-normalizes the values since it moves them to separate topics per esphome node, but okay...
|
||||||
|
impl Transform for EsphomeNormalize {
|
||||||
|
fn applies_to(&self, topic: &str) -> bool {
|
||||||
|
self.topic_prefixes.iter().any(|pfx| topic.starts_with(pfx))
|
||||||
|
}
|
||||||
|
fn transform(
|
||||||
|
&self,
|
||||||
|
topic: &str,
|
||||||
|
data: JsonValue,
|
||||||
|
labels: &mut Vec<(String, String)>,
|
||||||
|
) -> (String, JsonValue) {
|
||||||
|
// <TOPIC_PREFIX>/<COMPONENT_TYPE>/<COMPONENT_NAME>/state
|
||||||
|
let parts = topic.split('/').collect_vec();
|
||||||
|
|
||||||
|
labels.push(("esphome_type".to_owned(), parts[1].to_owned()));
|
||||||
|
labels.push(("esphome_name".to_owned(), parts[2].to_owned()));
|
||||||
|
|
||||||
|
let topic = format!("{}/{}", parts[0].to_lowercase(), parts[1].to_lowercase());
|
||||||
|
let mut value = JsonValue::new_object();
|
||||||
|
value.insert(parts[parts.len() - 2], data).unwrap();
|
||||||
|
(topic, value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct HubitatNormalize {
|
||||||
|
topic_prefixes: Vec<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl HubitatNormalize {
|
||||||
|
pub fn new(topic_prefixes: &[String]) -> Self {
|
||||||
|
Self {
|
||||||
|
topic_prefixes: topic_prefixes.to_vec(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Transform for HubitatNormalize {
|
||||||
|
fn applies_to(&self, topic: &str) -> bool {
|
||||||
|
self.topic_prefixes.iter().any(|pfx| topic.starts_with(pfx))
|
||||||
|
}
|
||||||
|
fn transform(
|
||||||
|
&self,
|
||||||
|
topic: &str,
|
||||||
|
data: JsonValue,
|
||||||
|
labels: &mut Vec<(String, String)>,
|
||||||
|
) -> (String, JsonValue) {
|
||||||
|
// hubitat/hub1/some room/temperature/value
|
||||||
|
let parts = topic.split('/').collect_vec();
|
||||||
|
if parts.len() < 3 {
|
||||||
|
return (topic.to_owned(), data);
|
||||||
|
}
|
||||||
|
|
||||||
|
let topic = parts[0..3].iter().map(|s| s.to_lowercase()).join("_");
|
||||||
|
labels.push(("hubitat_hub".to_owned(), parts[1].to_owned()));
|
||||||
|
labels.push(("hubitat_room".to_owned(), parts[2].to_owned()));
|
||||||
|
|
||||||
|
let mut value = JsonValue::new_object();
|
||||||
|
value.insert(parts[parts.len() - 2], data).unwrap();
|
||||||
|
(topic, value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const ZIGBEE2MQTT_AVAILABILITY_SUFFIX: &str = "availability";
|
||||||
|
|
||||||
|
#[derive(Debug, Default)]
|
||||||
|
pub struct Zigbee2MqttAvailability {}
|
||||||
|
|
||||||
|
impl Transform for Zigbee2MqttAvailability {
|
||||||
|
fn applies_to(&self, topic: &str) -> bool {
|
||||||
|
topic.ends_with(ZIGBEE2MQTT_AVAILABILITY_SUFFIX)
|
||||||
|
}
|
||||||
|
fn transform(
|
||||||
|
&self,
|
||||||
|
topic: &str,
|
||||||
|
mut data: JsonValue,
|
||||||
|
_labels: &mut Vec<(String, String)>,
|
||||||
|
) -> (String, JsonValue) {
|
||||||
|
if data.has_key("state") {
|
||||||
|
let topic = topic
|
||||||
|
.split_at(topic.len() - ZIGBEE2MQTT_AVAILABILITY_SUFFIX.len() - 1)
|
||||||
|
.0;
|
||||||
|
let mut value = JsonValue::new_object();
|
||||||
|
value
|
||||||
|
.insert("zigbee_availability", data.remove("state"))
|
||||||
|
.unwrap();
|
||||||
|
(topic.to_owned(), value)
|
||||||
|
} else {
|
||||||
|
(topic.to_owned(), data)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user