use async_trait::async_trait; use chimemon::{ChimemonSource, ChimemonSourceChannel, Config}; use chrony_candm::reply::{self, ReplyBody, SourceMode}; use chrony_candm::request::{self, RequestBody}; use chrony_candm::{blocking_query, ClientOptions}; use influxdb2::models::DataPoint; use log::{info, warn}; use std::net::{SocketAddr, ToSocketAddrs}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tokio::join; pub struct ChronyClient { pub server: SocketAddr, client_options: ClientOptions, config: Config, } fn datapoint_from_tracking( t: &reply::Tracking, config: &Config, ) -> Result> { let now = SystemTime::now().duration_since(UNIX_EPOCH)?; let measurement = config.sources.chrony.measurement_prefix.to_owned() + &config.sources.chrony.tracking_measurement; let mut builder = DataPoint::builder(&measurement).timestamp(now.as_nanos().try_into().unwrap()); for (key, value) in &config.influxdb.tags { builder = builder.tag(key, value); } let point = builder .field("ref_id", t.ref_id as i64) .field("ref_ip_addr", t.ip_addr.to_string()) .field("stratum", t.stratum as i64) .field("leap_status", t.leap_status as i64) .field("current_correction", f64::from(t.current_correction)) .field("last_offset", f64::from(t.last_offset)) .field("rms_offset", f64::from(t.rms_offset)) .field("freq_ppm", f64::from(t.freq_ppm)) .field("resid_freq_ppm", f64::from(t.resid_freq_ppm)) .field("skew_ppm", f64::from(t.skew_ppm)) .field("root_delay", f64::from(t.root_delay)) .field("root_dispersion", f64::from(t.root_dispersion)) .field("last_update_interval", f64::from(t.last_update_interval)) .build()?; Ok(point) } pub fn datapoint_from_sourcedata( d: &reply::SourceData, config: &Config, ) -> Result> { let now = SystemTime::now().duration_since(UNIX_EPOCH)?; let measurement = config.sources.chrony.measurement_prefix.to_owned() + &config.sources.chrony.sources_measurement; let mut builder = DataPoint::builder(&measurement).timestamp(now.as_nanos().try_into().unwrap()); for (key, value) in &config.influxdb.tags { builder = builder.tag(key, value) } builder = builder .tag("ref_id", d.ip_addr.to_string()) .tag( "mode", match d.mode { SourceMode::Client => String::from("server"), SourceMode::Peer => String::from("peer"), SourceMode::Ref => String::from("refclock"), }, ) .tag( "state", match d.state { reply::SourceState::Selected => String::from("best"), reply::SourceState::NonSelectable => String::from("unusable"), reply::SourceState::Falseticker => String::from("falseticker"), reply::SourceState::Jittery => String::from("jittery"), reply::SourceState::Unselected => String::from("combined"), reply::SourceState::Selectable => String::from("unused"), }, ) .field("poll", d.poll as i64) .field("stratum", d.stratum as i64) .field("flags", d.flags.bits() as i64) .field("reachability", d.reachability.count_ones() as i64) .field("since_sample", d.since_sample as i64) .field("orig_latest_meas", f64::from(d.orig_latest_meas)) .field("latest_meas", f64::from(d.latest_meas)) .field("latest_meas_err", f64::from(d.latest_meas_err)); let point = builder.build()?; Ok(point) } impl ChronyClient { pub fn new(config: Config) -> Self { let server = config .sources .chrony .host .to_socket_addrs() .unwrap() .next() .expect("Unable to parse host:port:"); let client_options = ClientOptions { n_tries: 3, timeout: Duration::from_secs(config.sources.chrony.timeout), }; ChronyClient { server, client_options, config, } } async fn query(&self, request: RequestBody) -> Result { let server = self.server.clone(); let client_options = self.client_options.clone(); tokio::task::spawn_blocking(move || blocking_query(request, client_options, &server)) .await .map_err(|e| { std::io::Error::new( std::io::ErrorKind::Other, format!("Error joining thread: {}", e), ) })? } pub async fn get_tracking(&self) -> Result { let reply = self.query(RequestBody::Tracking).await?; match reply.body { ReplyBody::Tracking(tracking) => Ok(tracking), _ => Err(std::io::Error::new( std::io::ErrorKind::InvalidData, "Unexpected response type", )), } } pub async fn get_sources(&self) -> Result, std::io::Error> { let reply = self.query(RequestBody::NSources).await?; let nsources = match reply.body { ReplyBody::NSources(ns) => Ok(i32::try_from(ns.n_sources).unwrap()), _ => Err(std::io::Error::new( std::io::ErrorKind::InvalidData, "Unexpected response type", )), }?; let mut res = Vec::with_capacity( nsources .try_into() .expect("Ridiculously unconvertible number of sources"), ); for x in 0..nsources { res.push(self.get_source(x).await?); } Ok(res) } async fn get_source(&self, index: i32) -> Result { let reply = self .query(RequestBody::SourceData(request::SourceData { index })) .await?; let sourcedata = match reply.body { ReplyBody::SourceData(sourcedata) => Ok(sourcedata), _ => Err(std::io::Error::new( std::io::ErrorKind::InvalidData, "Invalid response", )), }?; // if sourcedata.mode == SourceMode::Ref { // // Get the name if it's a refclock // let reply = timeout( // self.timeout, // self.client.query( // RequestBody::NtpSourceName(request::NtpSourceName { ip_addr: sourcedata.ip_addr }), // self.server, // ), // ) // .await??; // let sourcename = match reply.body { // ReplyBody::NtpSourceName(sourcename) => Ok(sourcename), // _ => Err(std::io::Error::new( // std::io::ErrorKind::InvalidData, // "Invalid response", // )), // }?; // sourcedata.ip_addr = sourcename; // } Ok(sourcedata) } async fn tracking_poll( &self, chan: &ChimemonSourceChannel, ) -> Result<(), Box> { let tracking = self.get_tracking().await?; let tracking_data = datapoint_from_tracking(&tracking, &self.config)?; info!("Sending tracking data"); chan.send(tracking_data.into()) .expect("Unable to send tracking data to targets"); Ok(()) } async fn sources_poll( &self, chan: &ChimemonSourceChannel, ) -> Result<(), Box> { let sources = self.get_sources().await?; let mut dps = Vec::with_capacity(sources.len()); for ds in sources { let source_data = datapoint_from_sourcedata(&ds, &self.config)?; dps.push(source_data); } info!("Sending source data"); chan.send(dps.into()) .expect("Unable to send source data to targets"); Ok(()) } } #[async_trait] impl ChimemonSource for ChronyClient { async fn run(self, chan: ChimemonSourceChannel) { info!("Chrony task started"); let mut t_interval = tokio::time::interval(Duration::from_secs( self.config.sources.chrony.tracking_interval, )); let mut s_interval = tokio::time::interval(Duration::from_secs( self.config.sources.chrony.sources_interval, )); let t_future = async { let lchan = chan.clone(); loop { t_interval.tick().await; match self.tracking_poll(&lchan).await { Ok(_) => (), Err(e) => { warn!("Error in chrony task: {}", e.to_string()); } } } }; let s_future = async { let lchan = chan.clone(); loop { s_interval.tick().await; match self.sources_poll(&lchan).await { Ok(_) => (), Err(e) => { warn!("Error in chrony task: {}", e.to_string()); } } } }; join!(t_future, s_future); } }