Compare commits
No commits in common. "3085e38dfd868c77f5d33fe3d676d21f77b9a305" and "4a15e0b11a10e6640b168880bc7ed41f950b0a14" have entirely different histories.
3085e38dfd
...
4a15e0b11a
2
Cargo.lock
generated
2
Cargo.lock
generated
@ -983,7 +983,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "mqtt-exporter"
|
name = "mqtt-exporter"
|
||||||
version = "0.1.2"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"backoff",
|
"backoff",
|
||||||
"bytes",
|
"bytes",
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "mqtt-exporter"
|
name = "mqtt-exporter"
|
||||||
version = "0.1.2"
|
version = "0.1.0"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
@ -20,6 +20,3 @@ serde = { version = "1.0.218", features = ["derive"] }
|
|||||||
serde_regex = "1.1.0"
|
serde_regex = "1.1.0"
|
||||||
tokio = { version = "1.44.0", features = ["net", "rt-multi-thread"] }
|
tokio = { version = "1.44.0", features = ["net", "rt-multi-thread"] }
|
||||||
tokio-util = { version = "0.7.13", features = ["codec"] }
|
tokio-util = { version = "0.7.13", features = ["codec"] }
|
||||||
|
|
||||||
[profile.release]
|
|
||||||
lto = "fat"
|
|
10
Dockerfile
10
Dockerfile
@ -1,12 +1,8 @@
|
|||||||
FROM rust:1 AS builder
|
FROM rust:1.85
|
||||||
|
|
||||||
WORKDIR /usr/src/mqtt-exporter
|
WORKDIR /usr/src/mqtt-exporter
|
||||||
COPY src/ ./src
|
COPY . .
|
||||||
COPY Cargo.* .
|
|
||||||
|
|
||||||
RUN cargo install --path . --root /app
|
RUN cargo install --path .
|
||||||
|
|
||||||
FROM debian:bookworm-slim
|
|
||||||
COPY --from=builder /app/bin/mqtt-exporter /usr/local/bin/mqtt-exporter
|
|
||||||
|
|
||||||
CMD ["mqtt-exporter"]
|
CMD ["mqtt-exporter"]
|
@ -1,5 +0,0 @@
|
|||||||
# mqtt-exporter-rs
|
|
||||||
|
|
||||||
A reimplementation of [mqtt-exporter](https://github.com/kpetremann/mqtt-exporter) in Rust. That's pretty much the gist of it. I was having issues with memory leaks in the Python implementation so I spun this together. It works for my purposes but is far from tested.
|
|
||||||
|
|
||||||
I slightly modified the logic, since I don't understand why some of the topic munging is done and would prefer to keep the topics as-is.
|
|
112
src/main.rs
112
src/main.rs
@ -2,24 +2,25 @@ use backoff::backoff::Backoff;
|
|||||||
use config::Config;
|
use config::Config;
|
||||||
use core::str;
|
use core::str;
|
||||||
use json::JsonValue;
|
use json::JsonValue;
|
||||||
use log::{debug, error, info, warn};
|
use log::{debug, error, info};
|
||||||
use metrics::gauge;
|
use metrics::gauge;
|
||||||
use metrics_exporter_prometheus::PrometheusBuilder;
|
use metrics_exporter_prometheus::PrometheusBuilder;
|
||||||
use metrics_util::MetricKindMask;
|
use metrics_util::MetricKindMask;
|
||||||
|
use mqtt_exporter::transforms::*;
|
||||||
use regex::Regex;
|
use regex::Regex;
|
||||||
use rumqttc::{
|
use rumqttc::{
|
||||||
AsyncClient, ConnAck,
|
AsyncClient,
|
||||||
Event::{self},
|
Event::{self},
|
||||||
EventLoop, MqttOptions, Packet, Publish, QoS, SubscribeFilter,
|
MqttOptions, Packet, Publish, QoS,
|
||||||
};
|
};
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use std::{
|
use std::{
|
||||||
net::{IpAddr, Ipv6Addr},
|
net::{IpAddr, Ipv6Addr},
|
||||||
process::exit,
|
process::exit,
|
||||||
time::{Duration, SystemTime, UNIX_EPOCH},
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
use mqtt_exporter::transforms::*;
|
use std::time::{SystemTime, UNIX_EPOCH};
|
||||||
|
|
||||||
struct ConfigDefaults;
|
struct ConfigDefaults;
|
||||||
impl ConfigDefaults {
|
impl ConfigDefaults {
|
||||||
@ -110,7 +111,6 @@ struct AppConfig {
|
|||||||
}
|
}
|
||||||
struct App {
|
struct App {
|
||||||
config: AppConfig,
|
config: AppConfig,
|
||||||
client: AsyncClient,
|
|
||||||
transformers: Vec<Box<dyn Transform>>,
|
transformers: Vec<Box<dyn Transform>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -178,7 +178,7 @@ impl App {
|
|||||||
let metric_s = metric_path.join("_");
|
let metric_s = metric_path.join("_");
|
||||||
let mut labels = labels.clone();
|
let mut labels = labels.clone();
|
||||||
labels.push((self.config.topic_label.to_owned(), topic.to_owned()));
|
labels.push((self.config.topic_label.to_owned(), topic.to_owned()));
|
||||||
debug!(target: "mqtt-exporter::prom", "metric: {metric_s} value: {v} labels: {labels:?}");
|
debug!("metric: {metric_s} value: {v} labels: {labels:?}");
|
||||||
|
|
||||||
if self.config.expose_last_seen {
|
if self.config.expose_last_seen {
|
||||||
let metric_ts = format!("{}_ts", metric_s);
|
let metric_ts = format!("{}_ts", metric_s);
|
||||||
@ -210,34 +210,18 @@ impl App {
|
|||||||
Ok(json::parse(str::from_utf8(payload)?)?)
|
Ok(json::parse(str::from_utf8(payload)?)?)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_event(&self, event: &Event) {
|
fn handle_event(&self, event: &Event) {
|
||||||
match event {
|
match event {
|
||||||
Event::Incoming(Packet::Publish(notification)) => self.handle_publish(notification),
|
Event::Incoming(Packet::Publish(notification)) => self.handle_publish(notification),
|
||||||
Event::Incoming(Packet::ConnAck(ack)) => self.handle_connack(ack).await,
|
|
||||||
// Event::Incoming(Packet::SubAck(ack)) => self.handle_suback(ack),
|
|
||||||
Event::Outgoing(_) => {}
|
Event::Outgoing(_) => {}
|
||||||
e => debug!(target: "mqtt-exporter::mqtt", "Unhandled event: {:?}", e),
|
e => debug!("Unhandled event: {:?}", e),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_publish(&self, event: &Publish) {
|
fn handle_publish(&self, event: &Publish) {
|
||||||
match Self::deserialize_payload(&event.payload) {
|
match Self::deserialize_payload(&event.payload) {
|
||||||
Ok(parsed) => self.update_metrics(&event.topic, parsed),
|
Ok(parsed) => self.update_metrics(&event.topic, parsed),
|
||||||
Err(e) => {
|
Err(e) => error!("Error deserializing JSON payload: {:?}", e),
|
||||||
error!(target: "mqtt-exporter::mqtt", "Error deserializing JSON payload: {:?}", e)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_connack(&self, ack: &ConnAck) {
|
|
||||||
match ack.code {
|
|
||||||
rumqttc::ConnectReturnCode::Success => {
|
|
||||||
info!(target: "mqtt-exporter::mqtt", "Successfully connected to MQTT broker");
|
|
||||||
self.subscribe().await;
|
|
||||||
}
|
|
||||||
code => {
|
|
||||||
warn!(target: "mqtt-exporter::mqtt", "Non success connection return code {:?}", code)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -247,7 +231,7 @@ impl App {
|
|||||||
if !&self.topic_filter(raw_topic) {
|
if !&self.topic_filter(raw_topic) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
debug!(target: "mqtt-exporter::mqtt", "topic {} payload {}", raw_topic, data.pretty(2));
|
debug!("topic {} payload {}", raw_topic, data.pretty(2));
|
||||||
let mut topic = raw_topic.to_owned();
|
let mut topic = raw_topic.to_owned();
|
||||||
|
|
||||||
(topic, data) = self
|
(topic, data) = self
|
||||||
@ -265,43 +249,45 @@ impl App {
|
|||||||
self.parse_metrics(data, &topic, &topic, metric_path, &labels);
|
self.parse_metrics(data, &topic, &topic, metric_path, &labels);
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn subscribe(&self) {
|
async fn mqtt_task(self) {
|
||||||
match self
|
let mut opts = MqttOptions::new(
|
||||||
.client
|
&self.config.mqtt_client_id,
|
||||||
.subscribe_many(self.config.mqtt_topic.iter().map(|topic| SubscribeFilter {
|
&self.config.mqtt_address,
|
||||||
path: topic.to_owned(),
|
self.config.mqtt_port,
|
||||||
qos: QoS::AtLeastOnce,
|
);
|
||||||
}))
|
opts.set_keep_alive(Duration::from_secs(self.config.mqtt_keepalive))
|
||||||
.await
|
.set_max_packet_size(150000, 150000);
|
||||||
|
|
||||||
|
if let (Some(username), Some(password)) =
|
||||||
|
(&self.config.mqtt_username, &self.config.mqtt_password)
|
||||||
{
|
{
|
||||||
Ok(_) => {
|
opts.set_credentials(username, password);
|
||||||
info!(target: "mqtt-exporter::mqtt", "Successfully subscribed to topics `{}`", &self.config.mqtt_topic.join(" "))
|
}
|
||||||
}
|
|
||||||
Err(e) => {
|
let (client, mut eventloop) = AsyncClient::new(opts, 10);
|
||||||
error!(target: "mqtt-exporter::mqtt", "Failed to subscribe to topics `{}` - {}", &self.config.mqtt_topic.join(" "), e)
|
|
||||||
}
|
for topic in &self.config.mqtt_topic {
|
||||||
}
|
client.subscribe(topic, QoS::AtLeastOnce).await.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn mqtt_task(self, mut eventloop: EventLoop) {
|
|
||||||
let mut bo = backoff::ExponentialBackoff::default();
|
let mut bo = backoff::ExponentialBackoff::default();
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
match eventloop.poll().await {
|
match eventloop.poll().await {
|
||||||
Ok(event) => {
|
Ok(event) => {
|
||||||
bo.reset();
|
bo.reset();
|
||||||
self.handle_event(&event).await
|
self.handle_event(&event)
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!(target: "mqtt-exporter::mqtt", "Connection error: {}", e);
|
error!("Connection error: {}", e);
|
||||||
let delay = bo.next_backoff();
|
let delay = bo.next_backoff();
|
||||||
match delay {
|
match delay {
|
||||||
Some(delay) => {
|
Some(delay) => {
|
||||||
debug!(target: "mqtt-exporter::mqtt", "Backing off for {}s", delay.as_secs_f32());
|
debug!("Backing off for {}s", delay.as_secs_f32());
|
||||||
tokio::time::sleep(delay).await
|
tokio::time::sleep(delay).await
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
error!(target: "mqtt-exporter::mqtt", "Connection timed out, giving up");
|
error!("Connection timed out, giving up");
|
||||||
exit(-1)
|
exit(-1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -321,7 +307,7 @@ impl App {
|
|||||||
builder.install().unwrap();
|
builder.install().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
fn new(config: AppConfig, client: AsyncClient) -> Self {
|
fn new(config: AppConfig) -> Self {
|
||||||
let mut transformers: Vec<Box<dyn Transform>> = Vec::new();
|
let mut transformers: Vec<Box<dyn Transform>> = Vec::new();
|
||||||
if !config.zwave_topic_prefix.is_empty() {
|
if !config.zwave_topic_prefix.is_empty() {
|
||||||
transformers.push(Box::new(ZwaveNormalize::new(&config.zwave_topic_prefix)))
|
transformers.push(Box::new(ZwaveNormalize::new(&config.zwave_topic_prefix)))
|
||||||
@ -331,8 +317,7 @@ impl App {
|
|||||||
&config.esphome_topic_prefixes,
|
&config.esphome_topic_prefixes,
|
||||||
)))
|
)))
|
||||||
}
|
}
|
||||||
if !config.hubitat_topic_prefixes.is_empty() && !config.hubitat_topic_prefixes[0].is_empty()
|
if !config.hubitat_topic_prefixes.is_empty() {
|
||||||
{
|
|
||||||
transformers.push(Box::new(HubitatNormalize::new(
|
transformers.push(Box::new(HubitatNormalize::new(
|
||||||
&config.hubitat_topic_prefixes,
|
&config.hubitat_topic_prefixes,
|
||||||
)))
|
)))
|
||||||
@ -340,11 +325,9 @@ impl App {
|
|||||||
if config.zigbee2mqtt_availability {
|
if config.zigbee2mqtt_availability {
|
||||||
transformers.push(Box::new(Zigbee2MqttAvailability::default()))
|
transformers.push(Box::new(Zigbee2MqttAvailability::default()))
|
||||||
}
|
}
|
||||||
debug!(target: "mqtt-exporter::config", "Transformers enabled: {:?}", transformers);
|
debug!("Transformers enabled: {:?}", transformers);
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
config,
|
config,
|
||||||
client,
|
|
||||||
transformers,
|
transformers,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -368,25 +351,10 @@ async fn main() {
|
|||||||
)
|
)
|
||||||
.build()
|
.build()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let app_config: AppConfig = config.try_deserialize().unwrap();
|
let app_config = config.try_deserialize().unwrap();
|
||||||
info!(target: "mqtt-exporter::config", "Loaded config: {app_config:?}");
|
info!("Loaded config: {app_config:?}");
|
||||||
|
|
||||||
let mut opts = MqttOptions::new(
|
let app = App::new(app_config);
|
||||||
&app_config.mqtt_client_id,
|
|
||||||
&app_config.mqtt_address,
|
|
||||||
app_config.mqtt_port,
|
|
||||||
);
|
|
||||||
opts.set_keep_alive(Duration::from_secs(app_config.mqtt_keepalive))
|
|
||||||
.set_max_packet_size(150000, 150000);
|
|
||||||
|
|
||||||
if let (Some(username), Some(password)) = (&app_config.mqtt_username, &app_config.mqtt_password)
|
|
||||||
{
|
|
||||||
opts.set_credentials(username, password);
|
|
||||||
}
|
|
||||||
|
|
||||||
let (client, mut eventloop) = AsyncClient::new(opts, 10);
|
|
||||||
|
|
||||||
let app = App::new(app_config, client);
|
|
||||||
app.prom_task().await;
|
app.prom_task().await;
|
||||||
app.mqtt_task(eventloop).await;
|
app.mqtt_task().await;
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user