Compare commits

..

3 Commits

Author SHA1 Message Date
d3eab1da12 prs10: improve serial handling 2026-02-04 11:50:27 -08:00
46eedec11e fixup log level handling 2026-02-04 11:49:56 -08:00
30b48f686f improve error handling with fatal!() 2026-02-04 11:49:24 -08:00
3 changed files with 102 additions and 25 deletions

View File

@@ -1,6 +1,14 @@
pub mod sources;
pub mod targets;
#[macro_export]
macro_rules! fatal {
($($arg:tt)*) => {{
tracing::error!($($arg)*);
std::process::exit(-1);
}};
}
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use figment::{

View File

@@ -28,12 +28,29 @@ enum Level {
Error,
}
impl From<Level> for tracing::Level {
fn from(level: Level) -> Self {
match level {
Level::Debug => tracing::Level::DEBUG,
Level::Info => tracing::Level::INFO,
Level::Warn => tracing::Level::WARN,
Level::Error => tracing::Level::ERROR,
}
}
}
impl Level {
fn level(self) -> tracing::Level {
self.into()
}
}
#[derive(Parser)]
struct Args {
/// TOML configuration to load
#[arg(short, long, default_value_t = String::from("config.toml"))]
config_file: String,
#[arg(value_enum, default_value_t = Level::Warn)]
#[arg(value_enum, default_value_t = Level::Info)]
log_level: Level,
}
@@ -102,14 +119,16 @@ async fn dummy_consumer(mut chan: ChimemonTargetChannel, cancel: CancellationTok
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Args::parse();
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::default()))
.with_env_filter(
EnvFilter::try_from_default_env()
.unwrap_or(EnvFilter::default().add_directive(args.log_level.level().into())),
)
// .event_format(format::Format::default().pretty())
.with_span_events(FmtSpan::CLOSE)
.init();
let args = Args::parse();
info!("{PROGRAM_NAME} v{VERSION} starting...");
let fig = Figment::new()
.merge(Config::default())
@@ -206,6 +225,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
debug!("Task setup complete, tasks: {}", tasks.len());
ctrlc::set_handler(move || {
if shutdown_token.is_cancelled() {
info!("Forced shutdown");
std::process::exit(1);
}
info!("Shutting down");
shutdown_token.cancel()
})

View File

