Compare commits

...

3 Commits

Author SHA1 Message Date
d3eab1da12 prs10: improve serial handling 2026-02-04 11:50:27 -08:00
46eedec11e fixup log level handling 2026-02-04 11:49:56 -08:00
30b48f686f improve error handling with fatal!() 2026-02-04 11:49:24 -08:00
3 changed files with 102 additions and 25 deletions

View File

@@ -1,6 +1,14 @@
pub mod sources; pub mod sources;
pub mod targets; pub mod targets;
#[macro_export]
macro_rules! fatal {
($($arg:tt)*) => {{
tracing::error!($($arg)*);
std::process::exit(-1);
}};
}
use async_trait::async_trait; use async_trait::async_trait;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use figment::{ use figment::{

View File

@@ -28,12 +28,29 @@ enum Level {
Error, Error,
} }
impl From<Level> for tracing::Level {
fn from(level: Level) -> Self {
match level {
Level::Debug => tracing::Level::DEBUG,
Level::Info => tracing::Level::INFO,
Level::Warn => tracing::Level::WARN,
Level::Error => tracing::Level::ERROR,
}
}
}
impl Level {
fn level(self) -> tracing::Level {
self.into()
}
}
#[derive(Parser)] #[derive(Parser)]
struct Args { struct Args {
/// TOML configuration to load /// TOML configuration to load
#[arg(short, long, default_value_t = String::from("config.toml"))] #[arg(short, long, default_value_t = String::from("config.toml"))]
config_file: String, config_file: String,
#[arg(value_enum, default_value_t = Level::Warn)] #[arg(value_enum, default_value_t = Level::Info)]
log_level: Level, log_level: Level,
} }
@@ -102,14 +119,16 @@ async fn dummy_consumer(mut chan: ChimemonTargetChannel, cancel: CancellationTok
#[tokio::main(flavor = "current_thread")] #[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Args::parse();
tracing_subscriber::fmt() tracing_subscriber::fmt()
.with_env_filter(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::default())) .with_env_filter(
EnvFilter::try_from_default_env()
.unwrap_or(EnvFilter::default().add_directive(args.log_level.level().into())),
)
// .event_format(format::Format::default().pretty()) // .event_format(format::Format::default().pretty())
.with_span_events(FmtSpan::CLOSE) .with_span_events(FmtSpan::CLOSE)
.init(); .init();
let args = Args::parse();
info!("{PROGRAM_NAME} v{VERSION} starting..."); info!("{PROGRAM_NAME} v{VERSION} starting...");
let fig = Figment::new() let fig = Figment::new()
.merge(Config::default()) .merge(Config::default())
@@ -206,6 +225,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
debug!("Task setup complete, tasks: {}", tasks.len()); debug!("Task setup complete, tasks: {}", tasks.len());
ctrlc::set_handler(move || { ctrlc::set_handler(move || {
if shutdown_token.is_cancelled() {
info!("Forced shutdown");
std::process::exit(1);
}
info!("Shutting down"); info!("Shutting down");
shutdown_token.cancel() shutdown_token.cancel()
}) })

View File

@@ -1,11 +1,14 @@
use std::any::type_name; use std::any::type_name;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait; use async_trait::async_trait;
use bitflags::bitflags; use bitflags::bitflags;
use itertools::Itertools; use itertools::Itertools;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf}; use tokio::io::{
AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter, ReadHalf, WriteHalf,
};
use tokio::select; use tokio::select;
use tokio::sync::OnceCell; use tokio::sync::OnceCell;
use tokio::time::{interval, timeout}; use tokio::time::{interval, timeout};
@@ -15,7 +18,7 @@ use tracing::{debug, debug_span, error, info, instrument, warn};
use crate::{ use crate::{
ChimemonMessage, ChimemonSource, ChimemonSourceChannel, MetricTags, Prs10Config, SourceMetric, ChimemonMessage, ChimemonSource, ChimemonSourceChannel, MetricTags, Prs10Config, SourceMetric,
SourceReport, SourceReportDetails, SourceStatus, SourceReport, SourceReportDetails, SourceStatus, fatal,
}; };
#[derive(Debug)] #[derive(Debug)]
@@ -116,6 +119,7 @@ bitflags! {
const EFC_LOW = (1<<3); const EFC_LOW = (1<<3);
const CAL_VOLTAGE_HIGH = (1<<4); const CAL_VOLTAGE_HIGH = (1<<4);
const CAL_VOLTAGE_LOW = (1<<5); const CAL_VOLTAGE_LOW = (1<<5);
const _ = !0;
} }
} }
@@ -205,19 +209,28 @@ impl TryFrom<&[u8]> for Prs10Status {
.map(|s| str::from_utf8(s).unwrap().parse::<u8>()) .map(|s| str::from_utf8(s).unwrap().parse::<u8>())
.collect_tuple() .collect_tuple()
.ok_or("Not enough parts in ST reply")?; .ok_or("Not enough parts in ST reply")?;
let volt_lamp_flags = volt_lamp_flags?;
let rf_flags = rf_flags?;
let temp_flags = temp_flags?;
let fll_flags = fll_flags?;
let pps_flags = pps_flags?;
let system_flags = system_flags?;
Ok(Self { Ok(Self {
volt_lamp_flags: Prs10PowerLampFlags::from_bits(volt_lamp_flags?) volt_lamp_flags: Prs10PowerLampFlags::from_bits(volt_lamp_flags).ok_or_else(|| {
.ok_or("Invalid bits set ({volt_lamp_flags}) for power/lamp flags")?, format!("Invalid bits set ({volt_lamp_flags}) for power/lamp flags")
rf_flags: Prs10RfFlags::from_bits(rf_flags?) })?,
.ok_or("Invalid bits set ({rf_flags}) for RF flags")?, rf_flags: Prs10RfFlags::from_bits(rf_flags)
temp_flags: Prs10TempFlags::from_bits(temp_flags?) .ok_or_else(|| format!("Invalid bits set ({rf_flags}) for RF flags"))?,
.ok_or("Invalid bits set ({temp_flags}) for temp flags")?, temp_flags: Prs10TempFlags::from_bits(temp_flags)
fll_flags: Prs10FllFlags::from_bits(fll_flags?) .ok_or_else(|| format!("Invalid bits set ({temp_flags}) for temp flags"))?,
.ok_or("Invalid bits set ({fll_flags}) for FLL flags")?, fll_flags: Prs10FllFlags::from_bits(fll_flags)
pps_flags: Prs10PpsFlags::from_bits(pps_flags?) .ok_or_else(|| format!("Invalid bits set ({fll_flags}) for FLL flags"))?,
.ok_or("Invalid bits set ({pps_flags}) for PPS flags")?, pps_flags: Prs10PpsFlags::from_bits(pps_flags)
system_flags: Prs10SystemFlags::from_bits(system_flags?) .ok_or_else(|| format!("Invalid bits set ({pps_flags}) for PPS flags"))?,
.ok_or("Invalid bits set ({system_flags}) for system flags")?, system_flags: Prs10SystemFlags::from_bits(system_flags)
.ok_or_else(|| format!("Invalid bits set ({system_flags}) for system flags"))?,
}) })
} }
} }
@@ -313,7 +326,7 @@ pub struct Prs10Monitor {
name: String, name: String,
config: Prs10Config, config: Prs10Config,
rx: ReadHalf<SerialStream>, rx: ReadHalf<SerialStream>,
tx: WriteHalf<SerialStream>, tx: BufWriter<WriteHalf<SerialStream>>,
info: OnceCell<Prs10Info>, info: OnceCell<Prs10Info>,
} }
@@ -324,8 +337,9 @@ impl Prs10Monitor {
#[instrument(level = "debug", skip_all, fields(cmd = String::from_utf8_lossy(cmd).to_string()))] #[instrument(level = "debug", skip_all, fields(cmd = String::from_utf8_lossy(cmd).to_string()))]
pub async fn cmd_response(&mut self, cmd: &[u8]) -> Result<Vec<u8>, std::io::Error> { pub async fn cmd_response(&mut self, cmd: &[u8]) -> Result<Vec<u8>, std::io::Error> {
self.tx.write_all(cmd).await.unwrap(); self.tx.write_all(cmd).await?;
self.tx.write_u8(b'\r').await.unwrap(); self.tx.write_u8(b'\r').await?;
self.tx.flush().await?;
let mut reader = BufReader::new(&mut self.rx); let mut reader = BufReader::new(&mut self.rx);
let mut buf = Vec::new(); let mut buf = Vec::new();
let read = timeout(self.config.timeout, reader.read_until(b'\r', &mut buf)).await??; let read = timeout(self.config.timeout, reader.read_until(b'\r', &mut buf)).await??;
@@ -476,6 +490,20 @@ impl Prs10Monitor {
}), }),
})) }))
} }
async fn reset_rx_state(&mut self) -> Result<(), Box<dyn std::error::Error>> {
// flush any pending input and potential responses from the receiver side
self.tx.write_u8(b'\r').await?;
self.tx.flush().await?;
let mut discard = vec![];
loop {
match timeout(Duration::from_millis(100), self.rx.read_buf(&mut discard)).await {
Ok(_) => discard.clear(),
Err(_) => break,
}
}
Ok(())
}
} }
#[async_trait] #[async_trait]
@@ -488,15 +516,28 @@ impl ChimemonSource for Prs10Monitor {
.parity(tokio_serial::Parity::None) .parity(tokio_serial::Parity::None)
.stop_bits(tokio_serial::StopBits::One) .stop_bits(tokio_serial::StopBits::One)
.flow_control(tokio_serial::FlowControl::None); .flow_control(tokio_serial::FlowControl::None);
let mut port = SerialStream::open(&builder).expect("Must be able to open serial port"); let mut port = SerialStream::open(&builder).unwrap_or_else(|e| {
port.set_exclusive(true).expect("Can't lock serial port"); fatal!(
"Failed to open serial port `{}` ({})",
config.port,
e.to_string()
)
});
port.set_exclusive(true).unwrap_or_else(|e| {
fatal!(
"Can't lock serial port `{}` ({})",
config.port,
e.to_string()
)
});
info!( info!(
"Opened serial port {}@{}", "Opened serial port {}@{}",
port.name().unwrap(), port.name().unwrap(),
port.baud_rate().unwrap() port.baud_rate().unwrap()
); );
let (rx, tx) = tokio::io::split(port);
let (rx, tx) = tokio::io::split(port);
let tx = BufWriter::new(tx);
Self { Self {
name: name.to_owned(), name: name.to_owned(),
config, config,
@@ -505,10 +546,15 @@ impl ChimemonSource for Prs10Monitor {
info: OnceCell::new(), info: OnceCell::new(),
} }
} }
async fn run(mut self, chan: ChimemonSourceChannel, cancel: CancellationToken) { async fn run(mut self, chan: ChimemonSourceChannel, cancel: CancellationToken) {
info!("PRS10 task started"); info!("PRS10 task started");
if let Err(e) = self.reset_rx_state().await {
error!(error = ?e, "Error clearing PRS10 RX state");
return;
}
if let Err(e) = self.set_info().await { if let Err(e) = self.set_info().await {
error!("Error starting PRS10: {e:?}"); error!(error = ?e, "Error starting PRS10");
return; return;
} }
info!( info!(