Compare commits

8 Commits

8 changed files with 442 additions and 156 deletions

1
.gitignore vendored
View File

@@ -1 +1,2 @@
/target
test

2
Cargo.lock generated
View File

@@ -983,7 +983,7 @@ dependencies = [
[[package]]
name = "mqtt-exporter"
version = "0.1.0"
version = "0.1.2"
dependencies = [
"backoff",
"bytes",

View File

@@ -1,6 +1,6 @@
[package]
name = "mqtt-exporter"
version = "0.1.0"
version = "0.1.2"
edition = "2021"
[dependencies]
@@ -20,3 +20,6 @@ serde = { version = "1.0.218", features = ["derive"] }
serde_regex = "1.1.0"
tokio = { version = "1.44.0", features = ["net", "rt-multi-thread"] }
tokio-util = { version = "0.7.13", features = ["codec"] }
[profile.release]
lto = "fat"

12
Dockerfile Normal file
View File

@@ -0,0 +1,12 @@
FROM rust:1 AS builder
WORKDIR /usr/src/mqtt-exporter
COPY src/ ./src
COPY Cargo.* .
RUN cargo install --path . --root /app
FROM debian:bookworm-slim
COPY --from=builder /app/bin/mqtt-exporter /usr/local/bin/mqtt-exporter
CMD ["mqtt-exporter"]

5
README.md Normal file
View File

@@ -0,0 +1,5 @@
# mqtt-exporter-rs
A reimplementation of [mqtt-exporter](https://github.com/kpetremann/mqtt-exporter) in Rust. That's pretty much the gist of it. I was having issues with memory leaks in the Python implementation so I spun this together. It works for my purposes but is far from tested.
I slightly modified the logic, since I don't understand why some of the topic munging is done and would prefer to keep the topics as-is.

1
src/lib.rs Normal file
View File

@@ -0,0 +1 @@
pub mod transforms;

View File

@@ -1,46 +1,67 @@
use backoff::{backoff::Backoff, ExponentialBackoff};
use backoff::backoff::Backoff;
use config::Config;
use core::str;
use itertools::Itertools;
use json::JsonValue;
use log::{debug, error, info, warn};
use metrics::gauge;
use metrics_exporter_prometheus::PrometheusBuilder;
use metrics_util::{
registry::{AtomicStorage, GenerationalAtomicStorage, Registry},
MetricKindMask,
};
use metrics_util::MetricKindMask;
use regex::Regex;
use rumqttc::{
tokio_rustls::client,
AsyncClient,
Event::{self, Incoming},
MqttOptions, Packet, Publish, QoS,
AsyncClient, ConnAck,
Event::{self},
EventLoop, MqttOptions, Packet, Publish, QoS, SubscribeFilter,
};
use serde::Deserialize;
use std::{collections::HashMap, error::Error, iter::repeat, process::exit, time::Duration};
use tokio::{join, net::TcpStream};
use std::{
net::{IpAddr, Ipv6Addr},
process::exit,
time::{Duration, SystemTime, UNIX_EPOCH},
};
use mqtt_exporter::transforms::*;
struct ConfigDefaults;
impl ConfigDefaults {
fn address() -> String {
fn mqtt_address() -> String {
"localhost".to_owned()
}
fn port() -> u16 {
fn mqtt_port() -> u16 {
1883
}
fn topic() -> Vec<String> {
fn mqtt_topic() -> Vec<String> {
vec!["#".to_owned()]
}
fn keepalive() -> u64 {
fn mqtt_keepalive() -> u64 {
60
}
fn client_id() -> String {
fn mqtt_client_id() -> String {
"mqtt-exporter-rs".to_owned()
}
fn expose_client_id() -> bool {
fn mqtt_expose_client_id() -> bool {
true
}
fn expose_last_seen() -> bool {
false
}
fn prometheus_address() -> IpAddr {
IpAddr::V6(Ipv6Addr::from_bits(0))
}
fn prometheus_port() -> u16 {
9000
}
fn prometheus_prefix() -> String {
"mqtt".to_owned()
}
fn topic_label() -> String {
"topic".to_owned()
}
fn hubitat_topic_prefixes() -> Vec<String> {
vec!["hubitat".to_owned()]
}
fn zigbee2mqtt_availability() -> bool {
false
}
fn zwave_topic_prefix() -> String {
"zwave/".to_owned()
}
@@ -48,35 +69,52 @@ impl ConfigDefaults {
#[derive(Debug, Deserialize)]
struct AppConfig {
#[serde(default = "ConfigDefaults::address")]
address: String,
#[serde(default = "ConfigDefaults::port")]
port: u16,
#[serde(default = "ConfigDefaults::topic")]
topic: Vec<String>,
#[serde(default = "ConfigDefaults::keepalive")]
keepalive: u64,
#[serde(default = "ConfigDefaults::mqtt_address")]
mqtt_address: String,
#[serde(default = "ConfigDefaults::mqtt_port")]
mqtt_port: u16,
#[serde(default = "ConfigDefaults::mqtt_topic")]
mqtt_topic: Vec<String>,
#[serde(default = "ConfigDefaults::mqtt_keepalive")]
mqtt_keepalive: u64,
#[serde(default)]
username: Option<String>,
mqtt_username: Option<String>,
#[serde(default)]
password: Option<String>,
#[serde(default = "ConfigDefaults::client_id")]
client_id: String,
#[serde(default = "ConfigDefaults::expose_client_id")]
expose_client_id: bool,
mqtt_password: Option<String>,
#[serde(default = "ConfigDefaults::mqtt_client_id")]
mqtt_client_id: String,
#[serde(default = "ConfigDefaults::mqtt_expose_client_id")]
mqtt_expose_client_id: bool,
#[serde(default = "ConfigDefaults::expose_last_seen")]
expose_last_seen: bool,
#[serde(default)]
ignored_topics: Vec<String>,
mqtt_ignored_topics: Vec<String>,
#[serde(with = "serde_regex", default)]
ignored_topics_re: Option<Regex>,
mqtt_ignored_topics_re: Option<Regex>,
#[serde(default = "ConfigDefaults::prometheus_address")]
prometheus_address: IpAddr,
#[serde(default = "ConfigDefaults::prometheus_port")]
prometheus_port: u16,
#[serde(default = "ConfigDefaults::prometheus_prefix")]
prometheus_prefix: String,
#[serde(default = "ConfigDefaults::topic_label")]
topic_label: String,
#[serde(default)]
esphome_topic_prefixes: Vec<String>,
#[serde(default = "ConfigDefaults::hubitat_topic_prefixes")]
hubitat_topic_prefixes: Vec<String>,
#[serde(default = "ConfigDefaults::zigbee2mqtt_availability")]
zigbee2mqtt_availability: bool,
#[serde(default = "ConfigDefaults::zwave_topic_prefix")]
zwave_topic_prefix: String,
}
struct App {
config: AppConfig,
client: AsyncClient,
transformers: Vec<Box<dyn Transform>>,
}
impl App {
fn parse_number(v: &str) -> Option<f64> {
fn parse_number(v: &str) -> Option<f64> {
if let Ok(num) = v.parse::<f64>() {
Some(num)
} else if v.eq_ignore_ascii_case("on")
@@ -92,8 +130,9 @@ impl App {
} else {
None
}
}
}
impl App {
fn parse_metric(value: &JsonValue) -> Option<f64> {
match value {
JsonValue::Null => None,
@@ -105,8 +144,8 @@ impl App {
}
}
JsonValue::Number(v) => Some((*v).into()),
JsonValue::Short(v) => Self::parse_number(v.as_str()),
JsonValue::String(v) => Self::parse_number(v.as_str()),
JsonValue::Short(v) => parse_number(v.as_str()),
JsonValue::String(v) => parse_number(v.as_str()),
_ => None,
}
}
@@ -115,45 +154,39 @@ impl App {
&self,
mut data: JsonValue,
topic: &str,
original_topic: &str,
prefix: &str,
labels: &Option<Vec<(String, String)>>,
_original_topic: &str,
metric_path: Vec<String>,
labels: &Vec<(String, String)>,
) {
match data {
JsonValue::Array(_) => {
for (metric, child) in data.members_mut().enumerate() {
let metric = metric.to_string();
self.parse_metrics(
child.take(),
topic,
original_topic,
&format!("{prefix}{metric}_"),
labels,
)
let mut metric_path = metric_path.clone();
metric_path.push(metric.to_string());
self.parse_metrics(child.take(), topic, _original_topic, metric_path, labels)
}
}
JsonValue::Object(_) => {
for (metric, child) in data.entries_mut() {
self.parse_metrics(
child.take(),
topic,
original_topic,
&format!("{prefix}_{metric}"),
labels,
)
let mut metric_path = metric_path.clone();
metric_path.push(metric.to_string());
self.parse_metrics(child.take(), topic, _original_topic, metric_path, labels)
}
}
_ => {
if let Some(v) = Self::parse_metric(&data) {
debug!("prometheus metric: {prefix} value: {v}");
let mut labels = if let Some(labels) = labels {
labels.clone()
} else {
Vec::new()
};
labels.push(("topic".to_owned(), topic.to_owned()));
let owned_pfx = prefix.to_owned();
let gauge = gauge!(owned_pfx, &labels);
let metric_s = metric_path.join("_");
let mut labels = labels.clone();
labels.push((self.config.topic_label.to_owned(), topic.to_owned()));
debug!(target: "mqtt-exporter::prom", "metric: {metric_s} value: {v} labels: {labels:?}");
if self.config.expose_last_seen {
let metric_ts = format!("{}_ts", metric_s);
let ts_gauge = gauge!(metric_ts, &labels);
let ts = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
ts_gauge.set(ts)
}
let gauge = gauge!(metric_s, &labels);
gauge.set(v);
}
}
@@ -163,12 +196,12 @@ impl App {
fn topic_filter(&self, topic: &str) -> bool {
!self
.config
.ignored_topics
.mqtt_ignored_topics
.iter()
.any(|pfx| topic.starts_with(pfx))
&& !self
.config
.ignored_topics_re
.mqtt_ignored_topics_re
.as_ref()
.is_some_and(|re| re.is_match(topic))
}
@@ -177,103 +210,98 @@ impl App {
Ok(json::parse(str::from_utf8(payload)?)?)
}
fn handle_event(&self, event: &Event) {
async fn handle_event(&self, event: &Event) {
match event {
Event::Incoming(Packet::Publish(notification)) => self.handle_publish(notification),
Event::Incoming(Packet::ConnAck(ack)) => self.handle_connack(ack).await,
// Event::Incoming(Packet::SubAck(ack)) => self.handle_suback(ack),
Event::Outgoing(_) => {}
e => debug!("Unhandled event: {:?}", e),
e => debug!(target: "mqtt-exporter::mqtt", "Unhandled event: {:?}", e),
}
}
fn handle_publish(&self, event: &Publish) {
match Self::deserialize_payload(&event.payload) {
Ok(parsed) => self.update_metrics(&event.topic, parsed),
Err(e) => error!("Error deserializing JSON payload: {:?}", e),
Err(e) => {
error!(target: "mqtt-exporter::mqtt", "Error deserializing JSON payload: {:?}", e)
}
}
}
async fn handle_connack(&self, ack: &ConnAck) {
match ack.code {
rumqttc::ConnectReturnCode::Success => {
info!(target: "mqtt-exporter::mqtt", "Successfully connected to MQTT broker");
self.subscribe().await;
}
code => {
warn!(target: "mqtt-exporter::mqtt", "Non success connection return code {:?}", code)
}
}
}
fn update_metrics(&self, raw_topic: &str, mut data: JsonValue) {
let mut labels = Vec::new();
if !&self.topic_filter(raw_topic) {
return;
}
let mut topic = raw_topic.replace('/', "_");
debug!(target: "mqtt-exporter::mqtt", "topic {} payload {}", raw_topic, data.pretty(2));
let mut topic = raw_topic.to_owned();
if topic.starts_with(&self.config.zwave_topic_prefix) {
(topic, data) = Self::normalize_zwave2mqtt(&topic, data);
}
self.parse_metrics(data, &topic, &topic, "mqtt", &None);
}
fn normalize_zwave2mqtt(topic: &str, mut data: JsonValue) -> (String, JsonValue) {
if topic.contains("node_info") || !topic.contains("endpoint_") {
return (topic.to_owned(), JsonValue::Null);
}
if !data.is_object() || !data.contains("value") {
return (topic.to_owned(), JsonValue::Null);
}
let parts = topic.split('/').collect_vec();
if let Some((properties_index, _)) = parts
(topic, data) = self
.transformers
.iter()
.enumerate()
.find(|(_, v)| v.starts_with("endpoint_"))
.filter(|txfm| txfm.applies_to(raw_topic))
.fold((topic, data), |(topic, data), txfm| {
txfm.transform(&topic, data, &mut labels)
});
// Should be a transform itself?
// let topic = topic.replace('/', "_");
let metric_path = vec![self.config.prometheus_prefix.clone()];
self.parse_metrics(data, &topic, &topic, metric_path, &labels);
}
async fn subscribe(&self) {
match self
.client
.subscribe_many(self.config.mqtt_topic.iter().map(|topic| SubscribeFilter {
path: topic.to_owned(),
qos: QoS::AtLeastOnce,
}))
.await
{
let parts = topic.split('/').collect_vec();
let topic = parts[..properties_index]
.iter()
.map(|s| s.to_lowercase())
.join("/");
let properties = parts[properties_index..]
.iter()
.map(|s| s.to_lowercase())
.join("_");
let mut value = JsonValue::new_object();
value.insert(&properties, data.remove("value")).unwrap();
return (topic, value);
} else {
error!("Unable to find propreties index topic: {topic:?} value: {data:?}");
return (topic.to_owned(), JsonValue::Null);
Ok(_) => {
info!(target: "mqtt-exporter::mqtt", "Successfully subscribed to topics `{}`", &self.config.mqtt_topic.join(" "))
}
Err(e) => {
error!(target: "mqtt-exporter::mqtt", "Failed to subscribe to topics `{}` - {}", &self.config.mqtt_topic.join(" "), e)
}
}
}
async fn connection_task(self) {
let mut opts = MqttOptions::new(
&self.config.client_id,
&self.config.address,
self.config.port,
);
opts.set_keep_alive(Duration::from_secs(self.config.keepalive))
.set_max_packet_size(150000, 150000);
if let (Some(username), Some(password)) = (&self.config.username, &self.config.password) {
opts.set_credentials(username, password);
}
let (client, mut eventloop) = AsyncClient::new(opts, 10);
for topic in &self.config.topic {
client.subscribe(topic, QoS::AtLeastOnce).await.unwrap();
}
async fn mqtt_task(self, mut eventloop: EventLoop) {
let mut bo = backoff::ExponentialBackoff::default();
loop {
match eventloop.poll().await {
Ok(event) => {
bo.reset();
self.handle_event(&event)
self.handle_event(&event).await
}
Err(e) => {
error!("Connection error: {}", e);
error!(target: "mqtt-exporter::mqtt", "Connection error: {}", e);
let delay = bo.next_backoff();
match delay {
Some(delay) => {
debug!("Backing off for {}s", delay.as_secs_f32());
debug!(target: "mqtt-exporter::mqtt", "Backing off for {}s", delay.as_secs_f32());
tokio::time::sleep(delay).await
}
None => {
error!("Connection timed out, giving up");
error!(target: "mqtt-exporter::mqtt", "Connection timed out, giving up");
exit(-1)
}
}
@@ -283,14 +311,43 @@ impl App {
}
async fn prom_task(&self) {
let builder = PrometheusBuilder::new()
.with_http_listener((self.config.prometheus_address, self.config.prometheus_port))
.idle_timeout(MetricKindMask::ALL, Some(Duration::from_secs(300)));
let builder = if self.config.expose_client_id {
builder.add_global_label("client_id", &self.config.client_id)
let builder = if self.config.mqtt_expose_client_id {
builder.add_global_label("client_id", &self.config.mqtt_client_id)
} else {
builder
};
builder.install().unwrap();
}
fn new(config: AppConfig, client: AsyncClient) -> Self {
let mut transformers: Vec<Box<dyn Transform>> = Vec::new();
if !config.zwave_topic_prefix.is_empty() {
transformers.push(Box::new(ZwaveNormalize::new(&config.zwave_topic_prefix)))
}
if !config.esphome_topic_prefixes.is_empty() {
transformers.push(Box::new(EsphomeNormalize::new(
&config.esphome_topic_prefixes,
)))
}
if !config.hubitat_topic_prefixes.is_empty() && !config.hubitat_topic_prefixes[0].is_empty()
{
transformers.push(Box::new(HubitatNormalize::new(
&config.hubitat_topic_prefixes,
)))
}
if config.zigbee2mqtt_availability {
transformers.push(Box::new(Zigbee2MqttAvailability::default()))
}
debug!(target: "mqtt-exporter::config", "Transformers enabled: {:?}", transformers);
Self {
config,
client,
transformers,
}
}
}
#[tokio::main]
@@ -299,19 +356,37 @@ async fn main() {
let config = Config::builder()
.add_source(
config::Environment::with_prefix("MQTT")
config::Environment::with_prefix("")
.prefix_separator("")
.try_parsing(true)
.separator("__")
.list_separator(",")
.with_list_parse_key("topic")
.with_list_parse_key("ignored_topics"),
.with_list_parse_key("mqtt_topic")
.with_list_parse_key("mqtt_ignored_topics")
.with_list_parse_key("esphome_topic_prefixes")
.with_list_parse_key("hubitat_topic_prefixes"),
)
.build()
.unwrap();
let app_config = config.try_deserialize().unwrap();
info!("Loaded config: {app_config:?}");
let app_config: AppConfig = config.try_deserialize().unwrap();
info!(target: "mqtt-exporter::config", "Loaded config: {app_config:?}");
let app = App { config: app_config };
let mut opts = MqttOptions::new(
&app_config.mqtt_client_id,
&app_config.mqtt_address,
app_config.mqtt_port,
);
opts.set_keep_alive(Duration::from_secs(app_config.mqtt_keepalive))
.set_max_packet_size(150000, 150000);
if let (Some(username), Some(password)) = (&app_config.mqtt_username, &app_config.mqtt_password)
{
opts.set_credentials(username, password);
}
let (client, mut eventloop) = AsyncClient::new(opts, 10);
let app = App::new(app_config, client);
app.prom_task().await;
let task = tokio::spawn(app.connection_task());
task.await.unwrap();
app.mqtt_task(eventloop).await;
}

189
src/transforms.rs Normal file
View File

@@ -0,0 +1,189 @@
use itertools::Itertools;
use json::JsonValue;
use log::{debug, error};
use std::fmt::Debug;
pub trait Transform: Debug {
fn applies_to(&self, topic: &str) -> bool;
fn transform(
&self,
topic: &str,
data: JsonValue,
labels: &mut Vec<(String, String)>,
) -> (String, JsonValue);
}
#[derive(Debug)]
pub struct ZwaveNormalize {
topic_prefix: String,
}
impl ZwaveNormalize {
pub fn new(topic_prefix: &str) -> Self {
Self {
topic_prefix: topic_prefix.to_owned(),
}
}
}
impl Transform for ZwaveNormalize {
fn applies_to(&self, topic: &str) -> bool {
topic.starts_with(&self.topic_prefix)
}
fn transform(
&self,
topic: &str,
mut data: JsonValue,
labels: &mut Vec<(String, String)>,
) -> (String, JsonValue) {
if topic.contains("nodeinfo") || !topic.contains("endpoint_") {
debug!("zwave topic isn't relevant");
return (topic.to_owned(), JsonValue::Null);
}
if !data.is_object() || !data.has_key("value") {
debug!("zwave payload doesn't include value");
return (topic.to_owned(), JsonValue::Null);
}
debug!("Normalizing zwave2mqtt payload");
let parts = topic.split('/').collect_vec();
if let Some((properties_index, _)) = parts
.iter()
.enumerate()
.find(|(_, v)| v.starts_with("endpoint_"))
{
let parts = topic.split('/').collect_vec();
let topic = parts[..properties_index]
.iter()
.map(|s| s.to_lowercase())
.join("/");
if let Ok(endpoint) = parts[properties_index]
.split_at("endpoint_".len())
.1
.parse::<u64>()
{
labels.push(("endpoint".to_owned(), endpoint.to_string()));
} else {
labels.push(("endpoint".to_owned(), parts[properties_index].to_owned()))
}
let properties = parts[properties_index + 1..]
.iter()
.map(|s| s.to_lowercase())
.join("_");
let mut value = JsonValue::new_object();
value.insert(&properties, data.remove("value")).unwrap();
debug!("Normalized topic: {topic} value: {value}");
(topic, value)
} else {
error!(target: "ZwaveNormalize", "Unable to find propreties index topic: {topic:?} value: {data:?}");
(topic.to_owned(), JsonValue::Null)
}
}
}
#[derive(Debug)]
pub struct EsphomeNormalize {
topic_prefixes: Vec<String>,
}
impl EsphomeNormalize {
pub fn new(topic_prefixes: &[String]) -> Self {
Self {
topic_prefixes: topic_prefixes.to_vec(),
}
}
}
// Not sure why you'd want this, it de-normalizes the values since it moves them to separate topics per esphome node, but okay...
impl Transform for EsphomeNormalize {
fn applies_to(&self, topic: &str) -> bool {
self.topic_prefixes.iter().any(|pfx| topic.starts_with(pfx))
}
fn transform(
&self,
topic: &str,
data: JsonValue,
labels: &mut Vec<(String, String)>,
) -> (String, JsonValue) {
// <TOPIC_PREFIX>/<COMPONENT_TYPE>/<COMPONENT_NAME>/state
let parts = topic.split('/').collect_vec();
labels.push(("esphome_type".to_owned(), parts[1].to_owned()));
labels.push(("esphome_name".to_owned(), parts[2].to_owned()));
let topic = format!("{}/{}", parts[0].to_lowercase(), parts[1].to_lowercase());
let mut value = JsonValue::new_object();
value.insert(parts[parts.len() - 2], data).unwrap();
(topic, value)
}
}
#[derive(Debug)]
pub struct HubitatNormalize {
topic_prefixes: Vec<String>,
}
impl HubitatNormalize {
pub fn new(topic_prefixes: &[String]) -> Self {
Self {
topic_prefixes: topic_prefixes.to_vec(),
}
}
}
impl Transform for HubitatNormalize {
fn applies_to(&self, topic: &str) -> bool {
self.topic_prefixes.iter().any(|pfx| topic.starts_with(pfx))
}
fn transform(
&self,
topic: &str,
data: JsonValue,
labels: &mut Vec<(String, String)>,
) -> (String, JsonValue) {
// hubitat/hub1/some room/temperature/value
let parts = topic.split('/').collect_vec();
if parts.len() < 3 {
return (topic.to_owned(), data);
}
let topic = parts[0..3].iter().map(|s| s.to_lowercase()).join("_");
labels.push(("hubitat_hub".to_owned(), parts[1].to_owned()));
labels.push(("hubitat_room".to_owned(), parts[2].to_owned()));
let mut value = JsonValue::new_object();
value.insert(parts[parts.len() - 2], data).unwrap();
(topic, value)
}
}
const ZIGBEE2MQTT_AVAILABILITY_SUFFIX: &str = "availability";
#[derive(Debug, Default)]
pub struct Zigbee2MqttAvailability {}
impl Transform for Zigbee2MqttAvailability {
fn applies_to(&self, topic: &str) -> bool {
topic.ends_with(ZIGBEE2MQTT_AVAILABILITY_SUFFIX)
}
fn transform(
&self,
topic: &str,
mut data: JsonValue,
_labels: &mut Vec<(String, String)>,
) -> (String, JsonValue) {
if data.has_key("state") {
let topic = topic
.split_at(topic.len() - ZIGBEE2MQTT_AVAILABILITY_SUFFIX.len() - 1)
.0;
let mut value = JsonValue::new_object();
value
.insert("zigbee_availability", data.remove("state"))
.unwrap();
(topic.to_owned(), value)
} else {
(topic.to_owned(), data)
}
}
}