first commit
This commit is contained in:
commit
adf13560a8
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
@ -0,0 +1 @@
|
|||||||
|
/target
|
2096
Cargo.lock
generated
Normal file
2096
Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load Diff
22
Cargo.toml
Normal file
22
Cargo.toml
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
[package]
|
||||||
|
name = "mqtt-exporter"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
backoff = { version = "0.4.0", features = ["tokio"] }
|
||||||
|
bytes = "1.10.1"
|
||||||
|
config = "0.15.9"
|
||||||
|
env_logger = "0.11.6"
|
||||||
|
itertools = "0.14.0"
|
||||||
|
json = "0.12.4"
|
||||||
|
log = "0.4.26"
|
||||||
|
metrics = "0.24.1"
|
||||||
|
metrics-exporter-prometheus = "0.16.2"
|
||||||
|
metrics-util = "0.19.0"
|
||||||
|
regex = "1.11.1"
|
||||||
|
rumqttc = "0.24.0"
|
||||||
|
serde = { version = "1.0.218", features = ["derive"] }
|
||||||
|
serde_regex = "1.1.0"
|
||||||
|
tokio = { version = "1.44.0", features = ["net", "rt-multi-thread"] }
|
||||||
|
tokio-util = { version = "0.7.13", features = ["codec"] }
|
317
src/main.rs
Normal file
317
src/main.rs
Normal file
@ -0,0 +1,317 @@
|
|||||||
|
use backoff::{backoff::Backoff, ExponentialBackoff};
|
||||||
|
use config::Config;
|
||||||
|
use core::str;
|
||||||
|
use itertools::Itertools;
|
||||||
|
use json::JsonValue;
|
||||||
|
use log::{debug, error, info, warn};
|
||||||
|
use metrics::gauge;
|
||||||
|
use metrics_exporter_prometheus::PrometheusBuilder;
|
||||||
|
use metrics_util::{
|
||||||
|
registry::{AtomicStorage, GenerationalAtomicStorage, Registry},
|
||||||
|
MetricKindMask,
|
||||||
|
};
|
||||||
|
use regex::Regex;
|
||||||
|
use rumqttc::{
|
||||||
|
tokio_rustls::client,
|
||||||
|
AsyncClient,
|
||||||
|
Event::{self, Incoming},
|
||||||
|
MqttOptions, Packet, Publish, QoS,
|
||||||
|
};
|
||||||
|
use serde::Deserialize;
|
||||||
|
use std::{collections::HashMap, error::Error, iter::repeat, process::exit, time::Duration};
|
||||||
|
use tokio::{join, net::TcpStream};
|
||||||
|
|
||||||
|
struct ConfigDefaults;
|
||||||
|
impl ConfigDefaults {
|
||||||
|
fn address() -> String {
|
||||||
|
"localhost".to_owned()
|
||||||
|
}
|
||||||
|
fn port() -> u16 {
|
||||||
|
1883
|
||||||
|
}
|
||||||
|
fn topic() -> Vec<String> {
|
||||||
|
vec!["#".to_owned()]
|
||||||
|
}
|
||||||
|
fn keepalive() -> u64 {
|
||||||
|
60
|
||||||
|
}
|
||||||
|
fn client_id() -> String {
|
||||||
|
"mqtt-exporter-rs".to_owned()
|
||||||
|
}
|
||||||
|
fn expose_client_id() -> bool {
|
||||||
|
true
|
||||||
|
}
|
||||||
|
fn zwave_topic_prefix() -> String {
|
||||||
|
"zwave/".to_owned()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize)]
|
||||||
|
struct AppConfig {
|
||||||
|
#[serde(default = "ConfigDefaults::address")]
|
||||||
|
address: String,
|
||||||
|
#[serde(default = "ConfigDefaults::port")]
|
||||||
|
port: u16,
|
||||||
|
#[serde(default = "ConfigDefaults::topic")]
|
||||||
|
topic: Vec<String>,
|
||||||
|
#[serde(default = "ConfigDefaults::keepalive")]
|
||||||
|
keepalive: u64,
|
||||||
|
#[serde(default)]
|
||||||
|
username: Option<String>,
|
||||||
|
#[serde(default)]
|
||||||
|
password: Option<String>,
|
||||||
|
#[serde(default = "ConfigDefaults::client_id")]
|
||||||
|
client_id: String,
|
||||||
|
#[serde(default = "ConfigDefaults::expose_client_id")]
|
||||||
|
expose_client_id: bool,
|
||||||
|
#[serde(default)]
|
||||||
|
ignored_topics: Vec<String>,
|
||||||
|
#[serde(with = "serde_regex", default)]
|
||||||
|
ignored_topics_re: Option<Regex>,
|
||||||
|
#[serde(default = "ConfigDefaults::zwave_topic_prefix")]
|
||||||
|
zwave_topic_prefix: String,
|
||||||
|
}
|
||||||
|
struct App {
|
||||||
|
config: AppConfig,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl App {
|
||||||
|
fn parse_number(v: &str) -> Option<f64> {
|
||||||
|
if let Ok(num) = v.parse::<f64>() {
|
||||||
|
Some(num)
|
||||||
|
} else if v.eq_ignore_ascii_case("on")
|
||||||
|
|| v.eq_ignore_ascii_case("true")
|
||||||
|
|| v.eq_ignore_ascii_case("online")
|
||||||
|
{
|
||||||
|
Some(1.0)
|
||||||
|
} else if v.eq_ignore_ascii_case("off")
|
||||||
|
|| v.eq_ignore_ascii_case("false")
|
||||||
|
|| v.eq_ignore_ascii_case("offline")
|
||||||
|
{
|
||||||
|
Some(0.0)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn parse_metric(value: &JsonValue) -> Option<f64> {
|
||||||
|
match value {
|
||||||
|
JsonValue::Null => None,
|
||||||
|
JsonValue::Boolean(v) => {
|
||||||
|
if *v {
|
||||||
|
Some(1.0)
|
||||||
|
} else {
|
||||||
|
Some(0.0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
JsonValue::Number(v) => Some((*v).into()),
|
||||||
|
JsonValue::Short(v) => Self::parse_number(v.as_str()),
|
||||||
|
JsonValue::String(v) => Self::parse_number(v.as_str()),
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn parse_metrics(
|
||||||
|
&self,
|
||||||
|
mut data: JsonValue,
|
||||||
|
topic: &str,
|
||||||
|
original_topic: &str,
|
||||||
|
prefix: &str,
|
||||||
|
labels: &Option<Vec<(String, String)>>,
|
||||||
|
) {
|
||||||
|
match data {
|
||||||
|
JsonValue::Array(_) => {
|
||||||
|
for (metric, child) in data.members_mut().enumerate() {
|
||||||
|
let metric = metric.to_string();
|
||||||
|
self.parse_metrics(
|
||||||
|
child.take(),
|
||||||
|
topic,
|
||||||
|
original_topic,
|
||||||
|
&format!("{prefix}{metric}_"),
|
||||||
|
labels,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
JsonValue::Object(_) => {
|
||||||
|
for (metric, child) in data.entries_mut() {
|
||||||
|
self.parse_metrics(
|
||||||
|
child.take(),
|
||||||
|
topic,
|
||||||
|
original_topic,
|
||||||
|
&format!("{prefix}_{metric}"),
|
||||||
|
labels,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
if let Some(v) = Self::parse_metric(&data) {
|
||||||
|
debug!("prometheus metric: {prefix} value: {v}");
|
||||||
|
let mut labels = if let Some(labels) = labels {
|
||||||
|
labels.clone()
|
||||||
|
} else {
|
||||||
|
Vec::new()
|
||||||
|
};
|
||||||
|
labels.push(("topic".to_owned(), topic.to_owned()));
|
||||||
|
let owned_pfx = prefix.to_owned();
|
||||||
|
let gauge = gauge!(owned_pfx, &labels);
|
||||||
|
gauge.set(v);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn topic_filter(&self, topic: &str) -> bool {
|
||||||
|
!self
|
||||||
|
.config
|
||||||
|
.ignored_topics
|
||||||
|
.iter()
|
||||||
|
.any(|pfx| topic.starts_with(pfx))
|
||||||
|
&& !self
|
||||||
|
.config
|
||||||
|
.ignored_topics_re
|
||||||
|
.as_ref()
|
||||||
|
.is_some_and(|re| re.is_match(topic))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn deserialize_payload(payload: &[u8]) -> Result<JsonValue, Box<dyn std::error::Error>> {
|
||||||
|
Ok(json::parse(str::from_utf8(payload)?)?)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_event(&self, event: &Event) {
|
||||||
|
match event {
|
||||||
|
Event::Incoming(Packet::Publish(notification)) => self.handle_publish(notification),
|
||||||
|
Event::Outgoing(_) => {}
|
||||||
|
e => debug!("Unhandled event: {:?}", e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_publish(&self, event: &Publish) {
|
||||||
|
match Self::deserialize_payload(&event.payload) {
|
||||||
|
Ok(parsed) => self.update_metrics(&event.topic, parsed),
|
||||||
|
Err(e) => error!("Error deserializing JSON payload: {:?}", e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn update_metrics(&self, raw_topic: &str, mut data: JsonValue) {
|
||||||
|
if !&self.topic_filter(raw_topic) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
let mut topic = raw_topic.replace('/', "_");
|
||||||
|
|
||||||
|
if topic.starts_with(&self.config.zwave_topic_prefix) {
|
||||||
|
(topic, data) = Self::normalize_zwave2mqtt(&topic, data);
|
||||||
|
}
|
||||||
|
|
||||||
|
self.parse_metrics(data, &topic, &topic, "mqtt", &None);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn normalize_zwave2mqtt(topic: &str, mut data: JsonValue) -> (String, JsonValue) {
|
||||||
|
if topic.contains("node_info") || !topic.contains("endpoint_") {
|
||||||
|
return (topic.to_owned(), JsonValue::Null);
|
||||||
|
}
|
||||||
|
if !data.is_object() || !data.contains("value") {
|
||||||
|
return (topic.to_owned(), JsonValue::Null);
|
||||||
|
}
|
||||||
|
let parts = topic.split('/').collect_vec();
|
||||||
|
if let Some((properties_index, _)) = parts
|
||||||
|
.iter()
|
||||||
|
.enumerate()
|
||||||
|
.find(|(_, v)| v.starts_with("endpoint_"))
|
||||||
|
{
|
||||||
|
let parts = topic.split('/').collect_vec();
|
||||||
|
let topic = parts[..properties_index]
|
||||||
|
.iter()
|
||||||
|
.map(|s| s.to_lowercase())
|
||||||
|
.join("/");
|
||||||
|
let properties = parts[properties_index..]
|
||||||
|
.iter()
|
||||||
|
.map(|s| s.to_lowercase())
|
||||||
|
.join("_");
|
||||||
|
let mut value = JsonValue::new_object();
|
||||||
|
value.insert(&properties, data.remove("value")).unwrap();
|
||||||
|
|
||||||
|
return (topic, value);
|
||||||
|
} else {
|
||||||
|
error!("Unable to find propreties index topic: {topic:?} value: {data:?}");
|
||||||
|
return (topic.to_owned(), JsonValue::Null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn connection_task(self) {
|
||||||
|
let mut opts = MqttOptions::new(
|
||||||
|
&self.config.client_id,
|
||||||
|
&self.config.address,
|
||||||
|
self.config.port,
|
||||||
|
);
|
||||||
|
opts.set_keep_alive(Duration::from_secs(self.config.keepalive))
|
||||||
|
.set_max_packet_size(150000, 150000);
|
||||||
|
|
||||||
|
if let (Some(username), Some(password)) = (&self.config.username, &self.config.password) {
|
||||||
|
opts.set_credentials(username, password);
|
||||||
|
}
|
||||||
|
|
||||||
|
let (client, mut eventloop) = AsyncClient::new(opts, 10);
|
||||||
|
|
||||||
|
for topic in &self.config.topic {
|
||||||
|
client.subscribe(topic, QoS::AtLeastOnce).await.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut bo = backoff::ExponentialBackoff::default();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match eventloop.poll().await {
|
||||||
|
Ok(event) => {
|
||||||
|
bo.reset();
|
||||||
|
self.handle_event(&event)
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!("Connection error: {}", e);
|
||||||
|
let delay = bo.next_backoff();
|
||||||
|
match delay {
|
||||||
|
Some(delay) => {
|
||||||
|
debug!("Backing off for {}s", delay.as_secs_f32());
|
||||||
|
tokio::time::sleep(delay).await
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
error!("Connection timed out, giving up");
|
||||||
|
exit(-1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
async fn prom_task(&self) {
|
||||||
|
let builder = PrometheusBuilder::new()
|
||||||
|
.idle_timeout(MetricKindMask::ALL, Some(Duration::from_secs(300)));
|
||||||
|
let builder = if self.config.expose_client_id {
|
||||||
|
builder.add_global_label("client_id", &self.config.client_id)
|
||||||
|
} else {
|
||||||
|
builder
|
||||||
|
};
|
||||||
|
builder.install().unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() {
|
||||||
|
env_logger::init();
|
||||||
|
|
||||||
|
let config = Config::builder()
|
||||||
|
.add_source(
|
||||||
|
config::Environment::with_prefix("MQTT")
|
||||||
|
.try_parsing(true)
|
||||||
|
.list_separator(",")
|
||||||
|
.with_list_parse_key("topic")
|
||||||
|
.with_list_parse_key("ignored_topics"),
|
||||||
|
)
|
||||||
|
.build()
|
||||||
|
.unwrap();
|
||||||
|
let app_config = config.try_deserialize().unwrap();
|
||||||
|
info!("Loaded config: {app_config:?}");
|
||||||
|
|
||||||
|
let app = App { config: app_config };
|
||||||
|
app.prom_task().await;
|
||||||
|
let task = tokio::spawn(app.connection_task());
|
||||||
|
task.await.unwrap();
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user