@@ -1,11 +1,14 @@
use std::any::type_name;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use bitflags::bitflags;
use itertools::Itertools;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf};
use tokio::io::{
AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter, ReadHalf, WriteHalf,
};
use tokio::select;
use tokio::sync::OnceCell;
use tokio::time::{interval, timeout};
@@ -15,7 +18,7 @@ use tracing::{debug, debug_span, error, info, instrument, warn};
use crate::{
ChimemonMessage, ChimemonSource, ChimemonSourceChannel, MetricTags, Prs10Config, SourceMetric,
SourceReport, SourceReportDetails, SourceStatus,
SourceReport, SourceReportDetails, SourceStatus, fatal,
};
#[derive(Debug)]
@@ -116,6 +119,7 @@ bitflags! {
const EFC_LOW = (1<<3);
const CAL_VOLTAGE_HIGH = (1<<4);
const CAL_VOLTAGE_LOW = (1<<5);
const _ = !0;
}
}
@@ -205,19 +209,28 @@ impl TryFrom<&[u8]> for Prs10Status {
.map(|s| str::from_utf8(s).unwrap().parse::<u8>())
.collect_tuple()
.ok_or("Not enough parts in ST reply")?;
let volt_lamp_flags = volt_lamp_flags?;
let rf_flags = rf_flags?;
let temp_flags = temp_flags?;
let fll_flags = fll_flags?;
let pps_flags = pps_flags?;
let system_flags = system_flags?;
Ok(Self {
volt_lamp_flags: Prs10PowerLampFlags::from_bits(volt_lamp_flags?)
.ok_or("Invalid bits set ({volt_lamp_flags}) for power/lamp flags")?,
rf_flags: Prs10RfFlags::from_bits(rf_flags?)
.ok_or("Invalid bits set ({rf_flags}) for RF flags")?,
temp_flags: Prs10TempFlags::from_bits(temp_flags?)
.ok_or("Invalid bits set ({temp_flags}) for temp flags")?,
fll_flags: Prs10FllFlags::from_bits(fll_flags?)
.ok_or("Invalid bits set ({fll_flags}) for FLL flags")?,
pps_flags: Prs10PpsFlags::from_bits(pps_flags?)
.ok_or("Invalid bits set ({pps_flags}) for PPS flags")?,
system_flags: Prs10SystemFlags::from_bits(system_flags?)
.ok_or("Invalid bits set ({system_flags}) for system flags")?,
volt_lamp_flags: Prs10PowerLampFlags::from_bits(volt_lamp_flags).ok_or_else(|| {
format!("Invalid bits set ({volt_lamp_flags}) for power/lamp flags")
})?,
rf_flags: Prs10RfFlags::from_bits(rf_flags)
.ok_or_else(|| format!("Invalid bits set ({rf_flags}) for RF flags"))?,
temp_flags: Prs10TempFlags::from_bits(temp_flags)
.ok_or_else(|| format!("Invalid bits set ({temp_flags}) for temp flags"))?,
fll_flags: Prs10FllFlags::from_bits(fll_flags)
.ok_or_else(|| format!("Invalid bits set ({fll_flags}) for FLL flags"))?,
pps_flags: Prs10PpsFlags::from_bits(pps_flags)
.ok_or_else(|| format!("Invalid bits set ({pps_flags}) for PPS flags"))?,
system_flags: Prs10SystemFlags::from_bits(system_flags)
.ok_or_else(|| format!("Invalid bits set ({system_flags}) for system flags"))?,
})
}
}
@@ -313,7 +326,7 @@ pub struct Prs10Monitor {
name: String,
config: Prs10Config,
rx: ReadHalf<SerialStream>,
tx: WriteHalf<SerialStream>,
tx: BufWriter<WriteHalf<SerialStream>>,
info: OnceCell<Prs10Info>,
}
@@ -324,8 +337,9 @@ impl Prs10Monitor {
#[instrument(level = "debug", skip_all, fields(cmd = String::from_utf8_lossy(cmd).to_string()))]
pub async fn cmd_response(&mut self, cmd: &[u8]) -> Result<Vec<u8>, std::io::Error> {
self.tx.write_all(cmd).await.unwrap();
self.tx.write_u8(b'\r').await.unwrap();
self.tx.write_all(cmd).await?;
self.tx.write_u8(b'\r').await?;
self.tx.flush().await?;
let mut reader = BufReader::new(&mut self.rx);
let mut buf = Vec::new();
let read = timeout(self.config.timeout, reader.read_until(b'\r', &mut buf)).await??;
@@ -476,6 +490,20 @@ impl Prs10Monitor {
}),
}))
}
async fn reset_rx_state(&mut self) -> Result<(), Box<dyn std::error::Error>> {
// flush any pending input and potential responses from the receiver side
self.tx.write_u8(b'\r').await?;
self.tx.flush().await?;
let mut discard = vec![];
loop {
match timeout(Duration::from_millis(100), self.rx.read_buf(&mut discard)).await {
Ok(_) => discard.clear(),
Err(_) => break,
}
}
Ok(())
}
}
#[async_trait]
@@ -488,15 +516,28 @@ impl ChimemonSource for Prs10Monitor {
.parity(tokio_serial::Parity::None)
.stop_bits(tokio_serial::StopBits::One)
.flow_control(tokio_serial::FlowControl::None);
let mut port = SerialStream::open(&builder).expect("Must be able to open serial port");
port.set_exclusive(true).expect("Can't lock serial port");
let mut port = SerialStream::open(&builder).unwrap_or_else(|e| {
fatal!(
"Failed to open serial port `{}` ({})",
config.port,
e.to_string()
)
});
port.set_exclusive(true).unwrap_or_else(|e| {
fatal!(
"Can't lock serial port `{}` ({})",
config.port,
e.to_string()
)
});
info!(
"Opened serial port {}@{}",
port.name().unwrap(),
port.baud_rate().unwrap()
);
let (rx, tx) = tokio::io::split(port);
let (rx, tx) = tokio::io::split(port);
let tx = BufWriter::new(tx);
Self {
name: name.to_owned(),
config,
@@ -505,10 +546,15 @@ impl ChimemonSource for Prs10Monitor {
info: OnceCell::new(),
}
}
async fn run(mut self, chan: ChimemonSourceChannel, cancel: CancellationToken) {
info!("PRS10 task started");
if let Err(e) = self.reset_rx_state().await {
error!(error = ?e, "Error clearing PRS10 RX state");
return;
}
if let Err(e) = self.set_info().await {
error!("Error starting PRS10: {e:?}");
error!(error = ?e, "Error starting PRS10");
return;
}
info!(