cancellation

This commit is contained in:
2026-02-04 03:13:39 -08:00
parent 156df9ae86
commit 8fb0315153
9 changed files with 199 additions and 95 deletions

60
Cargo.lock generated
View File

@@ -177,6 +177,15 @@ version = "2.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "812e12b5285cc515a9c72a5c1d3b6d46a19dac5acfef5265968c166106e31dd3"
[[package]]
name = "block2"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdeb9d870516001442e364c5220d3574d2da8dc765554b4a617230d33fa58ef5"
dependencies = [
"objc2",
]
[[package]]
name = "bumpalo"
version = "3.19.1"
@@ -236,6 +245,7 @@ dependencies = [
"chrono",
"chrony-candm",
"clap",
"ctrlc",
"figment",
"futures",
"gethostname",
@@ -393,6 +403,17 @@ dependencies = [
"memchr",
]
[[package]]
name = "ctrlc"
version = "3.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73736a89c4aff73035ba2ed2e565061954da00d4970fc9ac25dcc85a2a20d790"
dependencies = [
"dispatch2",
"nix 0.30.1",
"windows-sys 0.61.2",
]
[[package]]
name = "darling"
version = "0.21.3"
@@ -448,6 +469,18 @@ dependencies = [
"serde_core",
]
[[package]]
name = "dispatch2"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89a09f22a6c6069a18470eb92d2298acf25463f14256d24778e1230d789a2aec"
dependencies = [
"bitflags 2.10.0",
"block2",
"libc",
"objc2",
]
[[package]]
name = "displaydoc"
version = "0.2.5"
@@ -1274,6 +1307,18 @@ dependencies = [
"libc",
]
[[package]]
name = "nix"
version = "0.30.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74523f3a35e05aba87a1d978330aef40f67b0304ac79c1c00b294c9830543db6"
dependencies = [
"bitflags 2.10.0",
"cfg-if",
"cfg_aliases",
"libc",
]
[[package]]
name = "nom"
version = "7.1.3"
@@ -1339,6 +1384,21 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "objc2"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7c2599ce0ec54857b29ce62166b0ed9b4f6f1a70ccc9a71165b6154caca8c05"
dependencies = [
"objc2-encode",
]
[[package]]
name = "objc2-encode"
version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef25abbcd74fb2609453eb695bd2f860d389e457f67dc17cafc8b8cbc89d0c33"
[[package]]
name = "once_cell"
version = "1.21.3"

View File

@@ -35,6 +35,7 @@ bitflags = "2.10.0"
influxdb2 = "0.3.9"
chrono = "0.4.43"
serde_with = "3.16.1"
ctrlc = "3.5.1"
[dependencies.chrony-candm]
git = "https://github.com/aws/chrony-candm"

View File

@@ -14,6 +14,7 @@ use influxdb2::models::DataPoint;
use serde_derive::{Deserialize, Serialize};
use serde_with::{DurationSeconds, serde_as};
use tokio::sync::broadcast::*;
use tokio_util::sync::CancellationToken;
use std::{fmt::Debug, path::Path, sync::Arc};
@@ -89,14 +90,14 @@ impl Default for ChronySockConfig {
}
}
#[derive(Serialize, Deserialize, Clone)]
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct HwmonSensorConfig {
pub name: String,
pub device: String,
pub sensor: String,
}
#[serde_as]
#[derive(Serialize, Deserialize, Clone)]
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(default)]
pub struct HwmonConfig {
pub enabled: bool,
@@ -354,7 +355,7 @@ pub type ChimemonTargetChannel = Receiver<ChimemonMessage>;
pub trait ChimemonSource {
type Config;
fn new(name: &str, config: Self::Config) -> Self;
async fn run(self, chan: ChimemonSourceChannel);
async fn run(self, chan: ChimemonSourceChannel, cancel: CancellationToken);
}
#[async_trait]

View File

@@ -5,7 +5,8 @@ use figment::{
};
use futures::future::join_all;
use std::path::Path;
use tokio::{sync::broadcast, task::JoinHandle};
use tokio::{select, sync::broadcast, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, debug, error, info, info_span, warn};
use tracing_subscriber::{self, EnvFilter, fmt::format::FmtSpan};
@@ -36,43 +37,69 @@ struct Args {
log_level: Level,
}
fn run_source(config: NamedSourceConfig, chan: ChimemonSourceChannel) -> Option<JoinHandle<()>> {
fn run_source(
config: NamedSourceConfig,
chan: ChimemonSourceChannel,
shutdown: CancellationToken,
) -> Option<JoinHandle<()>> {
let NamedSourceConfig { name, source } = config;
match source {
SourceConfig::Chrony(source_config) if source_config.enabled => {
let c = ChronyClient::new(&name, source_config);
Some(tokio::spawn(
c.run(chan).instrument(info_span!("chrony-task")),
c.run(chan, shutdown).instrument(info_span!("chrony-task")),
))
}
SourceConfig::Gpsd(source_config) if source_config.enabled => {
let c = GpsdSource::new(&name, source_config);
Some(tokio::spawn(
c.run(chan).instrument(info_span!("gpsd-task")),
c.run(chan, shutdown).instrument(info_span!("gpsd-task")),
))
}
SourceConfig::Hwmon(source_config) if source_config.enabled => {
let c = HwmonSource::new(&name, source_config);
Some(tokio::spawn(
c.run(chan).instrument(info_span!("hwmon-task")),
c.run(chan, shutdown).instrument(info_span!("hwmon-task")),
))
}
SourceConfig::Prs10(source_config) if source_config.enabled => {
let c = Prs10Monitor::new(&name, source_config);
Some(tokio::spawn(
c.run(chan).instrument(info_span!("prs10-task")),
c.run(chan, shutdown).instrument(info_span!("prs10-task")),
))
}
SourceConfig::Uccm(source_config) if source_config.enabled => {
let c = UCCMMonitor::new(&name, source_config);
Some(tokio::spawn(
c.run(chan.clone()).instrument(info_span!("uccm-task")),
c.run(chan, shutdown).instrument(info_span!("uccm-task")),
))
}
_ => None,
}
}
async fn dummy_consumer(mut chan: ChimemonTargetChannel, cancel: CancellationToken) {
info!("Dummy receiver task started");
loop {
select! {
_ = cancel.cancelled() => {
return
},
Ok(m) = chan.recv() => {
match m {
ChimemonMessage::SourceReport(report) => {
let metrics = report.details.to_metrics();
info!("instance: {} metrics: {metrics:?}", report.name);
}
msg => {
info!("message: {msg:?}");
}
}
}
}
}
}
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt()
@@ -94,6 +121,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (tx, _) = broadcast::channel(16);
let sourcechan: ChimemonSourceChannel = tx;
let shutdown_token = CancellationToken::new();
if config.influxdb.enabled {
info!(
"Connecting to influxdb {} org: {} using token",
@@ -128,7 +157,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
for source in config.sources {
if let Some(task) = run_source(source, sourcechan.clone()) {
if let Some(task) = run_source(source, sourcechan.clone(), shutdown_token.clone()) {
tasks.push(task)
}
}
@@ -153,29 +182,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
if sourcechan.receiver_count() == 0 {
warn!("No consumers configured, events will be discarded");
let mut chan = sourcechan.subscribe();
// spawn a dummy task to reap the channel and keep the process alive
tasks.push(tokio::spawn(
async move {
loop {
while let Ok(m) = chan.recv().await {
info!("received {m:?}");
match m {
ChimemonMessage::SourceReport(report) => {
let metrics = report.details.to_metrics();
info!("metrics: {metrics:?}");
}
_ => {}
}
}
}
}
.instrument(info_span!("dummy-receiver-task")),
))
dummy_consumer(sourcechan.subscribe(), shutdown_token.clone())
.instrument(info_span!("dummy-consumer-task")),
));
}
debug!("Task setup complete, tasks: {}", tasks.len());
ctrlc::set_handler(move || {
info!("Shutting down");
shutdown_token.cancel()
});
join_all(tasks).await;
Ok(())

View File

@@ -5,7 +5,9 @@ use async_trait::async_trait;
use chrony_candm::reply::{self, ReplyBody, SourceMode};
use chrony_candm::request::{self, RequestBody};
use chrony_candm::{ClientOptions, blocking_query};
use tokio::join;
use futures::future::join;
use tokio::{join, select};
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
use crate::{
@@ -297,38 +299,33 @@ impl ChimemonSource for ChronyClient {
config,
}
}
async fn run(self, chan: ChimemonSourceChannel) {
async fn run(self, chan: ChimemonSourceChannel, cancel: CancellationToken) {
info!("Chrony task started");
let mut t_interval = tokio::time::interval(self.config.tracking_interval);
let mut s_interval = tokio::time::interval(self.config.sources_interval);
let t_future = async {
let lchan = chan.clone();
loop {
t_interval.tick().await;
match self.tracking_poll(&lchan).await {
Ok(_) => (),
Err(e) => {
warn!("Error in chrony task: {}", e.to_string());
loop {
select! {
_ = cancel.cancelled() => {
return
},
_ = t_interval.tick() => {
match self.tracking_poll(&chan).await {
Ok(_) => (),
Err(e) => {
warn!("Error in chrony task: {}", e.to_string());
}
}
},
_ = s_interval.tick() => {
match self.sources_poll(&chan).await {
Ok(_) => (),
Err(e) => {
warn!("Error in chrony task: {}", e.to_string());
}
}
}
}
};
let s_future = async {
let lchan = chan.clone();
loop {
s_interval.tick().await;
match self.sources_poll(&lchan).await {
Ok(_) => (),
Err(e) => {
warn!("Error in chrony task: {}", e.to_string());
}
}
}
};
join!(t_future, s_future);
}
}
}

View File

@@ -14,6 +14,7 @@ use serde_json;
use tokio::net::{TcpStream, ToSocketAddrs, lookup_host};
use tokio::time::{interval, timeout};
use tokio_util::codec::{Framed, LinesCodec};
use tokio_util::sync::CancellationToken;
use tracing::{debug, debug_span, info, instrument, warn};
use crate::{
@@ -170,7 +171,7 @@ impl ChimemonSource for GpsdSource {
.unwrap();
rt.block_on(Self::inner_new(name, config)).unwrap()
}
async fn run(mut self, mut chan: ChimemonSourceChannel) {
async fn run(mut self, mut chan: ChimemonSourceChannel, cancel: CancellationToken) {
info!("gpsd task started");
self.conn.conn().await.unwrap();
let mut ticker = interval(self.config.interval);
@@ -184,6 +185,9 @@ impl ChimemonSource for GpsdSource {
loop {
let framed = self.conn.framed.as_mut().expect("must be connected");
tokio::select! {
_ = cancel.cancelled() => {
return
},
_ = ticker.tick() => {
self.send_status(&mut chan).await
},

View File

@@ -1,6 +1,8 @@
use std::{fs::File, io::Read, path::PathBuf, sync::Arc};
use async_trait::async_trait;
use tokio::select;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use crate::{
@@ -106,50 +108,59 @@ impl ChimemonSource for HwmonSource {
let sensors = config
.sensors
.iter()
.map(|(k, v)| Arc::new(HwmonSensor::new(k, &v.name, &v.sensor)))
.map(|(k, v)| Arc::new(HwmonSensor::new(k, &v.device, &v.sensor)))
.collect();
debug!("config: {config:?}");
HwmonSource {
name: name.to_owned(),
config,
sensors,
}
}
async fn run(self, chan: ChimemonSourceChannel) {
async fn run(self, chan: ChimemonSourceChannel, cancel: CancellationToken) {
info!("hwmon task started");
let mut interval = tokio::time::interval(self.config.interval);
loop {
interval.tick().await;
let mut values = Vec::new();
for s in &self.sensors {
if let Ok(sensor_val) = HwmonSource::get_raw_value(s).await {
debug!(
"hwmon {} raw value {}",
s.value_path.to_string_lossy(),
sensor_val
);
if let Ok(parsed) = sensor_val.trim().parse::<f64>() {
values.push((s.clone(), parsed));
} else {
error!(
"Unable to parse sensor value {sensor_val} at {}",
s.value_path.to_string_lossy()
);
select! {
_ = cancel.cancelled() => { return; },
_ = interval.tick() => {
let mut values = Vec::new();
for s in &self.sensors {
debug!("Sensor {s:?}");
match HwmonSource::get_raw_value(s).await {
Ok(sensor_val) => {
debug!(
"hwmon {} raw value {}",
s.value_path.to_string_lossy(),
sensor_val
);
if let Ok(parsed) = sensor_val.trim().parse::<f64>() {
values.push((s.clone(), parsed));
} else {
error!(
"Unable to parse sensor value {sensor_val} at {}",
s.value_path.to_string_lossy()
);
}
},
Err(e) => {
error!("Unable to get hwmon sensor value ({})", e.to_string());
continue
}
}
}
let report = SourceReport {
name: self.name.clone(),
status: SourceStatus::Healthy,
details: Arc::new(HwmonReport { values }),
};
info!("Writing hwmon data");
match chan.send(report.into()) {
Ok(_) => {}
Err(e) => {
warn!("Unable to send to message channel ({e})")
}
}
} else {
error!("Unable to get hwmon sensor value");
}
}
let report = SourceReport {
name: self.name.clone(),
status: SourceStatus::Healthy,
details: Arc::new(HwmonReport { values }),
};
info!("Writing hwmon data");
match chan.send(report.into()) {
Ok(_) => {}
Err(e) => {
warn!("Unable to send to message channel ({e})")
}
}
}

View File

@@ -10,6 +10,7 @@ use tokio::select;
use tokio::sync::OnceCell;
use tokio::time::{interval, timeout};
use tokio_serial::{SerialPort, SerialStream};
use tokio_util::sync::CancellationToken;
use tracing::{debug, debug_span, error, info, instrument, warn};
use crate::{
@@ -504,8 +505,8 @@ impl ChimemonSource for Prs10Monitor {
info: OnceCell::new(),
}
}
async fn run(mut self, chan: ChimemonSourceChannel) {
info!("PRS10 task starting");
async fn run(mut self, chan: ChimemonSourceChannel, cancel: CancellationToken) {
info!("PRS10 task started");
if let Err(e) = self.set_info().await {
error!("Error starting PRS10: {e:?}");
return;
@@ -522,6 +523,9 @@ impl ChimemonSource for Prs10Monitor {
loop {
let msg = select! {
_ = cancel.cancelled() => {
return;
},
_ = status_timer.tick() => {
self.status_poll().await
},

View File

@@ -8,14 +8,16 @@ use byteorder::{BigEndian, ReadBytesExt};
use bytes::{Buf, BytesMut};
use chrono::{DateTime, Duration, NaiveDateTime, Utc};
use figment::value::Map;
use futures::future::join;
use influxdb2::models::DataPoint;
use influxdb2::models::data_point::DataPointBuilder;
use itertools::Itertools;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf};
use tokio::join;
use tokio::sync::Mutex;
use tokio::time::sleep;
use tokio::{join, select};
use tokio_serial::{SerialPort, SerialStream};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
use crate::{
@@ -596,7 +598,7 @@ impl ChimemonSource for UCCMMonitor {
}
}
async fn run(mut self, chan: ChimemonSourceChannel) {
async fn run(mut self, chan: ChimemonSourceChannel, cancel: CancellationToken) {
info!("UCCM task starting");
if self.get_info().await.is_err() {
warn!("Error starting UCCM");
@@ -622,6 +624,11 @@ impl ChimemonSource for UCCMMonitor {
// wfut.await;
// }
// });
//
select! {
_ = cancel.cancelled() => { return },
_ = rx_handle => { return }
};
join!(rx_handle).0.unwrap();
}