initial deployment
This commit is contained in:
parent
d2ed54c377
commit
38006566af
@ -20,3 +20,6 @@ serde = { version = "1.0.218", features = ["derive"] }
|
|||||||
serde_regex = "1.1.0"
|
serde_regex = "1.1.0"
|
||||||
tokio = { version = "1.44.0", features = ["net", "rt-multi-thread"] }
|
tokio = { version = "1.44.0", features = ["net", "rt-multi-thread"] }
|
||||||
tokio-util = { version = "0.7.13", features = ["codec"] }
|
tokio-util = { version = "0.7.13", features = ["codec"] }
|
||||||
|
|
||||||
|
[profile.release]
|
||||||
|
lto = "fat"
|
65
src/main.rs
65
src/main.rs
@ -2,25 +2,24 @@ use backoff::backoff::Backoff;
|
|||||||
use config::Config;
|
use config::Config;
|
||||||
use core::str;
|
use core::str;
|
||||||
use json::JsonValue;
|
use json::JsonValue;
|
||||||
use log::{debug, error, info};
|
use log::{debug, error, info, warn};
|
||||||
use metrics::gauge;
|
use metrics::gauge;
|
||||||
use metrics_exporter_prometheus::PrometheusBuilder;
|
use metrics_exporter_prometheus::PrometheusBuilder;
|
||||||
use metrics_util::MetricKindMask;
|
use metrics_util::MetricKindMask;
|
||||||
use mqtt_exporter::transforms::*;
|
|
||||||
use regex::Regex;
|
use regex::Regex;
|
||||||
use rumqttc::{
|
use rumqttc::{
|
||||||
AsyncClient,
|
AsyncClient, ConnAck,
|
||||||
Event::{self},
|
Event::{self},
|
||||||
MqttOptions, Packet, Publish, QoS,
|
MqttOptions, Packet, Publish, QoS, SubscribeFilter,
|
||||||
};
|
};
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use std::{
|
use std::{
|
||||||
net::{IpAddr, Ipv6Addr},
|
net::{IpAddr, Ipv6Addr},
|
||||||
process::exit,
|
process::exit,
|
||||||
time::Duration,
|
time::{Duration, SystemTime, UNIX_EPOCH},
|
||||||
};
|
};
|
||||||
|
|
||||||
use std::time::{SystemTime, UNIX_EPOCH};
|
use mqtt_exporter::transforms::*;
|
||||||
|
|
||||||
struct ConfigDefaults;
|
struct ConfigDefaults;
|
||||||
impl ConfigDefaults {
|
impl ConfigDefaults {
|
||||||
@ -178,7 +177,7 @@ impl App {
|
|||||||
let metric_s = metric_path.join("_");
|
let metric_s = metric_path.join("_");
|
||||||
let mut labels = labels.clone();
|
let mut labels = labels.clone();
|
||||||
labels.push((self.config.topic_label.to_owned(), topic.to_owned()));
|
labels.push((self.config.topic_label.to_owned(), topic.to_owned()));
|
||||||
debug!("metric: {metric_s} value: {v} labels: {labels:?}");
|
debug!(target: "mqtt-exporter::prom", "metric: {metric_s} value: {v} labels: {labels:?}");
|
||||||
|
|
||||||
if self.config.expose_last_seen {
|
if self.config.expose_last_seen {
|
||||||
let metric_ts = format!("{}_ts", metric_s);
|
let metric_ts = format!("{}_ts", metric_s);
|
||||||
@ -213,15 +212,30 @@ impl App {
|
|||||||
fn handle_event(&self, event: &Event) {
|
fn handle_event(&self, event: &Event) {
|
||||||
match event {
|
match event {
|
||||||
Event::Incoming(Packet::Publish(notification)) => self.handle_publish(notification),
|
Event::Incoming(Packet::Publish(notification)) => self.handle_publish(notification),
|
||||||
|
Event::Incoming(Packet::ConnAck(ack)) => self.handle_connack(ack),
|
||||||
|
// Event::Incoming(Packet::SubAck(ack)) => self.handle_suback(ack),
|
||||||
Event::Outgoing(_) => {}
|
Event::Outgoing(_) => {}
|
||||||
e => debug!("Unhandled event: {:?}", e),
|
e => debug!(target: "mqtt-exporter::mqtt", "Unhandled event: {:?}", e),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_publish(&self, event: &Publish) {
|
fn handle_publish(&self, event: &Publish) {
|
||||||
match Self::deserialize_payload(&event.payload) {
|
match Self::deserialize_payload(&event.payload) {
|
||||||
Ok(parsed) => self.update_metrics(&event.topic, parsed),
|
Ok(parsed) => self.update_metrics(&event.topic, parsed),
|
||||||
Err(e) => error!("Error deserializing JSON payload: {:?}", e),
|
Err(e) => {
|
||||||
|
error!(target: "mqtt-exporter::mqtt", "Error deserializing JSON payload: {:?}", e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_connack(&self, ack: &ConnAck) {
|
||||||
|
match ack.code {
|
||||||
|
rumqttc::ConnectReturnCode::Success => {
|
||||||
|
info!(target: "mqtt-exporter::mqtt", "Successfully connected to MQTT broker")
|
||||||
|
}
|
||||||
|
code => {
|
||||||
|
warn!(target: "mqtt-exporter::mqtt", "Non success connection return code {:?}", code)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -231,7 +245,7 @@ impl App {
|
|||||||
if !&self.topic_filter(raw_topic) {
|
if !&self.topic_filter(raw_topic) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
debug!("topic {} payload {}", raw_topic, data.pretty(2));
|
debug!(target: "mqtt-exporter::mqtt", "topic {} payload {}", raw_topic, data.pretty(2));
|
||||||
let mut topic = raw_topic.to_owned();
|
let mut topic = raw_topic.to_owned();
|
||||||
|
|
||||||
(topic, data) = self
|
(topic, data) = self
|
||||||
@ -249,6 +263,23 @@ impl App {
|
|||||||
self.parse_metrics(data, &topic, &topic, metric_path, &labels);
|
self.parse_metrics(data, &topic, &topic, metric_path, &labels);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn subscribe(&self, client: &AsyncClient) {
|
||||||
|
match client
|
||||||
|
.subscribe_many(self.config.mqtt_topic.iter().map(|topic| SubscribeFilter {
|
||||||
|
path: topic.to_owned(),
|
||||||
|
qos: QoS::AtLeastOnce,
|
||||||
|
}))
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(_) => {
|
||||||
|
info!(target: "mqtt-exporter::mqtt", "Successfully subscribed to topics `{}`", &self.config.mqtt_topic.join(" "))
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!(target: "mqtt-exporter::mqtt", "Failed to subscribe to topics `{}` - {}", &self.config.mqtt_topic.join(" "), e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async fn mqtt_task(self) {
|
async fn mqtt_task(self) {
|
||||||
let mut opts = MqttOptions::new(
|
let mut opts = MqttOptions::new(
|
||||||
&self.config.mqtt_client_id,
|
&self.config.mqtt_client_id,
|
||||||
@ -266,9 +297,7 @@ impl App {
|
|||||||
|
|
||||||
let (client, mut eventloop) = AsyncClient::new(opts, 10);
|
let (client, mut eventloop) = AsyncClient::new(opts, 10);
|
||||||
|
|
||||||
for topic in &self.config.mqtt_topic {
|
self.subscribe(&client).await;
|
||||||
client.subscribe(topic, QoS::AtLeastOnce).await.unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut bo = backoff::ExponentialBackoff::default();
|
let mut bo = backoff::ExponentialBackoff::default();
|
||||||
|
|
||||||
@ -279,15 +308,15 @@ impl App {
|
|||||||
self.handle_event(&event)
|
self.handle_event(&event)
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("Connection error: {}", e);
|
error!(target: "mqtt-exporter::mqtt", "Connection error: {}", e);
|
||||||
let delay = bo.next_backoff();
|
let delay = bo.next_backoff();
|
||||||
match delay {
|
match delay {
|
||||||
Some(delay) => {
|
Some(delay) => {
|
||||||
debug!("Backing off for {}s", delay.as_secs_f32());
|
debug!(target: "mqtt-exporter::mqtt", "Backing off for {}s", delay.as_secs_f32());
|
||||||
tokio::time::sleep(delay).await
|
tokio::time::sleep(delay).await
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
error!("Connection timed out, giving up");
|
error!(target: "mqtt-exporter::mqtt", "Connection timed out, giving up");
|
||||||
exit(-1)
|
exit(-1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -325,7 +354,7 @@ impl App {
|
|||||||
if config.zigbee2mqtt_availability {
|
if config.zigbee2mqtt_availability {
|
||||||
transformers.push(Box::new(Zigbee2MqttAvailability::default()))
|
transformers.push(Box::new(Zigbee2MqttAvailability::default()))
|
||||||
}
|
}
|
||||||
debug!("Transformers enabled: {:?}", transformers);
|
debug!(target: "mqtt-exporter::config", "Transformers enabled: {:?}", transformers);
|
||||||
Self {
|
Self {
|
||||||
config,
|
config,
|
||||||
transformers,
|
transformers,
|
||||||
@ -352,7 +381,7 @@ async fn main() {
|
|||||||
.build()
|
.build()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let app_config = config.try_deserialize().unwrap();
|
let app_config = config.try_deserialize().unwrap();
|
||||||
info!("Loaded config: {app_config:?}");
|
info!(target: "mqtt-exporter::config", "Loaded config: {app_config:?}");
|
||||||
|
|
||||||
let app = App::new(app_config);
|
let app = App::new(app_config);
|
||||||
app.prom_task().await;
|
app.prom_task().await;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user