cancellation
This commit is contained in:
60
Cargo.lock
generated
60
Cargo.lock
generated
@@ -177,6 +177,15 @@ version = "2.10.0"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "812e12b5285cc515a9c72a5c1d3b6d46a19dac5acfef5265968c166106e31dd3"
|
checksum = "812e12b5285cc515a9c72a5c1d3b6d46a19dac5acfef5265968c166106e31dd3"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "block2"
|
||||||
|
version = "0.6.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "cdeb9d870516001442e364c5220d3574d2da8dc765554b4a617230d33fa58ef5"
|
||||||
|
dependencies = [
|
||||||
|
"objc2",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "bumpalo"
|
name = "bumpalo"
|
||||||
version = "3.19.1"
|
version = "3.19.1"
|
||||||
@@ -236,6 +245,7 @@ dependencies = [
|
|||||||
"chrono",
|
"chrono",
|
||||||
"chrony-candm",
|
"chrony-candm",
|
||||||
"clap",
|
"clap",
|
||||||
|
"ctrlc",
|
||||||
"figment",
|
"figment",
|
||||||
"futures",
|
"futures",
|
||||||
"gethostname",
|
"gethostname",
|
||||||
@@ -393,6 +403,17 @@ dependencies = [
|
|||||||
"memchr",
|
"memchr",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "ctrlc"
|
||||||
|
version = "3.5.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "73736a89c4aff73035ba2ed2e565061954da00d4970fc9ac25dcc85a2a20d790"
|
||||||
|
dependencies = [
|
||||||
|
"dispatch2",
|
||||||
|
"nix 0.30.1",
|
||||||
|
"windows-sys 0.61.2",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "darling"
|
name = "darling"
|
||||||
version = "0.21.3"
|
version = "0.21.3"
|
||||||
@@ -448,6 +469,18 @@ dependencies = [
|
|||||||
"serde_core",
|
"serde_core",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "dispatch2"
|
||||||
|
version = "0.3.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "89a09f22a6c6069a18470eb92d2298acf25463f14256d24778e1230d789a2aec"
|
||||||
|
dependencies = [
|
||||||
|
"bitflags 2.10.0",
|
||||||
|
"block2",
|
||||||
|
"libc",
|
||||||
|
"objc2",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "displaydoc"
|
name = "displaydoc"
|
||||||
version = "0.2.5"
|
version = "0.2.5"
|
||||||
@@ -1274,6 +1307,18 @@ dependencies = [
|
|||||||
"libc",
|
"libc",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "nix"
|
||||||
|
version = "0.30.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "74523f3a35e05aba87a1d978330aef40f67b0304ac79c1c00b294c9830543db6"
|
||||||
|
dependencies = [
|
||||||
|
"bitflags 2.10.0",
|
||||||
|
"cfg-if",
|
||||||
|
"cfg_aliases",
|
||||||
|
"libc",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "nom"
|
name = "nom"
|
||||||
version = "7.1.3"
|
version = "7.1.3"
|
||||||
@@ -1339,6 +1384,21 @@ dependencies = [
|
|||||||
"syn 1.0.109",
|
"syn 1.0.109",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "objc2"
|
||||||
|
version = "0.6.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "b7c2599ce0ec54857b29ce62166b0ed9b4f6f1a70ccc9a71165b6154caca8c05"
|
||||||
|
dependencies = [
|
||||||
|
"objc2-encode",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "objc2-encode"
|
||||||
|
version = "4.1.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "ef25abbcd74fb2609453eb695bd2f860d389e457f67dc17cafc8b8cbc89d0c33"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "once_cell"
|
name = "once_cell"
|
||||||
version = "1.21.3"
|
version = "1.21.3"
|
||||||
|
|||||||
@@ -35,6 +35,7 @@ bitflags = "2.10.0"
|
|||||||
influxdb2 = "0.3.9"
|
influxdb2 = "0.3.9"
|
||||||
chrono = "0.4.43"
|
chrono = "0.4.43"
|
||||||
serde_with = "3.16.1"
|
serde_with = "3.16.1"
|
||||||
|
ctrlc = "3.5.1"
|
||||||
|
|
||||||
[dependencies.chrony-candm]
|
[dependencies.chrony-candm]
|
||||||
git = "https://github.com/aws/chrony-candm"
|
git = "https://github.com/aws/chrony-candm"
|
||||||
|
|||||||
11
src/lib.rs
11
src/lib.rs
@@ -14,6 +14,7 @@ use influxdb2::models::DataPoint;
|
|||||||
use serde_derive::{Deserialize, Serialize};
|
use serde_derive::{Deserialize, Serialize};
|
||||||
use serde_with::{DurationSeconds, serde_as};
|
use serde_with::{DurationSeconds, serde_as};
|
||||||
use tokio::sync::broadcast::*;
|
use tokio::sync::broadcast::*;
|
||||||
|
use tokio_util::sync::CancellationToken;
|
||||||
|
|
||||||
use std::{fmt::Debug, path::Path, sync::Arc};
|
use std::{fmt::Debug, path::Path, sync::Arc};
|
||||||
|
|
||||||
@@ -89,14 +90,14 @@ impl Default for ChronySockConfig {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Clone)]
|
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||||
pub struct HwmonSensorConfig {
|
pub struct HwmonSensorConfig {
|
||||||
pub name: String,
|
pub device: String,
|
||||||
pub sensor: String,
|
pub sensor: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[serde_as]
|
#[serde_as]
|
||||||
#[derive(Serialize, Deserialize, Clone)]
|
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub struct HwmonConfig {
|
pub struct HwmonConfig {
|
||||||
pub enabled: bool,
|
pub enabled: bool,
|
||||||
@@ -354,10 +355,10 @@ pub type ChimemonTargetChannel = Receiver<ChimemonMessage>;
|
|||||||
pub trait ChimemonSource {
|
pub trait ChimemonSource {
|
||||||
type Config;
|
type Config;
|
||||||
fn new(name: &str, config: Self::Config) -> Self;
|
fn new(name: &str, config: Self::Config) -> Self;
|
||||||
async fn run(self, chan: ChimemonSourceChannel);
|
async fn run(self, chan: ChimemonSourceChannel, cancel: CancellationToken);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
pub trait ChimemonTarget {
|
pub trait ChimemonTarget {
|
||||||
async fn run(self, chan: ChimemonTargetChannel);
|
async fn run(self, chan: ChimemonTargetChannel, cancel: CancellationToken);
|
||||||
}
|
}
|
||||||
|
|||||||
103
src/main.rs
103
src/main.rs
@@ -5,7 +5,8 @@ use figment::{
|
|||||||
};
|
};
|
||||||
use futures::future::join_all;
|
use futures::future::join_all;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use tokio::{sync::broadcast, task::JoinHandle};
|
use tokio::{select, sync::broadcast, task::JoinHandle};
|
||||||
|
use tokio_util::sync::CancellationToken;
|
||||||
use tracing::{Instrument, debug, error, info, info_span, warn};
|
use tracing::{Instrument, debug, error, info, info_span, warn};
|
||||||
use tracing_subscriber::{self, EnvFilter, fmt::format::FmtSpan};
|
use tracing_subscriber::{self, EnvFilter, fmt::format::FmtSpan};
|
||||||
|
|
||||||
@@ -36,43 +37,69 @@ struct Args {
|
|||||||
log_level: Level,
|
log_level: Level,
|
||||||
}
|
}
|
||||||
|
|
||||||
fn run_source(config: NamedSourceConfig, chan: ChimemonSourceChannel) -> Option<JoinHandle<()>> {
|
fn run_source(
|
||||||
|
config: NamedSourceConfig,
|
||||||
|
chan: ChimemonSourceChannel,
|
||||||
|
shutdown: CancellationToken,
|
||||||
|
) -> Option<JoinHandle<()>> {
|
||||||
let NamedSourceConfig { name, source } = config;
|
let NamedSourceConfig { name, source } = config;
|
||||||
match source {
|
match source {
|
||||||
SourceConfig::Chrony(source_config) if source_config.enabled => {
|
SourceConfig::Chrony(source_config) if source_config.enabled => {
|
||||||
let c = ChronyClient::new(&name, source_config);
|
let c = ChronyClient::new(&name, source_config);
|
||||||
Some(tokio::spawn(
|
Some(tokio::spawn(
|
||||||
c.run(chan).instrument(info_span!("chrony-task")),
|
c.run(chan, shutdown).instrument(info_span!("chrony-task")),
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
SourceConfig::Gpsd(source_config) if source_config.enabled => {
|
SourceConfig::Gpsd(source_config) if source_config.enabled => {
|
||||||
let c = GpsdSource::new(&name, source_config);
|
let c = GpsdSource::new(&name, source_config);
|
||||||
Some(tokio::spawn(
|
Some(tokio::spawn(
|
||||||
c.run(chan).instrument(info_span!("gpsd-task")),
|
c.run(chan, shutdown).instrument(info_span!("gpsd-task")),
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
SourceConfig::Hwmon(source_config) if source_config.enabled => {
|
SourceConfig::Hwmon(source_config) if source_config.enabled => {
|
||||||
let c = HwmonSource::new(&name, source_config);
|
let c = HwmonSource::new(&name, source_config);
|
||||||
Some(tokio::spawn(
|
Some(tokio::spawn(
|
||||||
c.run(chan).instrument(info_span!("hwmon-task")),
|
c.run(chan, shutdown).instrument(info_span!("hwmon-task")),
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
SourceConfig::Prs10(source_config) if source_config.enabled => {
|
SourceConfig::Prs10(source_config) if source_config.enabled => {
|
||||||
let c = Prs10Monitor::new(&name, source_config);
|
let c = Prs10Monitor::new(&name, source_config);
|
||||||
Some(tokio::spawn(
|
Some(tokio::spawn(
|
||||||
c.run(chan).instrument(info_span!("prs10-task")),
|
c.run(chan, shutdown).instrument(info_span!("prs10-task")),
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
SourceConfig::Uccm(source_config) if source_config.enabled => {
|
SourceConfig::Uccm(source_config) if source_config.enabled => {
|
||||||
let c = UCCMMonitor::new(&name, source_config);
|
let c = UCCMMonitor::new(&name, source_config);
|
||||||
Some(tokio::spawn(
|
Some(tokio::spawn(
|
||||||
c.run(chan.clone()).instrument(info_span!("uccm-task")),
|
c.run(chan, shutdown).instrument(info_span!("uccm-task")),
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
_ => None,
|
_ => None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn dummy_consumer(mut chan: ChimemonTargetChannel, cancel: CancellationToken) {
|
||||||
|
info!("Dummy receiver task started");
|
||||||
|
loop {
|
||||||
|
select! {
|
||||||
|
_ = cancel.cancelled() => {
|
||||||
|
return
|
||||||
|
},
|
||||||
|
Ok(m) = chan.recv() => {
|
||||||
|
match m {
|
||||||
|
ChimemonMessage::SourceReport(report) => {
|
||||||
|
let metrics = report.details.to_metrics();
|
||||||
|
info!("instance: {} metrics: {metrics:?}", report.name);
|
||||||
|
}
|
||||||
|
msg => {
|
||||||
|
info!("message: {msg:?}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::main(flavor = "current_thread")]
|
#[tokio::main(flavor = "current_thread")]
|
||||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
tracing_subscriber::fmt()
|
tracing_subscriber::fmt()
|
||||||
@@ -94,6 +121,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
let (tx, _) = broadcast::channel(16);
|
let (tx, _) = broadcast::channel(16);
|
||||||
let sourcechan: ChimemonSourceChannel = tx;
|
let sourcechan: ChimemonSourceChannel = tx;
|
||||||
|
|
||||||
|
let shutdown_token = CancellationToken::new();
|
||||||
|
|
||||||
if config.influxdb.enabled {
|
if config.influxdb.enabled {
|
||||||
info!(
|
info!(
|
||||||
"Connecting to influxdb {} org: {} using token",
|
"Connecting to influxdb {} org: {} using token",
|
||||||
@@ -106,29 +135,44 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
&config.influxdb.token,
|
&config.influxdb.token,
|
||||||
);
|
);
|
||||||
|
|
||||||
let mut influxrx = sourcechan.subscribe();
|
let mut influx_rx = sourcechan.subscribe();
|
||||||
|
let influx_cancel = shutdown_token.clone();
|
||||||
|
|
||||||
tasks.push(tokio::spawn(
|
tasks.push(tokio::spawn(
|
||||||
async move {
|
async move {
|
||||||
let stream = async_stream::stream! {
|
let stream = async_stream::stream! {
|
||||||
while let Ok(msg) = influxrx.recv().await {
|
while let Ok(msg) = influx_rx.recv().await {
|
||||||
match msg { ChimemonMessage::DataPoint(dp) => {
|
match msg {
|
||||||
|
ChimemonMessage::DataPoint(dp) => {
|
||||||
yield dp
|
yield dp
|
||||||
}, ChimemonMessage::DataPoints(dps) => {
|
},
|
||||||
|
ChimemonMessage::DataPoints(dps) => {
|
||||||
for p in dps {
|
for p in dps {
|
||||||
yield p
|
yield p
|
||||||
}
|
}
|
||||||
}, _ => {}
|
},
|
||||||
} }
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
};
|
};
|
||||||
influx.write(&config.influxdb.bucket, stream).await.unwrap();
|
select! {
|
||||||
|
_ = influx_cancel.cancelled() => {
|
||||||
|
return
|
||||||
|
},
|
||||||
|
res = influx.write(&config.influxdb.bucket, stream) => {
|
||||||
|
match res {
|
||||||
|
Err(e) => error!("Error writing to influx: {}", e.to_string()),
|
||||||
|
_ => warn!("Unexpectedly shutting down influx task"),
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
.instrument(info_span!("influx-task")),
|
.instrument(info_span!("influx-task")),
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
for source in config.sources {
|
for source in config.sources {
|
||||||
if let Some(task) = run_source(source, sourcechan.clone()) {
|
if let Some(task) = run_source(source, sourcechan.clone(), shutdown_token.clone()) {
|
||||||
tasks.push(task)
|
tasks.push(task)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -141,7 +185,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
if let Some(chrony_refclock) = chrony_refclock {
|
if let Some(chrony_refclock) = chrony_refclock {
|
||||||
tasks.push(tokio::spawn(
|
tasks.push(tokio::spawn(
|
||||||
chrony_refclock
|
chrony_refclock
|
||||||
.run(sourcechan.subscribe())
|
.run(sourcechan.subscribe(), shutdown_token.clone())
|
||||||
.instrument(info_span!("chrony-refclock-task")),
|
.instrument(info_span!("chrony-refclock-task")),
|
||||||
));
|
));
|
||||||
};
|
};
|
||||||
@@ -153,29 +197,20 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
|
|
||||||
if sourcechan.receiver_count() == 0 {
|
if sourcechan.receiver_count() == 0 {
|
||||||
warn!("No consumers configured, events will be discarded");
|
warn!("No consumers configured, events will be discarded");
|
||||||
let mut chan = sourcechan.subscribe();
|
|
||||||
// spawn a dummy task to reap the channel and keep the process alive
|
|
||||||
tasks.push(tokio::spawn(
|
tasks.push(tokio::spawn(
|
||||||
async move {
|
dummy_consumer(sourcechan.subscribe(), shutdown_token.clone())
|
||||||
loop {
|
.instrument(info_span!("dummy-consumer-task")),
|
||||||
while let Ok(m) = chan.recv().await {
|
));
|
||||||
info!("received {m:?}");
|
|
||||||
match m {
|
|
||||||
ChimemonMessage::SourceReport(report) => {
|
|
||||||
let metrics = report.details.to_metrics();
|
|
||||||
info!("metrics: {metrics:?}");
|
|
||||||
}
|
|
||||||
_ => {}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
.instrument(info_span!("dummy-receiver-task")),
|
|
||||||
))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!("Task setup complete, tasks: {}", tasks.len());
|
debug!("Task setup complete, tasks: {}", tasks.len());
|
||||||
|
|
||||||
|
ctrlc::set_handler(move || {
|
||||||
|
info!("Shutting down");
|
||||||
|
shutdown_token.cancel()
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
join_all(tasks).await;
|
join_all(tasks).await;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|||||||
@@ -5,7 +5,9 @@ use async_trait::async_trait;
|
|||||||
use chrony_candm::reply::{self, ReplyBody, SourceMode};
|
use chrony_candm::reply::{self, ReplyBody, SourceMode};
|
||||||
use chrony_candm::request::{self, RequestBody};
|
use chrony_candm::request::{self, RequestBody};
|
||||||
use chrony_candm::{ClientOptions, blocking_query};
|
use chrony_candm::{ClientOptions, blocking_query};
|
||||||
use tokio::join;
|
use futures::future::join;
|
||||||
|
use tokio::{join, select};
|
||||||
|
use tokio_util::sync::CancellationToken;
|
||||||
use tracing::{info, warn};
|
use tracing::{info, warn};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
@@ -297,38 +299,33 @@ impl ChimemonSource for ChronyClient {
|
|||||||
config,
|
config,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
async fn run(self, chan: ChimemonSourceChannel) {
|
async fn run(self, chan: ChimemonSourceChannel, cancel: CancellationToken) {
|
||||||
info!("Chrony task started");
|
info!("Chrony task started");
|
||||||
|
|
||||||
let mut t_interval = tokio::time::interval(self.config.tracking_interval);
|
let mut t_interval = tokio::time::interval(self.config.tracking_interval);
|
||||||
let mut s_interval = tokio::time::interval(self.config.sources_interval);
|
let mut s_interval = tokio::time::interval(self.config.sources_interval);
|
||||||
|
|
||||||
let t_future = async {
|
|
||||||
let lchan = chan.clone();
|
|
||||||
loop {
|
loop {
|
||||||
t_interval.tick().await;
|
select! {
|
||||||
|
_ = cancel.cancelled() => {
|
||||||
match self.tracking_poll(&lchan).await {
|
return
|
||||||
|
},
|
||||||
|
_ = t_interval.tick() => {
|
||||||
|
match self.tracking_poll(&chan).await {
|
||||||
|
Ok(_) => (),
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Error in chrony task: {}", e.to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
_ = s_interval.tick() => {
|
||||||
|
match self.sources_poll(&chan).await {
|
||||||
Ok(_) => (),
|
Ok(_) => (),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!("Error in chrony task: {}", e.to_string());
|
warn!("Error in chrony task: {}", e.to_string());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
|
||||||
let s_future = async {
|
|
||||||
let lchan = chan.clone();
|
|
||||||
loop {
|
|
||||||
s_interval.tick().await;
|
|
||||||
|
|
||||||
match self.sources_poll(&lchan).await {
|
|
||||||
Ok(_) => (),
|
|
||||||
Err(e) => {
|
|
||||||
warn!("Error in chrony task: {}", e.to_string());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
|
||||||
join!(t_future, s_future);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ use serde_json;
|
|||||||
use tokio::net::{TcpStream, ToSocketAddrs, lookup_host};
|
use tokio::net::{TcpStream, ToSocketAddrs, lookup_host};
|
||||||
use tokio::time::{interval, timeout};
|
use tokio::time::{interval, timeout};
|
||||||
use tokio_util::codec::{Framed, LinesCodec};
|
use tokio_util::codec::{Framed, LinesCodec};
|
||||||
|
use tokio_util::sync::CancellationToken;
|
||||||
use tracing::{debug, debug_span, info, instrument, warn};
|
use tracing::{debug, debug_span, info, instrument, warn};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
@@ -170,7 +171,7 @@ impl ChimemonSource for GpsdSource {
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
rt.block_on(Self::inner_new(name, config)).unwrap()
|
rt.block_on(Self::inner_new(name, config)).unwrap()
|
||||||
}
|
}
|
||||||
async fn run(mut self, mut chan: ChimemonSourceChannel) {
|
async fn run(mut self, mut chan: ChimemonSourceChannel, cancel: CancellationToken) {
|
||||||
info!("gpsd task started");
|
info!("gpsd task started");
|
||||||
self.conn.conn().await.unwrap();
|
self.conn.conn().await.unwrap();
|
||||||
let mut ticker = interval(self.config.interval);
|
let mut ticker = interval(self.config.interval);
|
||||||
@@ -184,6 +185,9 @@ impl ChimemonSource for GpsdSource {
|
|||||||
loop {
|
loop {
|
||||||
let framed = self.conn.framed.as_mut().expect("must be connected");
|
let framed = self.conn.framed.as_mut().expect("must be connected");
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
|
_ = cancel.cancelled() => {
|
||||||
|
return
|
||||||
|
},
|
||||||
_ = ticker.tick() => {
|
_ = ticker.tick() => {
|
||||||
self.send_status(&mut chan).await
|
self.send_status(&mut chan).await
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
use std::{fs::File, io::Read, path::PathBuf, sync::Arc};
|
use std::{fs::File, io::Read, path::PathBuf, sync::Arc};
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
|
use tokio::select;
|
||||||
|
use tokio_util::sync::CancellationToken;
|
||||||
use tracing::{debug, error, info, warn};
|
use tracing::{debug, error, info, warn};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
@@ -106,23 +108,27 @@ impl ChimemonSource for HwmonSource {
|
|||||||
let sensors = config
|
let sensors = config
|
||||||
.sensors
|
.sensors
|
||||||
.iter()
|
.iter()
|
||||||
.map(|(k, v)| Arc::new(HwmonSensor::new(k, &v.name, &v.sensor)))
|
.map(|(k, v)| Arc::new(HwmonSensor::new(k, &v.device, &v.sensor)))
|
||||||
.collect();
|
.collect();
|
||||||
|
debug!("config: {config:?}");
|
||||||
HwmonSource {
|
HwmonSource {
|
||||||
name: name.to_owned(),
|
name: name.to_owned(),
|
||||||
config,
|
config,
|
||||||
sensors,
|
sensors,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
async fn run(self, chan: ChimemonSourceChannel) {
|
async fn run(self, chan: ChimemonSourceChannel, cancel: CancellationToken) {
|
||||||
info!("hwmon task started");
|
info!("hwmon task started");
|
||||||
let mut interval = tokio::time::interval(self.config.interval);
|
let mut interval = tokio::time::interval(self.config.interval);
|
||||||
loop {
|
loop {
|
||||||
interval.tick().await;
|
select! {
|
||||||
|
_ = cancel.cancelled() => { return; },
|
||||||
|
_ = interval.tick() => {
|
||||||
let mut values = Vec::new();
|
let mut values = Vec::new();
|
||||||
for s in &self.sensors {
|
for s in &self.sensors {
|
||||||
if let Ok(sensor_val) = HwmonSource::get_raw_value(s).await {
|
debug!("Sensor {s:?}");
|
||||||
|
match HwmonSource::get_raw_value(s).await {
|
||||||
|
Ok(sensor_val) => {
|
||||||
debug!(
|
debug!(
|
||||||
"hwmon {} raw value {}",
|
"hwmon {} raw value {}",
|
||||||
s.value_path.to_string_lossy(),
|
s.value_path.to_string_lossy(),
|
||||||
@@ -136,8 +142,11 @@ impl ChimemonSource for HwmonSource {
|
|||||||
s.value_path.to_string_lossy()
|
s.value_path.to_string_lossy()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
} else {
|
},
|
||||||
error!("Unable to get hwmon sensor value");
|
Err(e) => {
|
||||||
|
error!("Unable to get hwmon sensor value ({})", e.to_string());
|
||||||
|
continue
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let report = SourceReport {
|
let report = SourceReport {
|
||||||
@@ -154,4 +163,6 @@ impl ChimemonSource for HwmonSource {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ use tokio::select;
|
|||||||
use tokio::sync::OnceCell;
|
use tokio::sync::OnceCell;
|
||||||
use tokio::time::{interval, timeout};
|
use tokio::time::{interval, timeout};
|
||||||
use tokio_serial::{SerialPort, SerialStream};
|
use tokio_serial::{SerialPort, SerialStream};
|
||||||
|
use tokio_util::sync::CancellationToken;
|
||||||
use tracing::{debug, debug_span, error, info, instrument, warn};
|
use tracing::{debug, debug_span, error, info, instrument, warn};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
@@ -504,8 +505,8 @@ impl ChimemonSource for Prs10Monitor {
|
|||||||
info: OnceCell::new(),
|
info: OnceCell::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
async fn run(mut self, chan: ChimemonSourceChannel) {
|
async fn run(mut self, chan: ChimemonSourceChannel, cancel: CancellationToken) {
|
||||||
info!("PRS10 task starting");
|
info!("PRS10 task started");
|
||||||
if let Err(e) = self.set_info().await {
|
if let Err(e) = self.set_info().await {
|
||||||
error!("Error starting PRS10: {e:?}");
|
error!("Error starting PRS10: {e:?}");
|
||||||
return;
|
return;
|
||||||
@@ -522,6 +523,9 @@ impl ChimemonSource for Prs10Monitor {
|
|||||||
|
|
||||||
loop {
|
loop {
|
||||||
let msg = select! {
|
let msg = select! {
|
||||||
|
_ = cancel.cancelled() => {
|
||||||
|
return;
|
||||||
|
},
|
||||||
_ = status_timer.tick() => {
|
_ = status_timer.tick() => {
|
||||||
self.status_poll().await
|
self.status_poll().await
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -8,14 +8,16 @@ use byteorder::{BigEndian, ReadBytesExt};
|
|||||||
use bytes::{Buf, BytesMut};
|
use bytes::{Buf, BytesMut};
|
||||||
use chrono::{DateTime, Duration, NaiveDateTime, Utc};
|
use chrono::{DateTime, Duration, NaiveDateTime, Utc};
|
||||||
use figment::value::Map;
|
use figment::value::Map;
|
||||||
|
use futures::future::join;
|
||||||
use influxdb2::models::DataPoint;
|
use influxdb2::models::DataPoint;
|
||||||
use influxdb2::models::data_point::DataPointBuilder;
|
use influxdb2::models::data_point::DataPointBuilder;
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf};
|
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf};
|
||||||
use tokio::join;
|
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
use tokio::time::sleep;
|
use tokio::time::sleep;
|
||||||
|
use tokio::{join, select};
|
||||||
use tokio_serial::{SerialPort, SerialStream};
|
use tokio_serial::{SerialPort, SerialStream};
|
||||||
|
use tokio_util::sync::CancellationToken;
|
||||||
use tracing::{debug, info, warn};
|
use tracing::{debug, info, warn};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
@@ -596,7 +598,7 @@ impl ChimemonSource for UCCMMonitor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn run(mut self, chan: ChimemonSourceChannel) {
|
async fn run(mut self, chan: ChimemonSourceChannel, cancel: CancellationToken) {
|
||||||
info!("UCCM task starting");
|
info!("UCCM task starting");
|
||||||
if self.get_info().await.is_err() {
|
if self.get_info().await.is_err() {
|
||||||
warn!("Error starting UCCM");
|
warn!("Error starting UCCM");
|
||||||
@@ -622,6 +624,11 @@ impl ChimemonSource for UCCMMonitor {
|
|||||||
// wfut.await;
|
// wfut.await;
|
||||||
// }
|
// }
|
||||||
// });
|
// });
|
||||||
|
//
|
||||||
|
select! {
|
||||||
|
_ = cancel.cancelled() => { return },
|
||||||
|
_ = rx_handle => { return }
|
||||||
|
};
|
||||||
|
|
||||||
join!(rx_handle).0.unwrap();
|
join!(rx_handle).0.unwrap();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,7 +4,9 @@ use std::path::PathBuf;
|
|||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use libc::{c_double, c_int, timeval};
|
use libc::{c_double, c_int, timeval};
|
||||||
use tracing::debug;
|
use tokio::select;
|
||||||
|
use tokio_util::sync::CancellationToken;
|
||||||
|
use tracing::{debug, warn};
|
||||||
|
|
||||||
use crate::{ChimemonMessage, ChimemonTarget, ChimemonTargetChannel, ChronySockConfig};
|
use crate::{ChimemonMessage, ChimemonTarget, ChimemonTargetChannel, ChronySockConfig};
|
||||||
|
|
||||||
@@ -35,11 +37,13 @@ impl ChronySockServer {
|
|||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl ChimemonTarget for ChronySockServer {
|
impl ChimemonTarget for ChronySockServer {
|
||||||
async fn run(mut self, mut chan: ChimemonTargetChannel) {
|
async fn run(mut self, mut chan: ChimemonTargetChannel, cancel: CancellationToken) {
|
||||||
loop {
|
loop {
|
||||||
let msg = chan.recv().await.unwrap();
|
select! {
|
||||||
|
_ = cancel.cancelled() => { return }
|
||||||
|
msg = chan.recv() => {
|
||||||
match msg {
|
match msg {
|
||||||
ChimemonMessage::TimeReport(tr) => {
|
Ok(ChimemonMessage::TimeReport(tr)) => {
|
||||||
if tr.valid {
|
if tr.valid {
|
||||||
{
|
{
|
||||||
let frame = ChronyTimeReport {
|
let frame = ChronyTimeReport {
|
||||||
@@ -68,9 +72,12 @@ impl ChimemonTarget for ChronySockServer {
|
|||||||
sock.send_to(bs, &self.sock_path).unwrap();
|
sock.send_to(bs, &self.sock_path).unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
},
|
||||||
|
Err(e) => warn!("Error receiving from channel: {}", e.to_string()),
|
||||||
_ => continue,
|
_ => continue,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user