Compare commits

..

No commits in common. "3085e38dfd868c77f5d33fe3d676d21f77b9a305" and "4a15e0b11a10e6640b168880bc7ed41f950b0a14" have entirely different histories.

5 changed files with 47 additions and 91 deletions

2
Cargo.lock generated
View File

@ -983,7 +983,7 @@ dependencies = [
[[package]] [[package]]
name = "mqtt-exporter" name = "mqtt-exporter"
version = "0.1.2" version = "0.1.0"
dependencies = [ dependencies = [
"backoff", "backoff",
"bytes", "bytes",

View File

@ -1,6 +1,6 @@
[package] [package]
name = "mqtt-exporter" name = "mqtt-exporter"
version = "0.1.2" version = "0.1.0"
edition = "2021" edition = "2021"
[dependencies] [dependencies]
@ -20,6 +20,3 @@ serde = { version = "1.0.218", features = ["derive"] }
serde_regex = "1.1.0" serde_regex = "1.1.0"
tokio = { version = "1.44.0", features = ["net", "rt-multi-thread"] } tokio = { version = "1.44.0", features = ["net", "rt-multi-thread"] }
tokio-util = { version = "0.7.13", features = ["codec"] } tokio-util = { version = "0.7.13", features = ["codec"] }
[profile.release]
lto = "fat"

View File

@ -1,12 +1,8 @@
FROM rust:1 AS builder FROM rust:1.85
WORKDIR /usr/src/mqtt-exporter WORKDIR /usr/src/mqtt-exporter
COPY src/ ./src COPY . .
COPY Cargo.* .
RUN cargo install --path . --root /app RUN cargo install --path .
FROM debian:bookworm-slim
COPY --from=builder /app/bin/mqtt-exporter /usr/local/bin/mqtt-exporter
CMD ["mqtt-exporter"] CMD ["mqtt-exporter"]

View File

@ -1,5 +0,0 @@
# mqtt-exporter-rs
A reimplementation of [mqtt-exporter](https://github.com/kpetremann/mqtt-exporter) in Rust. That's pretty much the gist of it. I was having issues with memory leaks in the Python implementation so I spun this together. It works for my purposes but is far from tested.
I slightly modified the logic, since I don't understand why some of the topic munging is done and would prefer to keep the topics as-is.

View File

@ -2,24 +2,25 @@ use backoff::backoff::Backoff;
use config::Config; use config::Config;
use core::str; use core::str;
use json::JsonValue; use json::JsonValue;
use log::{debug, error, info, warn}; use log::{debug, error, info};
use metrics::gauge; use metrics::gauge;
use metrics_exporter_prometheus::PrometheusBuilder; use metrics_exporter_prometheus::PrometheusBuilder;
use metrics_util::MetricKindMask; use metrics_util::MetricKindMask;
use mqtt_exporter::transforms::*;
use regex::Regex; use regex::Regex;
use rumqttc::{ use rumqttc::{
AsyncClient, ConnAck, AsyncClient,
Event::{self}, Event::{self},
EventLoop, MqttOptions, Packet, Publish, QoS, SubscribeFilter, MqttOptions, Packet, Publish, QoS,
}; };
use serde::Deserialize; use serde::Deserialize;
use std::{ use std::{
net::{IpAddr, Ipv6Addr}, net::{IpAddr, Ipv6Addr},
process::exit, process::exit,
time::{Duration, SystemTime, UNIX_EPOCH}, time::Duration,
}; };
use mqtt_exporter::transforms::*; use std::time::{SystemTime, UNIX_EPOCH};
struct ConfigDefaults; struct ConfigDefaults;
impl ConfigDefaults { impl ConfigDefaults {
@ -110,7 +111,6 @@ struct AppConfig {
} }
struct App { struct App {
config: AppConfig, config: AppConfig,
client: AsyncClient,
transformers: Vec<Box<dyn Transform>>, transformers: Vec<Box<dyn Transform>>,
} }
@ -178,7 +178,7 @@ impl App {
let metric_s = metric_path.join("_"); let metric_s = metric_path.join("_");
let mut labels = labels.clone(); let mut labels = labels.clone();
labels.push((self.config.topic_label.to_owned(), topic.to_owned())); labels.push((self.config.topic_label.to_owned(), topic.to_owned()));
debug!(target: "mqtt-exporter::prom", "metric: {metric_s} value: {v} labels: {labels:?}"); debug!("metric: {metric_s} value: {v} labels: {labels:?}");
if self.config.expose_last_seen { if self.config.expose_last_seen {
let metric_ts = format!("{}_ts", metric_s); let metric_ts = format!("{}_ts", metric_s);
@ -210,34 +210,18 @@ impl App {
Ok(json::parse(str::from_utf8(payload)?)?) Ok(json::parse(str::from_utf8(payload)?)?)
} }
async fn handle_event(&self, event: &Event) { fn handle_event(&self, event: &Event) {
match event { match event {
Event::Incoming(Packet::Publish(notification)) => self.handle_publish(notification), Event::Incoming(Packet::Publish(notification)) => self.handle_publish(notification),
Event::Incoming(Packet::ConnAck(ack)) => self.handle_connack(ack).await,
// Event::Incoming(Packet::SubAck(ack)) => self.handle_suback(ack),
Event::Outgoing(_) => {} Event::Outgoing(_) => {}
e => debug!(target: "mqtt-exporter::mqtt", "Unhandled event: {:?}", e), e => debug!("Unhandled event: {:?}", e),
} }
} }
fn handle_publish(&self, event: &Publish) { fn handle_publish(&self, event: &Publish) {
match Self::deserialize_payload(&event.payload) { match Self::deserialize_payload(&event.payload) {
Ok(parsed) => self.update_metrics(&event.topic, parsed), Ok(parsed) => self.update_metrics(&event.topic, parsed),
Err(e) => { Err(e) => error!("Error deserializing JSON payload: {:?}", e),
error!(target: "mqtt-exporter::mqtt", "Error deserializing JSON payload: {:?}", e)
}
}
}
async fn handle_connack(&self, ack: &ConnAck) {
match ack.code {
rumqttc::ConnectReturnCode::Success => {
info!(target: "mqtt-exporter::mqtt", "Successfully connected to MQTT broker");
self.subscribe().await;
}
code => {
warn!(target: "mqtt-exporter::mqtt", "Non success connection return code {:?}", code)
}
} }
} }
@ -247,7 +231,7 @@ impl App {
if !&self.topic_filter(raw_topic) { if !&self.topic_filter(raw_topic) {
return; return;
} }
debug!(target: "mqtt-exporter::mqtt", "topic {} payload {}", raw_topic, data.pretty(2)); debug!("topic {} payload {}", raw_topic, data.pretty(2));
let mut topic = raw_topic.to_owned(); let mut topic = raw_topic.to_owned();
(topic, data) = self (topic, data) = self
@ -265,43 +249,45 @@ impl App {
self.parse_metrics(data, &topic, &topic, metric_path, &labels); self.parse_metrics(data, &topic, &topic, metric_path, &labels);
} }
async fn subscribe(&self) { async fn mqtt_task(self) {
match self let mut opts = MqttOptions::new(
.client &self.config.mqtt_client_id,
.subscribe_many(self.config.mqtt_topic.iter().map(|topic| SubscribeFilter { &self.config.mqtt_address,
path: topic.to_owned(), self.config.mqtt_port,
qos: QoS::AtLeastOnce, );
})) opts.set_keep_alive(Duration::from_secs(self.config.mqtt_keepalive))
.await .set_max_packet_size(150000, 150000);
if let (Some(username), Some(password)) =
(&self.config.mqtt_username, &self.config.mqtt_password)
{ {
Ok(_) => { opts.set_credentials(username, password);
info!(target: "mqtt-exporter::mqtt", "Successfully subscribed to topics `{}`", &self.config.mqtt_topic.join(" ")) }
}
Err(e) => { let (client, mut eventloop) = AsyncClient::new(opts, 10);
error!(target: "mqtt-exporter::mqtt", "Failed to subscribe to topics `{}` - {}", &self.config.mqtt_topic.join(" "), e)
} for topic in &self.config.mqtt_topic {
} client.subscribe(topic, QoS::AtLeastOnce).await.unwrap();
} }
async fn mqtt_task(self, mut eventloop: EventLoop) {
let mut bo = backoff::ExponentialBackoff::default(); let mut bo = backoff::ExponentialBackoff::default();
loop { loop {
match eventloop.poll().await { match eventloop.poll().await {
Ok(event) => { Ok(event) => {
bo.reset(); bo.reset();
self.handle_event(&event).await self.handle_event(&event)
} }
Err(e) => { Err(e) => {
error!(target: "mqtt-exporter::mqtt", "Connection error: {}", e); error!("Connection error: {}", e);
let delay = bo.next_backoff(); let delay = bo.next_backoff();
match delay { match delay {
Some(delay) => { Some(delay) => {
debug!(target: "mqtt-exporter::mqtt", "Backing off for {}s", delay.as_secs_f32()); debug!("Backing off for {}s", delay.as_secs_f32());
tokio::time::sleep(delay).await tokio::time::sleep(delay).await
} }
None => { None => {
error!(target: "mqtt-exporter::mqtt", "Connection timed out, giving up"); error!("Connection timed out, giving up");
exit(-1) exit(-1)
} }
} }
@ -321,7 +307,7 @@ impl App {
builder.install().unwrap(); builder.install().unwrap();
} }
fn new(config: AppConfig, client: AsyncClient) -> Self { fn new(config: AppConfig) -> Self {
let mut transformers: Vec<Box<dyn Transform>> = Vec::new(); let mut transformers: Vec<Box<dyn Transform>> = Vec::new();
if !config.zwave_topic_prefix.is_empty() { if !config.zwave_topic_prefix.is_empty() {
transformers.push(Box::new(ZwaveNormalize::new(&config.zwave_topic_prefix))) transformers.push(Box::new(ZwaveNormalize::new(&config.zwave_topic_prefix)))
@ -331,8 +317,7 @@ impl App {
&config.esphome_topic_prefixes, &config.esphome_topic_prefixes,
))) )))
} }
if !config.hubitat_topic_prefixes.is_empty() && !config.hubitat_topic_prefixes[0].is_empty() if !config.hubitat_topic_prefixes.is_empty() {
{
transformers.push(Box::new(HubitatNormalize::new( transformers.push(Box::new(HubitatNormalize::new(
&config.hubitat_topic_prefixes, &config.hubitat_topic_prefixes,
))) )))
@ -340,11 +325,9 @@ impl App {
if config.zigbee2mqtt_availability { if config.zigbee2mqtt_availability {
transformers.push(Box::new(Zigbee2MqttAvailability::default())) transformers.push(Box::new(Zigbee2MqttAvailability::default()))
} }
debug!(target: "mqtt-exporter::config", "Transformers enabled: {:?}", transformers); debug!("Transformers enabled: {:?}", transformers);
Self { Self {
config, config,
client,
transformers, transformers,
} }
} }
@ -368,25 +351,10 @@ async fn main() {
) )
.build() .build()
.unwrap(); .unwrap();
let app_config: AppConfig = config.try_deserialize().unwrap(); let app_config = config.try_deserialize().unwrap();
info!(target: "mqtt-exporter::config", "Loaded config: {app_config:?}"); info!("Loaded config: {app_config:?}");
let mut opts = MqttOptions::new( let app = App::new(app_config);
&app_config.mqtt_client_id,
&app_config.mqtt_address,
app_config.mqtt_port,
);
opts.set_keep_alive(Duration::from_secs(app_config.mqtt_keepalive))
.set_max_packet_size(150000, 150000);
if let (Some(username), Some(password)) = (&app_config.mqtt_username, &app_config.mqtt_password)
{
opts.set_credentials(username, password);
}
let (client, mut eventloop) = AsyncClient::new(opts, 10);
let app = App::new(app_config, client);
app.prom_task().await; app.prom_task().await;
app.mqtt_task(eventloop).await; app.mqtt_task().await;
} }