diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..fa34932 --- /dev/null +++ b/src/config.rs @@ -0,0 +1,219 @@ +use std::{collections::BTreeMap, path::Path}; + +use figment::{ + Figment, Provider, + providers::{Format, Serialized, Toml}, + util::map, + value::Map, +}; +use gethostname::gethostname; +use serde_derive::{Deserialize, Serialize}; +use serde_with::{DurationSeconds, serde_as}; + +#[derive(Serialize, Deserialize, Clone)] +pub struct InfluxConfig { + pub enabled: bool, + pub url: String, + pub org: String, + pub bucket: String, + pub token: String, + pub tags: Map, +} + +impl Default for InfluxConfig { + fn default() -> Self { + let host = gethostname().into_string().unwrap(); + InfluxConfig { + enabled: false, + url: "http://localhost:8086".into(), + org: "default".into(), + bucket: "default".into(), + token: "".into(), + tags: map! { "host".into() => host }, + } + } +} + +#[serde_as] +#[derive(Serialize, Deserialize, Clone)] +#[serde(default)] +pub struct ChronyConfig { + pub enabled: bool, + #[serde_as(as = "DurationSeconds")] + pub timeout: std::time::Duration, + #[serde_as(as = "DurationSeconds")] + pub tracking_interval: std::time::Duration, + #[serde_as(as = "DurationSeconds")] + pub sources_interval: std::time::Duration, + pub measurement_prefix: String, + pub tracking_measurement: String, + pub sources_measurement: String, + pub host: String, +} + +impl Default for ChronyConfig { + fn default() -> Self { + ChronyConfig { + enabled: false, + timeout: std::time::Duration::from_secs(5), + tracking_interval: std::time::Duration::from_secs(60), + sources_interval: std::time::Duration::from_secs(300), + measurement_prefix: "chrony.".into(), + tracking_measurement: "tracking".into(), + sources_measurement: "sources".into(), + host: "127.0.0.1:323".into(), + } + } +} + +#[derive(Serialize, Deserialize, Clone)] +#[serde(default)] +pub struct ChronySockConfig { + pub enabled: bool, + pub sock: String, +} + +impl Default for ChronySockConfig { + fn default() -> Self { + ChronySockConfig { + enabled: false, + sock: "".into(), + } + } +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct HwmonSensorConfig { + pub device: String, + pub sensor: String, +} + +#[serde_as] +#[derive(Serialize, Deserialize, Clone, Debug)] +#[serde(default)] +pub struct HwmonConfig { + pub enabled: bool, + #[serde_as(as = "DurationSeconds")] + pub interval: std::time::Duration, + pub measurement: String, + pub sensors: Map, +} + +impl Default for HwmonConfig { + fn default() -> Self { + HwmonConfig { + enabled: false, + interval: std::time::Duration::from_secs(60), + measurement: "hwmon".into(), + sensors: map! {}, + } + } +} + +#[serde_as] +#[derive(Serialize, Deserialize, Clone, Debug)] +#[serde(default)] +pub struct GpsdConfig { + pub enabled: bool, + #[serde_as(as = "DurationSeconds")] + pub interval: std::time::Duration, + pub host: String, +} + +impl Default for GpsdConfig { + fn default() -> Self { + GpsdConfig { + enabled: false, + interval: std::time::Duration::from_secs(60), + host: "localhost:2947".into(), + } + } +} + +#[serde_as] +#[derive(Serialize, Deserialize, Clone, Debug)] +#[serde(default)] +pub struct Prs10Config { + pub enabled: bool, + pub port: String, + pub baud: u32, + #[serde_as(as = "DurationSeconds")] + pub timeout: std::time::Duration, + #[serde_as(as = "DurationSeconds")] + pub status_interval: std::time::Duration, + #[serde_as(as = "DurationSeconds")] + pub stats_interval: std::time::Duration, +} + +impl Default for Prs10Config { + fn default() -> Self { + Prs10Config { + enabled: false, + port: "/dev/ttyS0".into(), + baud: 9600, + timeout: std::time::Duration::from_secs(1), + status_interval: std::time::Duration::from_secs(10), + stats_interval: std::time::Duration::from_secs(30), + } + } +} + +#[serde_as] +#[derive(Serialize, Deserialize, Clone)] +#[serde(default)] +pub struct UCCMConfig { + pub enabled: bool, + pub port: String, + pub baud: u32, + #[serde_as(as = "DurationSeconds")] + pub status_interval: std::time::Duration, + #[serde_as(as = "DurationSeconds")] + pub timeout: std::time::Duration, + pub measurement: String, +} + +impl Default for UCCMConfig { + fn default() -> Self { + UCCMConfig { + enabled: false, + port: "/dev/ttyS0".into(), + baud: 57600, + status_interval: std::time::Duration::from_secs(10), + timeout: std::time::Duration::from_secs(1), + measurement: "uccm_gpsdo".into(), + } + } +} + +#[derive(Serialize, Deserialize, Clone)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum SourceConfig { + Chrony(ChronyConfig), + Hwmon(HwmonConfig), + Uccm(UCCMConfig), + Gpsd(GpsdConfig), + Prs10(Prs10Config), +} + +#[derive(Serialize, Deserialize, Clone)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum TargetConfig { + ChronySock(ChronySockConfig), + InfluxDb(InfluxConfig), +} + +#[derive(Serialize, Deserialize, Clone, Default)] +pub struct Config { + pub influxdb: InfluxConfig, + pub sources: BTreeMap, + pub targets: BTreeMap, +} + +impl Provider for Config { + fn metadata(&self) -> figment::Metadata { + figment::Metadata::named("Default config") + } + fn data(&self) -> Result, figment::Error> { + Serialized::defaults(Config::default()).data() + } +} diff --git a/src/lib.rs b/src/lib.rs index a5c3e7d..5fe807b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,4 @@ +pub mod config; pub mod sources; pub mod targets; @@ -11,168 +12,11 @@ macro_rules! fatal { use async_trait::async_trait; use chrono::{DateTime, Utc}; -use figment::{ - Figment, Provider, - providers::{Format, Serialized, Toml}, - util::map, - value::Map, -}; -use gethostname::gethostname; -use influxdb2::models::DataPoint; -use serde_derive::{Deserialize, Serialize}; -use serde_with::{DurationSeconds, serde_as}; + use tokio::sync::broadcast::*; use tokio_util::sync::CancellationToken; -use std::{fmt::Debug, path::Path, sync::Arc}; - -#[derive(Serialize, Deserialize, Clone)] -pub struct InfluxConfig { - pub enabled: bool, - pub url: String, - pub org: String, - pub bucket: String, - pub token: String, - pub tags: Map, -} - -impl Default for InfluxConfig { - fn default() -> Self { - let host = gethostname().into_string().unwrap(); - InfluxConfig { - enabled: false, - url: "http://localhost:8086".into(), - org: "default".into(), - bucket: "default".into(), - token: "".into(), - tags: map! { "host".into() => host }, - } - } -} - -#[serde_as] -#[derive(Serialize, Deserialize, Clone)] -#[serde(default)] -pub struct ChronyConfig { - pub enabled: bool, - #[serde_as(as = "DurationSeconds")] - pub timeout: std::time::Duration, - #[serde_as(as = "DurationSeconds")] - pub tracking_interval: std::time::Duration, - #[serde_as(as = "DurationSeconds")] - pub sources_interval: std::time::Duration, - pub measurement_prefix: String, - pub tracking_measurement: String, - pub sources_measurement: String, - pub host: String, -} - -impl Default for ChronyConfig { - fn default() -> Self { - ChronyConfig { - enabled: false, - timeout: std::time::Duration::from_secs(5), - tracking_interval: std::time::Duration::from_secs(60), - sources_interval: std::time::Duration::from_secs(300), - measurement_prefix: "chrony.".into(), - tracking_measurement: "tracking".into(), - sources_measurement: "sources".into(), - host: "127.0.0.1:323".into(), - } - } -} - -#[derive(Serialize, Deserialize, Clone)] -#[serde(default)] -pub struct ChronySockConfig { - pub enabled: bool, - pub sock: String, -} - -impl Default for ChronySockConfig { - fn default() -> Self { - ChronySockConfig { - enabled: false, - sock: "".into(), - } - } -} - -#[derive(Serialize, Deserialize, Clone, Debug)] -pub struct HwmonSensorConfig { - pub device: String, - pub sensor: String, -} - -#[serde_as] -#[derive(Serialize, Deserialize, Clone, Debug)] -#[serde(default)] -pub struct HwmonConfig { - pub enabled: bool, - #[serde_as(as = "DurationSeconds")] - pub interval: std::time::Duration, - pub measurement: String, - pub sensors: Map, -} - -impl Default for HwmonConfig { - fn default() -> Self { - HwmonConfig { - enabled: false, - interval: std::time::Duration::from_secs(60), - measurement: "hwmon".into(), - sensors: map! {}, - } - } -} - -#[serde_as] -#[derive(Serialize, Deserialize, Clone, Debug)] -#[serde(default)] -pub struct GpsdConfig { - pub enabled: bool, - #[serde_as(as = "DurationSeconds")] - pub interval: std::time::Duration, - pub host: String, -} - -impl Default for GpsdConfig { - fn default() -> Self { - GpsdConfig { - enabled: false, - interval: std::time::Duration::from_secs(60), - host: "localhost:2947".into(), - } - } -} - -#[serde_as] -#[derive(Serialize, Deserialize, Clone, Debug)] -#[serde(default)] -pub struct Prs10Config { - pub enabled: bool, - pub port: String, - pub baud: u32, - #[serde_as(as = "DurationSeconds")] - pub timeout: std::time::Duration, - #[serde_as(as = "DurationSeconds")] - pub status_interval: std::time::Duration, - #[serde_as(as = "DurationSeconds")] - pub stats_interval: std::time::Duration, -} - -impl Default for Prs10Config { - fn default() -> Self { - Prs10Config { - enabled: false, - port: "/dev/ttyS0".into(), - baud: 9600, - timeout: std::time::Duration::from_secs(1), - status_interval: std::time::Duration::from_secs(10), - stats_interval: std::time::Duration::from_secs(30), - } - } -} +use std::{fmt::Debug, sync::Arc}; #[derive(Clone, Debug)] pub struct TimeReport { @@ -247,103 +91,12 @@ pub struct SourceReport { pub details: Arc, } -#[serde_as] -#[derive(Serialize, Deserialize, Clone)] -#[serde(default)] -pub struct UCCMConfig { - pub enabled: bool, - pub port: String, - pub baud: u32, - #[serde_as(as = "DurationSeconds")] - pub status_interval: std::time::Duration, - #[serde_as(as = "DurationSeconds")] - pub timeout: std::time::Duration, - pub measurement: String, -} - -impl Default for UCCMConfig { - fn default() -> Self { - UCCMConfig { - enabled: false, - port: "/dev/ttyS0".into(), - baud: 57600, - status_interval: std::time::Duration::from_secs(10), - timeout: std::time::Duration::from_secs(1), - measurement: "uccm_gpsdo".into(), - } - } -} - -#[derive(Serialize, Deserialize, Clone, Default)] -pub struct SourcesConfig { - pub chrony: ChronyConfig, - pub hwmon: HwmonConfig, - pub uccm: UCCMConfig, - pub gpsd: GpsdConfig, - pub prs10: Prs10Config, -} - -#[derive(Serialize, Deserialize, Clone)] -#[serde(tag = "type", rename_all = "snake_case")] -pub enum SourceConfig { - Chrony(ChronyConfig), - Hwmon(HwmonConfig), - Uccm(UCCMConfig), - Gpsd(GpsdConfig), - Prs10(Prs10Config), -} - -#[derive(Serialize, Deserialize, Clone)] -pub struct NamedSourceConfig { - pub name: String, - #[serde(flatten)] - pub source: SourceConfig, -} - -#[derive(Serialize, Deserialize, Clone, Default)] -pub struct TargetsConfig { - pub chrony: ChronySockConfig, -} - -#[derive(Serialize, Deserialize, Clone, Default)] -pub struct Config { - pub influxdb: InfluxConfig, - pub sources: Vec, - pub targets: TargetsConfig, -} - -impl Provider for Config { - fn metadata(&self) -> figment::Metadata { - figment::Metadata::named("Default config") - } - fn data(&self) -> Result, figment::Error> { - Serialized::defaults(Config::default()).data() - } -} - -pub fn load_config(filename: &Path) -> Figment { - Figment::from(Serialized::defaults(Config::default())).merge(Toml::file(filename)) -} - #[derive(Debug, Clone)] pub enum ChimemonMessage { - DataPoint(DataPoint), - DataPoints(Vec), TimeReport(TimeReport), SourceReport(SourceReport), } -impl From for ChimemonMessage { - fn from(dp: DataPoint) -> Self { - ChimemonMessage::DataPoint(dp) - } -} -impl From> for ChimemonMessage { - fn from(dps: Vec) -> Self { - ChimemonMessage::DataPoints(dps) - } -} - impl From for ChimemonMessage { fn from(tr: TimeReport) -> Self { ChimemonMessage::TimeReport(tr) @@ -368,5 +121,7 @@ pub trait ChimemonSource { #[async_trait] pub trait ChimemonTarget { + type Config; + fn new(name: &str, config: Self::Config) -> Self; async fn run(self, chan: ChimemonTargetChannel, cancel: CancellationToken); } diff --git a/src/main.rs b/src/main.rs index a7da091..086bd4f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,7 +9,11 @@ use tokio_util::sync::CancellationToken; use tracing::{Instrument, debug, error, info, info_span, warn}; use tracing_subscriber::{self, EnvFilter, fmt::format::FmtSpan}; -use chimemon::*; +use chimemon::{ + config::{SourceConfig, TargetConfig}, + *, +}; +use config::Config; use sources::{ chrony::ChronyClient, gpsd::GpsdSource, hwmon::HwmonSource, prs10::Prs10Monitor, uccm::UCCMMonitor, @@ -54,11 +58,11 @@ struct Args { } fn run_source( - config: NamedSourceConfig, + name: &str, + source: SourceConfig, chan: ChimemonSourceChannel, shutdown: CancellationToken, ) -> Option> { - let NamedSourceConfig { name, source } = config; match source { SourceConfig::Chrony(source_config) if source_config.enabled => { let c = ChronyClient::new(&name, source_config); @@ -94,6 +98,28 @@ fn run_source( } } +fn run_target( + name: &str, + target: TargetConfig, + chan: ChimemonTargetChannel, + cancel: CancellationToken, +) -> Option> { + match target { + TargetConfig::ChronySock(source_config) if source_config.enabled => { + let c = ChronySockServer::new(name, source_config); + Some(tokio::spawn( + c.run(chan, cancel) + .instrument(info_span!("chronysock-task")), + )) + } + TargetConfig::InfluxDb(source_config) if source_config.enabled => { + warn!("influx not implemented"); + None + } + _ => None, + } +} + async fn dummy_consumer(mut chan: ChimemonTargetChannel, cancel: CancellationToken) { info!("Dummy receiver task started"); loop { @@ -141,72 +167,22 @@ async fn main() -> Result<(), Box> { let shutdown_token = CancellationToken::new(); - if config.influxdb.enabled { - info!( - "Connecting to influxdb {} org: {} using token", - &config.influxdb.url, &config.influxdb.org - ); - let config = config.clone(); - let influx = influxdb2::Client::new( - &config.influxdb.url, - &config.influxdb.org, - &config.influxdb.token, - ); - - let mut influx_rx = sourcechan.subscribe(); - let influx_cancel = shutdown_token.clone(); - - tasks.push(tokio::spawn( - async move { - let stream = async_stream::stream! { - while let Ok(msg) = influx_rx.recv().await { - match msg { - ChimemonMessage::DataPoint(dp) => { - yield dp - }, - ChimemonMessage::DataPoints(dps) => { - for p in dps { - yield p - } - }, - _ => {} - } - } - }; - select! { - _ = influx_cancel.cancelled() => { - return - }, - res = influx.write(&config.influxdb.bucket, stream) => { - match res { - Err(e) => error!("Error writing to influx: {}", e.to_string()), - _ => warn!("Unexpectedly shutting down influx task"), - } - }, - } - } - .instrument(info_span!("influx-task")), - )); - } - - for source in config.sources { - if let Some(task) = run_source(source, sourcechan.clone(), shutdown_token.clone()) { + for (name, source) in config.sources { + if let Some(task) = run_source(&name, source, sourcechan.clone(), shutdown_token.clone()) { tasks.push(task) } } - let chrony_refclock = if config.targets.chrony.enabled { - Some(ChronySockServer::new(config.targets.chrony.to_owned())) - } else { - None - }; - if let Some(chrony_refclock) = chrony_refclock { - tasks.push(tokio::spawn( - chrony_refclock - .run(sourcechan.subscribe(), shutdown_token.clone()) - .instrument(info_span!("chrony-refclock-task")), - )); - }; + for (name, target) in config.targets { + if let Some(task) = run_target( + &name, + target, + sourcechan.subscribe(), + shutdown_token.clone(), + ) { + tasks.push(task) + } + } if tasks.len() == 0 { error!("No tasks configured, exiting."); diff --git a/src/sources/chrony.rs b/src/sources/chrony.rs index 76fc343..a952f28 100644 --- a/src/sources/chrony.rs +++ b/src/sources/chrony.rs @@ -5,14 +5,13 @@ use async_trait::async_trait; use chrony_candm::reply::{self, ReplyBody, SourceMode}; use chrony_candm::request::{self, RequestBody}; use chrony_candm::{ClientOptions, blocking_query}; -use futures::future::join; -use tokio::{join, select}; +use tokio::select; use tokio_util::sync::CancellationToken; use tracing::{info, warn}; use crate::{ - ChimemonSource, ChimemonSourceChannel, ChronyConfig, MetricTags, SourceMetric, SourceReport, - SourceReportDetails, SourceStatus, + ChimemonSource, ChimemonSourceChannel, MetricTags, SourceMetric, SourceReport, + SourceReportDetails, SourceStatus, config::ChronyConfig, }; pub struct ChronyClient { @@ -313,7 +312,7 @@ impl ChimemonSource for ChronyClient { match self.tracking_poll(&chan).await { Ok(_) => (), Err(e) => { - warn!("Error in chrony task: {}", e.to_string()); + warn!(error = ?e, "Error in chrony tracking task"); } } }, @@ -321,7 +320,7 @@ impl ChimemonSource for ChronyClient { match self.sources_poll(&chan).await { Ok(_) => (), Err(e) => { - warn!("Error in chrony task: {}", e.to_string()); + warn!(error = ?e, "Error in chrony sources task"); } } } diff --git a/src/sources/gpsd.rs b/src/sources/gpsd.rs index b027dcf..1871e20 100644 --- a/src/sources/gpsd.rs +++ b/src/sources/gpsd.rs @@ -15,11 +15,11 @@ use tokio::net::{TcpStream, ToSocketAddrs, lookup_host}; use tokio::time::{interval, timeout}; use tokio_util::codec::{Framed, LinesCodec}; use tokio_util::sync::CancellationToken; -use tracing::{debug, debug_span, info, instrument, warn}; +use tracing::{debug, debug_span, error, info, instrument, warn}; use crate::{ - ChimemonMessage, ChimemonSource, ChimemonSourceChannel, GpsdConfig, SourceMetric, SourceReport, - SourceReportDetails, SourceStatus, + ChimemonMessage, ChimemonSource, ChimemonSourceChannel, SourceMetric, SourceReport, + SourceReportDetails, SourceStatus, config::GpsdConfig, }; pub struct GpsdSource { @@ -116,17 +116,21 @@ impl GpsdSource { .and_then(|sky| sky.tdop) .map_or(f64::INFINITY, |tdop| tdop as f64); - chan.send(ChimemonMessage::SourceReport(SourceReport { - name: self.name.clone(), - status: SourceStatus::Unknown, - details: Arc::new(GpsdSourceReport { - fix_type: tpv.map_or(GpsdFixType::Unknown, |tpv| tpv.mode.into()), - sats_tracked, - sats_visible, - tdop, - }), - })) - .unwrap(); + if let Err(e) = chan.send( + SourceReport { + name: self.name.clone(), + status: SourceStatus::Unknown, + details: Arc::new(GpsdSourceReport { + fix_type: tpv.map_or(GpsdFixType::Unknown, |tpv| tpv.mode.into()), + sats_tracked, + sats_visible, + tdop, + }), + } + .into(), + ) { + error!(error = ?e, "Unable to send to channel") + } } fn handle_msg(&mut self, msg: String) -> Result<(), Box> { diff --git a/src/sources/hwmon.rs b/src/sources/hwmon.rs index a777891..de3caf7 100644 --- a/src/sources/hwmon.rs +++ b/src/sources/hwmon.rs @@ -6,9 +6,10 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; use crate::{ - ChimemonSource, ChimemonSourceChannel, HwmonConfig, MetricTags, SourceMetric, SourceReport, - SourceReportDetails, SourceStatus, + ChimemonSource, ChimemonSourceChannel, MetricTags, SourceMetric, SourceReport, + SourceReportDetails, SourceStatus, config::HwmonConfig, }; + pub struct HwmonSource { name: String, config: HwmonConfig, diff --git a/src/sources/prs10.rs b/src/sources/prs10.rs index f83de01..3ce697b 100644 --- a/src/sources/prs10.rs +++ b/src/sources/prs10.rs @@ -17,8 +17,8 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, debug_span, error, info, instrument, warn}; use crate::{ - ChimemonMessage, ChimemonSource, ChimemonSourceChannel, MetricTags, Prs10Config, SourceMetric, - SourceReport, SourceReportDetails, SourceStatus, fatal, + ChimemonMessage, ChimemonSource, ChimemonSourceChannel, MetricTags, SourceMetric, SourceReport, + SourceReportDetails, SourceStatus, config::Prs10Config, fatal, }; #[derive(Debug)] @@ -429,7 +429,7 @@ impl Prs10Monitor { #[instrument(skip_all)] async fn status_poll(&mut self) -> Result> { let status = self.get_status().await?; - Ok(ChimemonMessage::SourceReport(SourceReport { + Ok(SourceReport { name: self.name.clone(), status: if status.is_healthy() { SourceStatus::Healthy @@ -437,7 +437,8 @@ impl Prs10Monitor { SourceStatus::Unknown }, details: Arc::new(status), - })) + } + .into()) } #[instrument(skip_all)] @@ -459,7 +460,7 @@ impl Prs10Monitor { } drop(stats_guard); - Ok(ChimemonMessage::SourceReport(SourceReport { + Ok(SourceReport { name: self.name.clone(), status: SourceStatus::Unknown, details: Arc::new(Prs10Stats { @@ -488,7 +489,8 @@ impl Prs10Monitor { mul_amp_gain_volts: analog_values[18], rf_lock_volts: analog_values[19], }), - })) + } + .into()) } async fn reset_rx_state(&mut self) -> Result<(), Box> { diff --git a/src/sources/uccm.rs b/src/sources/uccm.rs index cb4f56a..8591e6a 100644 --- a/src/sources/uccm.rs +++ b/src/sources/uccm.rs @@ -7,7 +7,6 @@ use bitflags::bitflags; use byteorder::{BigEndian, ReadBytesExt}; use bytes::{Buf, BytesMut}; use chrono::{DateTime, Duration, NaiveDateTime, Utc}; -use figment::value::Map; use itertools::Itertools; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf}; use tokio::select; @@ -19,7 +18,7 @@ use tracing::{debug, error, info, warn}; use crate::{ ChimemonMessage, ChimemonSource, ChimemonSourceChannel, SourceMetric, SourceReport, - SourceReportDetails, SourceStatus, TimeReport, UCCMConfig, + SourceReportDetails, SourceStatus, TimeReport, config::UCCMConfig, }; pub const GPS_EPOCH: i64 = 315964800; // Doesn't seem possible to have a const DateTime object @@ -465,36 +464,41 @@ impl UCCMMonitor { && tod .flags .contains(UCCMFlags::OSC_LOCK | UCCMFlags::HAVE_GPS_TIME); - chan.send(ChimemonMessage::TimeReport(TimeReport { - system_time: sysnow, - offset, - leaps: tod.leaps as isize, - leap_flag: tod.flags.contains(UCCMFlags::LEAP_FLAG), - valid, - })) + chan.send( + TimeReport { + system_time: sysnow, + offset, + leaps: tod.leaps as isize, + leap_flag: tod.flags.contains(UCCMFlags::LEAP_FLAG), + valid, + } + .into(), + ) .expect("Unable to send to channel"); if sysnow - last_sent_report >= Duration::from_std(self.config.status_interval).unwrap() { if let Some(loop_diag) = &last_loop_diag { - if let Err(e) = - chan.send(ChimemonMessage::SourceReport(SourceReport { + if let Err(e) = chan.send( + SourceReport { name: "uccm".to_owned(), status: SourceStatus::Unknown, details: loop_diag.clone(), - })) - { + } + .into(), + ) { error!(error = ?e, "Unable to send message to channel"); } } if let Some(gps_sats) = &last_gps_sats { - if let Err(e) = - chan.send(ChimemonMessage::SourceReport(SourceReport { + if let Err(e) = chan.send( + SourceReport { name: "uccm".to_owned(), status: SourceStatus::Unknown, details: gps_sats.clone(), - })) - { + } + .into(), + ) { error!(error = ?e, "Unable to send message to channel"); } } diff --git a/src/targets/chrony_refclock.rs b/src/targets/chrony_refclock.rs index 638ccca..f8a695d 100644 --- a/src/targets/chrony_refclock.rs +++ b/src/targets/chrony_refclock.rs @@ -6,13 +6,14 @@ use async_trait::async_trait; use libc::{c_double, c_int, timeval}; use tokio::select; use tokio_util::sync::CancellationToken; -use tracing::{debug, warn}; +use tracing::{debug, info, warn}; -use crate::{ChimemonMessage, ChimemonTarget, ChimemonTargetChannel, ChronySockConfig}; +use crate::{ChimemonMessage, ChimemonTarget, ChimemonTargetChannel, config::ChronySockConfig}; const CHRONY_MAGIC: c_int = 0x534f434b; pub struct ChronySockServer { + name: String, sock_path: PathBuf, } @@ -27,17 +28,19 @@ pub struct ChronyTimeReport { magic: c_int, } -impl ChronySockServer { - pub fn new(config: ChronySockConfig) -> Self { - ChronySockServer { - sock_path: config.sock.into(), - } - } -} +impl ChronySockServer {} #[async_trait] impl ChimemonTarget for ChronySockServer { + type Config = ChronySockConfig; + fn new(name: &str, config: ChronySockConfig) -> Self { + ChronySockServer { + name: name.to_owned(), + sock_path: config.sock.into(), + } + } async fn run(mut self, mut chan: ChimemonTargetChannel, cancel: CancellationToken) { + info!("Chrony refclock task started"); loop { select! { _ = cancel.cancelled() => { return } diff --git a/src/targets/mod.rs b/src/targets/mod.rs index e171b67..e3bfe56 100644 --- a/src/targets/mod.rs +++ b/src/targets/mod.rs @@ -1 +1,2 @@ pub mod chrony_refclock; +// pub mod influx;