wait for connection to subscribe to fix retry

This commit is contained in:
Keenan Tims 2025-03-09 01:39:50 -08:00
parent cc4e1503f6
commit 374e8f07e0
Signed by: ktims
GPG Key ID: 11230674D69038D4
3 changed files with 37 additions and 34 deletions

2
Cargo.lock generated
View File

@ -983,7 +983,7 @@ dependencies = [
[[package]] [[package]]
name = "mqtt-exporter" name = "mqtt-exporter"
version = "0.1.1" version = "0.1.2"
dependencies = [ dependencies = [
"backoff", "backoff",
"bytes", "bytes",

View File

@ -1,6 +1,6 @@
[package] [package]
name = "mqtt-exporter" name = "mqtt-exporter"
version = "0.1.1" version = "0.1.2"
edition = "2021" edition = "2021"
[dependencies] [dependencies]

View File

@ -10,7 +10,7 @@ use regex::Regex;
use rumqttc::{ use rumqttc::{
AsyncClient, ConnAck, AsyncClient, ConnAck,
Event::{self}, Event::{self},
MqttOptions, Packet, Publish, QoS, SubscribeFilter, EventLoop, MqttOptions, Packet, Publish, QoS, SubscribeFilter,
}; };
use serde::Deserialize; use serde::Deserialize;
use std::{ use std::{
@ -110,6 +110,7 @@ struct AppConfig {
} }
struct App { struct App {
config: AppConfig, config: AppConfig,
client: AsyncClient,
transformers: Vec<Box<dyn Transform>>, transformers: Vec<Box<dyn Transform>>,
} }
@ -209,10 +210,10 @@ impl App {
Ok(json::parse(str::from_utf8(payload)?)?) Ok(json::parse(str::from_utf8(payload)?)?)
} }
fn handle_event(&self, event: &Event) { async fn handle_event(&self, event: &Event) {
match event { match event {
Event::Incoming(Packet::Publish(notification)) => self.handle_publish(notification), Event::Incoming(Packet::Publish(notification)) => self.handle_publish(notification),
Event::Incoming(Packet::ConnAck(ack)) => self.handle_connack(ack), Event::Incoming(Packet::ConnAck(ack)) => self.handle_connack(ack).await,
// Event::Incoming(Packet::SubAck(ack)) => self.handle_suback(ack), // Event::Incoming(Packet::SubAck(ack)) => self.handle_suback(ack),
Event::Outgoing(_) => {} Event::Outgoing(_) => {}
e => debug!(target: "mqtt-exporter::mqtt", "Unhandled event: {:?}", e), e => debug!(target: "mqtt-exporter::mqtt", "Unhandled event: {:?}", e),
@ -228,10 +229,11 @@ impl App {
} }
} }
fn handle_connack(&self, ack: &ConnAck) { async fn handle_connack(&self, ack: &ConnAck) {
match ack.code { match ack.code {
rumqttc::ConnectReturnCode::Success => { rumqttc::ConnectReturnCode::Success => {
info!(target: "mqtt-exporter::mqtt", "Successfully connected to MQTT broker") info!(target: "mqtt-exporter::mqtt", "Successfully connected to MQTT broker");
self.subscribe().await;
} }
code => { code => {
warn!(target: "mqtt-exporter::mqtt", "Non success connection return code {:?}", code) warn!(target: "mqtt-exporter::mqtt", "Non success connection return code {:?}", code)
@ -263,8 +265,9 @@ impl App {
self.parse_metrics(data, &topic, &topic, metric_path, &labels); self.parse_metrics(data, &topic, &topic, metric_path, &labels);
} }
async fn subscribe(&self, client: &AsyncClient) { async fn subscribe(&self) {
match client match self
.client
.subscribe_many(self.config.mqtt_topic.iter().map(|topic| SubscribeFilter { .subscribe_many(self.config.mqtt_topic.iter().map(|topic| SubscribeFilter {
path: topic.to_owned(), path: topic.to_owned(),
qos: QoS::AtLeastOnce, qos: QoS::AtLeastOnce,
@ -280,32 +283,14 @@ impl App {
} }
} }
async fn mqtt_task(self) { async fn mqtt_task(self, mut eventloop: EventLoop) {
let mut opts = MqttOptions::new(
&self.config.mqtt_client_id,
&self.config.mqtt_address,
self.config.mqtt_port,
);
opts.set_keep_alive(Duration::from_secs(self.config.mqtt_keepalive))
.set_max_packet_size(150000, 150000);
if let (Some(username), Some(password)) =
(&self.config.mqtt_username, &self.config.mqtt_password)
{
opts.set_credentials(username, password);
}
let (client, mut eventloop) = AsyncClient::new(opts, 10);
self.subscribe(&client).await;
let mut bo = backoff::ExponentialBackoff::default(); let mut bo = backoff::ExponentialBackoff::default();
loop { loop {
match eventloop.poll().await { match eventloop.poll().await {
Ok(event) => { Ok(event) => {
bo.reset(); bo.reset();
self.handle_event(&event) self.handle_event(&event).await
} }
Err(e) => { Err(e) => {
error!(target: "mqtt-exporter::mqtt", "Connection error: {}", e); error!(target: "mqtt-exporter::mqtt", "Connection error: {}", e);
@ -336,7 +321,7 @@ impl App {
builder.install().unwrap(); builder.install().unwrap();
} }
fn new(config: AppConfig) -> Self { fn new(config: AppConfig, client: AsyncClient) -> Self {
let mut transformers: Vec<Box<dyn Transform>> = Vec::new(); let mut transformers: Vec<Box<dyn Transform>> = Vec::new();
if !config.zwave_topic_prefix.is_empty() { if !config.zwave_topic_prefix.is_empty() {
transformers.push(Box::new(ZwaveNormalize::new(&config.zwave_topic_prefix))) transformers.push(Box::new(ZwaveNormalize::new(&config.zwave_topic_prefix)))
@ -346,7 +331,8 @@ impl App {
&config.esphome_topic_prefixes, &config.esphome_topic_prefixes,
))) )))
} }
if !config.hubitat_topic_prefixes.is_empty() && !config.hubitat_topic_prefixes[0].is_empty() { if !config.hubitat_topic_prefixes.is_empty() && !config.hubitat_topic_prefixes[0].is_empty()
{
transformers.push(Box::new(HubitatNormalize::new( transformers.push(Box::new(HubitatNormalize::new(
&config.hubitat_topic_prefixes, &config.hubitat_topic_prefixes,
))) )))
@ -355,8 +341,10 @@ impl App {
transformers.push(Box::new(Zigbee2MqttAvailability::default())) transformers.push(Box::new(Zigbee2MqttAvailability::default()))
} }
debug!(target: "mqtt-exporter::config", "Transformers enabled: {:?}", transformers); debug!(target: "mqtt-exporter::config", "Transformers enabled: {:?}", transformers);
Self { Self {
config, config,
client,
transformers, transformers,
} }
} }
@ -380,10 +368,25 @@ async fn main() {
) )
.build() .build()
.unwrap(); .unwrap();
let app_config = config.try_deserialize().unwrap(); let app_config: AppConfig = config.try_deserialize().unwrap();
info!(target: "mqtt-exporter::config", "Loaded config: {app_config:?}"); info!(target: "mqtt-exporter::config", "Loaded config: {app_config:?}");
let app = App::new(app_config); let mut opts = MqttOptions::new(
&app_config.mqtt_client_id,
&app_config.mqtt_address,
app_config.mqtt_port,
);
opts.set_keep_alive(Duration::from_secs(app_config.mqtt_keepalive))
.set_max_packet_size(150000, 150000);
if let (Some(username), Some(password)) = (&app_config.mqtt_username, &app_config.mqtt_password)
{
opts.set_credentials(username, password);
}
let (client, mut eventloop) = AsyncClient::new(opts, 10);
let app = App::new(app_config, client);
app.prom_task().await; app.prom_task().await;
app.mqtt_task().await; app.mqtt_task(eventloop).await;
} }