use crate::{ ChimemonSource, ChimemonSourceChannel, Config, SourceMetric, SourceReport, SourceReportDetails, SourceStatus, }; use async_trait::async_trait; use chrony_candm::reply::{self, ReplyBody, SourceMode, SourceState}; use chrony_candm::request::{self, RequestBody}; use chrony_candm::{ClientOptions, blocking_query}; use influxdb2::models::DataPoint; use std::net::{SocketAddr, ToSocketAddrs}; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tokio::join; use tracing::{info, warn}; pub struct ChronyClient { pub server: SocketAddr, client_options: ClientOptions, config: Config, } #[derive(Debug)] pub struct ChronyTrackingReport { tags: Arc>, pub ref_id: i64, pub ref_ip_addr: String, pub stratum: i64, pub leap_status: i64, pub current_correction: f64, pub last_offset: f64, pub rms_offset: f64, pub freq_ppm: f64, pub resid_freq_ppm: f64, pub skew_ppm: f64, pub root_delay: f64, pub root_dispersion: f64, pub last_update_interval: f64, } impl SourceReportDetails for ChronyTrackingReport { fn is_healthy(&self) -> bool { true } fn to_metrics(&self) -> Vec { let tags = &self.tags; vec![ SourceMetric::new_int("ref_id", self.ref_id, tags.clone()), SourceMetric::new_int("stratum", self.stratum, tags.clone()), SourceMetric::new_int("leap_status", self.leap_status, tags.clone()), SourceMetric::new_float("current_correction", self.current_correction, tags.clone()), SourceMetric::new_float("last_offset", self.last_offset, tags.clone()), SourceMetric::new_float("rms_offset", self.rms_offset, tags.clone()), SourceMetric::new_float("freq_ppm", self.freq_ppm, tags.clone()), SourceMetric::new_float("resid_freq_ppm", self.resid_freq_ppm, tags.clone()), SourceMetric::new_float("skew_ppm", self.skew_ppm, tags.clone()), SourceMetric::new_float("root_delay", self.root_delay, tags.clone()), SourceMetric::new_float("root_dispersion", self.root_dispersion, tags.clone()), SourceMetric::new_float( "last_update_interval", self.last_update_interval, tags.clone(), ), ] } } #[derive(Debug)] pub struct ChronySourcesReport { pub sources: Vec, } impl SourceReportDetails for ChronySourcesReport { fn is_healthy(&self) -> bool { //TODO: think about whether there is an idea of unhealthy sources true } fn to_metrics(&self) -> Vec { let mut metrics = Vec::with_capacity(8 * self.sources.len()); for source in &self.sources { let tags = Arc::new(vec![ ("ref_id".to_owned(), source.ip_addr.to_string()), ( "mode".to_owned(), match source.mode { SourceMode::Client => String::from("server"), SourceMode::Peer => String::from("peer"), SourceMode::Ref => String::from("refclock"), }, ), ( "state".to_owned(), match source.state { reply::SourceState::Selected => String::from("best"), reply::SourceState::NonSelectable => String::from("unusable"), reply::SourceState::Falseticker => String::from("falseticker"), reply::SourceState::Jittery => String::from("jittery"), reply::SourceState::Unselected => String::from("combined"), reply::SourceState::Selectable => String::from("unused"), }, ), ]); metrics.extend([ SourceMetric::new_int("poll", source.poll as i64, tags.clone()), SourceMetric::new_int("stratum", source.stratum as i64, tags.clone()), SourceMetric::new_int("flags", source.flags.bits() as i64, tags.clone()), SourceMetric::new_int( "reachability", source.reachability.count_ones() as i64, tags.clone(), ), SourceMetric::new_int("since_sample", source.since_sample as i64, tags.clone()), SourceMetric::new_float( "orig_latest_meas", source.orig_latest_meas.into(), tags.clone(), ), SourceMetric::new_float("latest_meas", source.latest_meas.into(), tags.clone()), SourceMetric::new_float( "latest_meas_err", source.latest_meas_err.into(), tags.clone(), ), ]); } metrics } } fn report_from_tracking( t: &reply::Tracking, config: &Config, ) -> Result> { let report = ChronyTrackingReport { tags: Arc::new(vec![]), //TODO: allow configuring tags in the source ref_id: t.ref_id as i64, ref_ip_addr: t.ip_addr.to_string(), stratum: t.stratum as i64, leap_status: t.leap_status as i64, current_correction: t.current_correction.into(), last_offset: t.last_offset.into(), rms_offset: t.rms_offset.into(), freq_ppm: t.freq_ppm.into(), resid_freq_ppm: t.resid_freq_ppm.into(), skew_ppm: t.skew_ppm.into(), root_delay: t.root_delay.into(), root_dispersion: t.root_dispersion.into(), last_update_interval: t.last_update_interval.into(), }; Ok(report) } impl ChronyClient { pub fn new(config: Config) -> Self { let server = config .sources .chrony .host .to_socket_addrs() .unwrap() .next() .expect("Unable to parse host:port:"); let client_options = ClientOptions { n_tries: 3, timeout: Duration::from_secs(config.sources.chrony.timeout), }; ChronyClient { server, client_options, config, } } async fn query(&self, request: RequestBody) -> Result { let server = self.server; let client_options = self.client_options; tokio::task::spawn_blocking(move || blocking_query(request, client_options, &server)) .await .map_err(|e| { std::io::Error::new( std::io::ErrorKind::Other, format!("Error joining thread: {}", e), ) })? } pub async fn get_tracking(&self) -> Result { let reply = self.query(RequestBody::Tracking).await?; match reply.body { ReplyBody::Tracking(tracking) => Ok(tracking), _ => Err(std::io::Error::new( std::io::ErrorKind::InvalidData, "Unexpected response type", )), } } pub async fn get_sources(&self) -> Result, std::io::Error> { let reply = self.query(RequestBody::NSources).await?; let nsources = match reply.body { ReplyBody::NSources(ns) => Ok(i32::try_from(ns.n_sources).unwrap()), _ => Err(std::io::Error::new( std::io::ErrorKind::InvalidData, "Unexpected response type", )), }?; let mut res = Vec::with_capacity( nsources .try_into() .expect("Ridiculously unconvertible number of sources"), ); for x in 0..nsources { res.push(self.get_source(x).await?); } Ok(res) } async fn get_source(&self, index: i32) -> Result { let reply = self .query(RequestBody::SourceData(request::SourceData { index })) .await?; let sourcedata = match reply.body { ReplyBody::SourceData(sourcedata) => Ok(sourcedata), _ => Err(std::io::Error::new( std::io::ErrorKind::InvalidData, "Invalid response", )), }?; // if sourcedata.mode == SourceMode::Ref { // // Get the name if it's a refclock // let reply = timeout( // self.timeout, // self.client.query( // RequestBody::NtpSourceName(request::NtpSourceName { ip_addr: sourcedata.ip_addr }), // self.server, // ), // ) // .await??; // let sourcename = match reply.body { // ReplyBody::NtpSourceName(sourcename) => Ok(sourcename), // _ => Err(std::io::Error::new( // std::io::ErrorKind::InvalidData, // "Invalid response", // )), // }?; // sourcedata.ip_addr = sourcename; // } Ok(sourcedata) } async fn tracking_poll( &self, chan: &ChimemonSourceChannel, ) -> Result<(), Box> { let tracking = self.get_tracking().await?; let tracking_data = report_from_tracking(&tracking, &self.config)?; let report = SourceReport { name: "chrony-tracking".to_owned(), status: SourceStatus::Unknown, details: Arc::new(tracking_data), }; info!("Sending tracking data"); chan.send(report.into())?; Ok(()) } async fn sources_poll( &self, chan: &ChimemonSourceChannel, ) -> Result<(), Box> { let sources = self.get_sources().await?; let details = ChronySourcesReport { sources }; let report = SourceReport { name: "chrony-sources".to_owned(), status: SourceStatus::Unknown, details: Arc::new(details), }; info!("Sending source data"); chan.send(report.into())?; Ok(()) } } #[async_trait] impl ChimemonSource for ChronyClient { async fn run(self, chan: ChimemonSourceChannel) { info!("Chrony task started"); let mut t_interval = tokio::time::interval(Duration::from_secs( self.config.sources.chrony.tracking_interval, )); let mut s_interval = tokio::time::interval(Duration::from_secs( self.config.sources.chrony.sources_interval, )); let t_future = async { let lchan = chan.clone(); loop { t_interval.tick().await; match self.tracking_poll(&lchan).await { Ok(_) => (), Err(e) => { warn!("Error in chrony task: {}", e.to_string()); } } } }; let s_future = async { let lchan = chan.clone(); loop { s_interval.tick().await; match self.sources_poll(&lchan).await { Ok(_) => (), Err(e) => { warn!("Error in chrony task: {}", e.to_string()); } } } }; join!(t_future, s_future); } }