From 08871a578238dd1a27ebef5336c4470da817c321 Mon Sep 17 00:00:00 2001 From: Keenan Tims Date: Wed, 4 Feb 2026 03:13:39 -0800 Subject: [PATCH] cancellation --- Cargo.lock | 60 ++++++++++++++++++ Cargo.toml | 1 + src/lib.rs | 11 ++-- src/main.rs | 111 ++++++++++++++++++++++----------- src/sources/chrony.rs | 51 +++++++-------- src/sources/gpsd.rs | 6 +- src/sources/hwmon.rs | 77 +++++++++++++---------- src/sources/prs10.rs | 8 ++- src/sources/uccm.rs | 11 +++- src/targets/chrony_refclock.rs | 73 ++++++++++++---------- 10 files changed, 268 insertions(+), 141 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7429acc..2bc2ee9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -177,6 +177,15 @@ version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "812e12b5285cc515a9c72a5c1d3b6d46a19dac5acfef5265968c166106e31dd3" +[[package]] +name = "block2" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdeb9d870516001442e364c5220d3574d2da8dc765554b4a617230d33fa58ef5" +dependencies = [ + "objc2", +] + [[package]] name = "bumpalo" version = "3.19.1" @@ -236,6 +245,7 @@ dependencies = [ "chrono", "chrony-candm", "clap", + "ctrlc", "figment", "futures", "gethostname", @@ -393,6 +403,17 @@ dependencies = [ "memchr", ] +[[package]] +name = "ctrlc" +version = "3.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73736a89c4aff73035ba2ed2e565061954da00d4970fc9ac25dcc85a2a20d790" +dependencies = [ + "dispatch2", + "nix 0.30.1", + "windows-sys 0.61.2", +] + [[package]] name = "darling" version = "0.21.3" @@ -448,6 +469,18 @@ dependencies = [ "serde_core", ] +[[package]] +name = "dispatch2" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89a09f22a6c6069a18470eb92d2298acf25463f14256d24778e1230d789a2aec" +dependencies = [ + "bitflags 2.10.0", + "block2", + "libc", + "objc2", +] + [[package]] name = "displaydoc" version = "0.2.5" @@ -1274,6 +1307,18 @@ dependencies = [ "libc", ] +[[package]] +name = "nix" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74523f3a35e05aba87a1d978330aef40f67b0304ac79c1c00b294c9830543db6" +dependencies = [ + "bitflags 2.10.0", + "cfg-if", + "cfg_aliases", + "libc", +] + [[package]] name = "nom" version = "7.1.3" @@ -1339,6 +1384,21 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "objc2" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7c2599ce0ec54857b29ce62166b0ed9b4f6f1a70ccc9a71165b6154caca8c05" +dependencies = [ + "objc2-encode", +] + +[[package]] +name = "objc2-encode" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef25abbcd74fb2609453eb695bd2f860d389e457f67dc17cafc8b8cbc89d0c33" + [[package]] name = "once_cell" version = "1.21.3" diff --git a/Cargo.toml b/Cargo.toml index 73867d1..f40fd2b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ bitflags = "2.10.0" influxdb2 = "0.3.9" chrono = "0.4.43" serde_with = "3.16.1" +ctrlc = "3.5.1" [dependencies.chrony-candm] git = "https://github.com/aws/chrony-candm" diff --git a/src/lib.rs b/src/lib.rs index 39d02f8..75225a4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,6 +14,7 @@ use influxdb2::models::DataPoint; use serde_derive::{Deserialize, Serialize}; use serde_with::{DurationSeconds, serde_as}; use tokio::sync::broadcast::*; +use tokio_util::sync::CancellationToken; use std::{fmt::Debug, path::Path, sync::Arc}; @@ -89,14 +90,14 @@ impl Default for ChronySockConfig { } } -#[derive(Serialize, Deserialize, Clone)] +#[derive(Serialize, Deserialize, Clone, Debug)] pub struct HwmonSensorConfig { - pub name: String, + pub device: String, pub sensor: String, } #[serde_as] -#[derive(Serialize, Deserialize, Clone)] +#[derive(Serialize, Deserialize, Clone, Debug)] #[serde(default)] pub struct HwmonConfig { pub enabled: bool, @@ -354,10 +355,10 @@ pub type ChimemonTargetChannel = Receiver; pub trait ChimemonSource { type Config; fn new(name: &str, config: Self::Config) -> Self; - async fn run(self, chan: ChimemonSourceChannel); + async fn run(self, chan: ChimemonSourceChannel, cancel: CancellationToken); } #[async_trait] pub trait ChimemonTarget { - async fn run(self, chan: ChimemonTargetChannel); + async fn run(self, chan: ChimemonTargetChannel, cancel: CancellationToken); } diff --git a/src/main.rs b/src/main.rs index 4dfda5a..d334bf2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,7 +5,8 @@ use figment::{ }; use futures::future::join_all; use std::path::Path; -use tokio::{sync::broadcast, task::JoinHandle}; +use tokio::{select, sync::broadcast, task::JoinHandle}; +use tokio_util::sync::CancellationToken; use tracing::{Instrument, debug, error, info, info_span, warn}; use tracing_subscriber::{self, EnvFilter, fmt::format::FmtSpan}; @@ -36,43 +37,69 @@ struct Args { log_level: Level, } -fn run_source(config: NamedSourceConfig, chan: ChimemonSourceChannel) -> Option> { +fn run_source( + config: NamedSourceConfig, + chan: ChimemonSourceChannel, + shutdown: CancellationToken, +) -> Option> { let NamedSourceConfig { name, source } = config; match source { SourceConfig::Chrony(source_config) if source_config.enabled => { let c = ChronyClient::new(&name, source_config); Some(tokio::spawn( - c.run(chan).instrument(info_span!("chrony-task")), + c.run(chan, shutdown).instrument(info_span!("chrony-task")), )) } SourceConfig::Gpsd(source_config) if source_config.enabled => { let c = GpsdSource::new(&name, source_config); Some(tokio::spawn( - c.run(chan).instrument(info_span!("gpsd-task")), + c.run(chan, shutdown).instrument(info_span!("gpsd-task")), )) } SourceConfig::Hwmon(source_config) if source_config.enabled => { let c = HwmonSource::new(&name, source_config); Some(tokio::spawn( - c.run(chan).instrument(info_span!("hwmon-task")), + c.run(chan, shutdown).instrument(info_span!("hwmon-task")), )) } SourceConfig::Prs10(source_config) if source_config.enabled => { let c = Prs10Monitor::new(&name, source_config); Some(tokio::spawn( - c.run(chan).instrument(info_span!("prs10-task")), + c.run(chan, shutdown).instrument(info_span!("prs10-task")), )) } SourceConfig::Uccm(source_config) if source_config.enabled => { let c = UCCMMonitor::new(&name, source_config); Some(tokio::spawn( - c.run(chan.clone()).instrument(info_span!("uccm-task")), + c.run(chan, shutdown).instrument(info_span!("uccm-task")), )) } _ => None, } } +async fn dummy_consumer(mut chan: ChimemonTargetChannel, cancel: CancellationToken) { + info!("Dummy receiver task started"); + loop { + select! { + _ = cancel.cancelled() => { + return + }, + Ok(m) = chan.recv() => { + match m { + ChimemonMessage::SourceReport(report) => { + let metrics = report.details.to_metrics(); + info!("instance: {} metrics: {metrics:?}", report.name); + } + msg => { + info!("message: {msg:?}"); + } + } + } + } + } +} + #[tokio::main(flavor = "current_thread")] async fn main() -> Result<(), Box> { tracing_subscriber::fmt() @@ -94,6 +121,8 @@ async fn main() -> Result<(), Box> { let (tx, _) = broadcast::channel(16); let sourcechan: ChimemonSourceChannel = tx; + let shutdown_token = CancellationToken::new(); + if config.influxdb.enabled { info!( "Connecting to influxdb {} org: {} using token", @@ -106,29 +135,44 @@ async fn main() -> Result<(), Box> { &config.influxdb.token, ); - let mut influxrx = sourcechan.subscribe(); + let mut influx_rx = sourcechan.subscribe(); + let influx_cancel = shutdown_token.clone(); tasks.push(tokio::spawn( async move { let stream = async_stream::stream! { - while let Ok(msg) = influxrx.recv().await { - match msg { ChimemonMessage::DataPoint(dp) => { - yield dp - }, ChimemonMessage::DataPoints(dps) => { - for p in dps { - yield p - } - }, _ => {} - } } + while let Ok(msg) = influx_rx.recv().await { + match msg { + ChimemonMessage::DataPoint(dp) => { + yield dp + }, + ChimemonMessage::DataPoints(dps) => { + for p in dps { + yield p + } + }, + _ => {} + } + } }; - influx.write(&config.influxdb.bucket, stream).await.unwrap(); + select! { + _ = influx_cancel.cancelled() => { + return + }, + res = influx.write(&config.influxdb.bucket, stream) => { + match res { + Err(e) => error!("Error writing to influx: {}", e.to_string()), + _ => warn!("Unexpectedly shutting down influx task"), + } + }, + } } .instrument(info_span!("influx-task")), )); } for source in config.sources { - if let Some(task) = run_source(source, sourcechan.clone()) { + if let Some(task) = run_source(source, sourcechan.clone(), shutdown_token.clone()) { tasks.push(task) } } @@ -141,7 +185,7 @@ async fn main() -> Result<(), Box> { if let Some(chrony_refclock) = chrony_refclock { tasks.push(tokio::spawn( chrony_refclock - .run(sourcechan.subscribe()) + .run(sourcechan.subscribe(), shutdown_token.clone()) .instrument(info_span!("chrony-refclock-task")), )); }; @@ -153,29 +197,20 @@ async fn main() -> Result<(), Box> { if sourcechan.receiver_count() == 0 { warn!("No consumers configured, events will be discarded"); - let mut chan = sourcechan.subscribe(); - // spawn a dummy task to reap the channel and keep the process alive tasks.push(tokio::spawn( - async move { - loop { - while let Ok(m) = chan.recv().await { - info!("received {m:?}"); - match m { - ChimemonMessage::SourceReport(report) => { - let metrics = report.details.to_metrics(); - info!("metrics: {metrics:?}"); - } - _ => {} - } - } - } - } - .instrument(info_span!("dummy-receiver-task")), - )) + dummy_consumer(sourcechan.subscribe(), shutdown_token.clone()) + .instrument(info_span!("dummy-consumer-task")), + )); } debug!("Task setup complete, tasks: {}", tasks.len()); + ctrlc::set_handler(move || { + info!("Shutting down"); + shutdown_token.cancel() + }) + .unwrap(); + join_all(tasks).await; Ok(()) diff --git a/src/sources/chrony.rs b/src/sources/chrony.rs index 7c4769e..76fc343 100644 --- a/src/sources/chrony.rs +++ b/src/sources/chrony.rs @@ -5,7 +5,9 @@ use async_trait::async_trait; use chrony_candm::reply::{self, ReplyBody, SourceMode}; use chrony_candm::request::{self, RequestBody}; use chrony_candm::{ClientOptions, blocking_query}; -use tokio::join; +use futures::future::join; +use tokio::{join, select}; +use tokio_util::sync::CancellationToken; use tracing::{info, warn}; use crate::{ @@ -297,38 +299,33 @@ impl ChimemonSource for ChronyClient { config, } } - async fn run(self, chan: ChimemonSourceChannel) { + async fn run(self, chan: ChimemonSourceChannel, cancel: CancellationToken) { info!("Chrony task started"); let mut t_interval = tokio::time::interval(self.config.tracking_interval); let mut s_interval = tokio::time::interval(self.config.sources_interval); - - let t_future = async { - let lchan = chan.clone(); - loop { - t_interval.tick().await; - - match self.tracking_poll(&lchan).await { - Ok(_) => (), - Err(e) => { - warn!("Error in chrony task: {}", e.to_string()); + loop { + select! { + _ = cancel.cancelled() => { + return + }, + _ = t_interval.tick() => { + match self.tracking_poll(&chan).await { + Ok(_) => (), + Err(e) => { + warn!("Error in chrony task: {}", e.to_string()); + } + } + }, + _ = s_interval.tick() => { + match self.sources_poll(&chan).await { + Ok(_) => (), + Err(e) => { + warn!("Error in chrony task: {}", e.to_string()); + } } } } - }; - let s_future = async { - let lchan = chan.clone(); - loop { - s_interval.tick().await; - - match self.sources_poll(&lchan).await { - Ok(_) => (), - Err(e) => { - warn!("Error in chrony task: {}", e.to_string()); - } - } - } - }; - join!(t_future, s_future); + } } } diff --git a/src/sources/gpsd.rs b/src/sources/gpsd.rs index 9f272c5..b027dcf 100644 --- a/src/sources/gpsd.rs +++ b/src/sources/gpsd.rs @@ -14,6 +14,7 @@ use serde_json; use tokio::net::{TcpStream, ToSocketAddrs, lookup_host}; use tokio::time::{interval, timeout}; use tokio_util::codec::{Framed, LinesCodec}; +use tokio_util::sync::CancellationToken; use tracing::{debug, debug_span, info, instrument, warn}; use crate::{ @@ -170,7 +171,7 @@ impl ChimemonSource for GpsdSource { .unwrap(); rt.block_on(Self::inner_new(name, config)).unwrap() } - async fn run(mut self, mut chan: ChimemonSourceChannel) { + async fn run(mut self, mut chan: ChimemonSourceChannel, cancel: CancellationToken) { info!("gpsd task started"); self.conn.conn().await.unwrap(); let mut ticker = interval(self.config.interval); @@ -184,6 +185,9 @@ impl ChimemonSource for GpsdSource { loop { let framed = self.conn.framed.as_mut().expect("must be connected"); tokio::select! { + _ = cancel.cancelled() => { + return + }, _ = ticker.tick() => { self.send_status(&mut chan).await }, diff --git a/src/sources/hwmon.rs b/src/sources/hwmon.rs index 943bbc8..a777891 100644 --- a/src/sources/hwmon.rs +++ b/src/sources/hwmon.rs @@ -1,6 +1,8 @@ use std::{fs::File, io::Read, path::PathBuf, sync::Arc}; use async_trait::async_trait; +use tokio::select; +use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; use crate::{ @@ -106,50 +108,59 @@ impl ChimemonSource for HwmonSource { let sensors = config .sensors .iter() - .map(|(k, v)| Arc::new(HwmonSensor::new(k, &v.name, &v.sensor))) + .map(|(k, v)| Arc::new(HwmonSensor::new(k, &v.device, &v.sensor))) .collect(); - + debug!("config: {config:?}"); HwmonSource { name: name.to_owned(), config, sensors, } } - async fn run(self, chan: ChimemonSourceChannel) { + async fn run(self, chan: ChimemonSourceChannel, cancel: CancellationToken) { info!("hwmon task started"); let mut interval = tokio::time::interval(self.config.interval); loop { - interval.tick().await; - let mut values = Vec::new(); - for s in &self.sensors { - if let Ok(sensor_val) = HwmonSource::get_raw_value(s).await { - debug!( - "hwmon {} raw value {}", - s.value_path.to_string_lossy(), - sensor_val - ); - if let Ok(parsed) = sensor_val.trim().parse::() { - values.push((s.clone(), parsed)); - } else { - error!( - "Unable to parse sensor value {sensor_val} at {}", - s.value_path.to_string_lossy() - ); + select! { + _ = cancel.cancelled() => { return; }, + _ = interval.tick() => { + let mut values = Vec::new(); + for s in &self.sensors { + debug!("Sensor {s:?}"); + match HwmonSource::get_raw_value(s).await { + Ok(sensor_val) => { + debug!( + "hwmon {} raw value {}", + s.value_path.to_string_lossy(), + sensor_val + ); + if let Ok(parsed) = sensor_val.trim().parse::() { + values.push((s.clone(), parsed)); + } else { + error!( + "Unable to parse sensor value {sensor_val} at {}", + s.value_path.to_string_lossy() + ); + } + }, + Err(e) => { + error!("Unable to get hwmon sensor value ({})", e.to_string()); + continue + } + } + } + let report = SourceReport { + name: self.name.clone(), + status: SourceStatus::Healthy, + details: Arc::new(HwmonReport { values }), + }; + info!("Writing hwmon data"); + match chan.send(report.into()) { + Ok(_) => {} + Err(e) => { + warn!("Unable to send to message channel ({e})") + } } - } else { - error!("Unable to get hwmon sensor value"); - } - } - let report = SourceReport { - name: self.name.clone(), - status: SourceStatus::Healthy, - details: Arc::new(HwmonReport { values }), - }; - info!("Writing hwmon data"); - match chan.send(report.into()) { - Ok(_) => {} - Err(e) => { - warn!("Unable to send to message channel ({e})") } } } diff --git a/src/sources/prs10.rs b/src/sources/prs10.rs index a6e4a1f..cf1f264 100644 --- a/src/sources/prs10.rs +++ b/src/sources/prs10.rs @@ -10,6 +10,7 @@ use tokio::select; use tokio::sync::OnceCell; use tokio::time::{interval, timeout}; use tokio_serial::{SerialPort, SerialStream}; +use tokio_util::sync::CancellationToken; use tracing::{debug, debug_span, error, info, instrument, warn}; use crate::{ @@ -504,8 +505,8 @@ impl ChimemonSource for Prs10Monitor { info: OnceCell::new(), } } - async fn run(mut self, chan: ChimemonSourceChannel) { - info!("PRS10 task starting"); + async fn run(mut self, chan: ChimemonSourceChannel, cancel: CancellationToken) { + info!("PRS10 task started"); if let Err(e) = self.set_info().await { error!("Error starting PRS10: {e:?}"); return; @@ -522,6 +523,9 @@ impl ChimemonSource for Prs10Monitor { loop { let msg = select! { + _ = cancel.cancelled() => { + return; + }, _ = status_timer.tick() => { self.status_poll().await }, diff --git a/src/sources/uccm.rs b/src/sources/uccm.rs index 37f1568..dd9f412 100644 --- a/src/sources/uccm.rs +++ b/src/sources/uccm.rs @@ -8,14 +8,16 @@ use byteorder::{BigEndian, ReadBytesExt}; use bytes::{Buf, BytesMut}; use chrono::{DateTime, Duration, NaiveDateTime, Utc}; use figment::value::Map; +use futures::future::join; use influxdb2::models::DataPoint; use influxdb2::models::data_point::DataPointBuilder; use itertools::Itertools; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf}; -use tokio::join; use tokio::sync::Mutex; use tokio::time::sleep; +use tokio::{join, select}; use tokio_serial::{SerialPort, SerialStream}; +use tokio_util::sync::CancellationToken; use tracing::{debug, info, warn}; use crate::{ @@ -596,7 +598,7 @@ impl ChimemonSource for UCCMMonitor { } } - async fn run(mut self, chan: ChimemonSourceChannel) { + async fn run(mut self, chan: ChimemonSourceChannel, cancel: CancellationToken) { info!("UCCM task starting"); if self.get_info().await.is_err() { warn!("Error starting UCCM"); @@ -622,6 +624,11 @@ impl ChimemonSource for UCCMMonitor { // wfut.await; // } // }); + // + select! { + _ = cancel.cancelled() => { return }, + _ = rx_handle => { return } + }; join!(rx_handle).0.unwrap(); } diff --git a/src/targets/chrony_refclock.rs b/src/targets/chrony_refclock.rs index e828b0d..638ccca 100644 --- a/src/targets/chrony_refclock.rs +++ b/src/targets/chrony_refclock.rs @@ -4,7 +4,9 @@ use std::path::PathBuf; use async_trait::async_trait; use libc::{c_double, c_int, timeval}; -use tracing::debug; +use tokio::select; +use tokio_util::sync::CancellationToken; +use tracing::{debug, warn}; use crate::{ChimemonMessage, ChimemonTarget, ChimemonTargetChannel, ChronySockConfig}; @@ -35,41 +37,46 @@ impl ChronySockServer { #[async_trait] impl ChimemonTarget for ChronySockServer { - async fn run(mut self, mut chan: ChimemonTargetChannel) { + async fn run(mut self, mut chan: ChimemonTargetChannel, cancel: CancellationToken) { loop { - let msg = chan.recv().await.unwrap(); - match msg { - ChimemonMessage::TimeReport(tr) => { - if tr.valid { - { - let frame = ChronyTimeReport { - tv: timeval { - tv_sec: TryInto::::try_into( - tr.system_time.timestamp(), - ) - .unwrap(), - tv_usec: tr.system_time.timestamp_subsec_micros() - as libc::suseconds_t, - }, - offset: tr.offset.num_nanoseconds().unwrap() as f64 / 1e9, - leap: if tr.leap_flag { 1 } else { 0 }, - pulse: 0, - _pad: 0, - magic: CHRONY_MAGIC, - }; - let bs = unsafe { - std::slice::from_raw_parts( - (&frame as *const ChronyTimeReport) as *const u8, - mem::size_of::(), - ) - }; - debug!("Sending to chrony sock {:#?}", frame); - let sock = UnixDatagram::unbound().unwrap(); - sock.send_to(bs, &self.sock_path).unwrap(); - } + select! { + _ = cancel.cancelled() => { return } + msg = chan.recv() => { + match msg { + Ok(ChimemonMessage::TimeReport(tr)) => { + if tr.valid { + { + let frame = ChronyTimeReport { + tv: timeval { + tv_sec: TryInto::::try_into( + tr.system_time.timestamp(), + ) + .unwrap(), + tv_usec: tr.system_time.timestamp_subsec_micros() + as libc::suseconds_t, + }, + offset: tr.offset.num_nanoseconds().unwrap() as f64 / 1e9, + leap: if tr.leap_flag { 1 } else { 0 }, + pulse: 0, + _pad: 0, + magic: CHRONY_MAGIC, + }; + let bs = unsafe { + std::slice::from_raw_parts( + (&frame as *const ChronyTimeReport) as *const u8, + mem::size_of::(), + ) + }; + debug!("Sending to chrony sock {:#?}", frame); + let sock = UnixDatagram::unbound().unwrap(); + sock.send_to(bs, &self.sock_path).unwrap(); + } + } + }, + Err(e) => warn!("Error receiving from channel: {}", e.to_string()), + _ => continue, } } - _ => continue, } } }