commit 1809701a82dc5351d10dc8e9c39a6ff688e5552a Author: Keenan Tims Date: Wed Oct 19 08:38:51 2022 -0700 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ede170d --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +/target +Cargo.lock +config.toml \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..c0116ae --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "chimemon" +version = "0.1.0" +edition = "2021" + +[dependencies] +serde = "1.0" +serde_derive = "1.0" +influxdb2 = "0.3" +tokio = { version = "1", features = ["rt"] } +clap = { version = "4.0", features = ["derive"] } +log = "0.4" +figment = { version = "0.10", features = ["toml"] } +gethostname = "0.3" +env_logger = "0.9.1" +futures = "0.3.24" + +[dependencies.chrony-candm] +git = "https://github.com/aws/chrony-candm" +branch = "main" diff --git a/config.toml.dist b/config.toml.dist new file mode 100644 index 0000000..4f2cb54 --- /dev/null +++ b/config.toml.dist @@ -0,0 +1,16 @@ +[sources.chrony] +enabled = true +poll_interval = 10 +timeout = 1 +measurement_prefix = "chrony." +tracking_measurement = "tracking" +sources_measurement = "sources" + +[influxdb] +# url = "http://localhost:8086" # defaults to localhost +org = "default" +bucket = "default" +token = "" + +[influxdb.tags] +# host = "localhost" # defaults to gethostname() diff --git a/src/chrony.rs b/src/chrony.rs new file mode 100644 index 0000000..9c9a40a --- /dev/null +++ b/src/chrony.rs @@ -0,0 +1,84 @@ +use chrony_candm::reply::{self, ReplyBody}; +use chrony_candm::request::{self, RequestBody}; +use std::net::SocketAddr; +use tokio::time::timeout; + +pub struct ChronyClient { + pub server: SocketAddr, + client: chrony_candm::Client, + timeout: std::time::Duration, +} + +impl ChronyClient { + pub fn new(client: chrony_candm::Client, server: SocketAddr, timeout: std::time::Duration) -> Self { + Self { + server, + client, + timeout, + } + } + pub async fn get_tracking(&self) -> Result { + let reply = timeout( + self.timeout, + self.client.query(RequestBody::Tracking, self.server), + ) + .await??; + + match reply.body { + ReplyBody::Tracking(tracking) => Ok(tracking), + _ => Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "Unexpected response type", + )), + } + } + + pub async fn get_sources(&self) -> Result, std::io::Error> { + let reply = timeout( + self.timeout, + self.client.query(RequestBody::NSources, self.server), + ) + .await??; + + let nsources = match reply.body { + ReplyBody::NSources(ns) => Ok(i32::try_from(ns.n_sources).unwrap()), + _ => Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "Unexpected response type", + )), + }?; + + let mut res = Vec::with_capacity( + nsources + .try_into() + .expect("Ridiculously unconvertible number of sources"), + ); + + for x in 0..nsources { + res.push(self.get_source(x).await?); + } + + Ok(res) + } + + async fn get_source(&self, index: i32) -> Result { + let reply = timeout( + self.timeout, + self.client.query( + RequestBody::SourceData(request::SourceData { index: index }), + self.server, + ), + ) + .await??; + + let sourcedata = match reply.body { + ReplyBody::SourceData(sourcedata) => Ok(sourcedata), + _ => Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "Invalid response", + )), + }?; + + Ok(sourcedata) + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..c168075 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,143 @@ +use chrony_candm::reply; +use figment::{ + providers::{Format, Serialized, Toml}, + util::map, + value::Map, + Figment, +}; +use gethostname::gethostname; +use influxdb2::models::DataPoint; +use serde_derive::{Deserialize, Serialize}; +use std::time::{SystemTime, UNIX_EPOCH}; +use std::{ + net::{IpAddr, Ipv4Addr}, + path::Path, +}; + +#[derive(Serialize, Deserialize, Clone)] +pub struct InfluxConfig { + pub url: String, + pub org: String, + pub bucket: String, + pub token: String, + pub tags: Map, +} + +impl Default for InfluxConfig { + fn default() -> Self { + let host = gethostname().into_string().unwrap(); + InfluxConfig { + url: "http://localhost:8086".into(), + org: "default".into(), + bucket: "default".into(), + token: "".into(), + tags: map! { "host".into() => host }, + } + } +} + +#[derive(Serialize, Deserialize, Clone)] +pub struct ChronyConfig { + pub enabled: bool, + pub timeout: u64, + pub poll_interval: u64, + pub measurement_prefix: String, + pub tracking_measurement: String, + pub sources_measurement: String, + pub host: IpAddr, + pub port: u16, +} + +impl Default for ChronyConfig { + fn default() -> Self { + ChronyConfig { + enabled: false, + timeout: 5, + poll_interval: 60, + measurement_prefix: "chrony.".into(), + tracking_measurement: "tracking".into(), + sources_measurement: "sources".into(), + host: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), + port: 323, + } + } +} + +#[derive(Serialize, Deserialize, Clone, Default)] +pub struct SourcesConfig { + pub chrony: ChronyConfig, +} + +#[derive(Serialize, Deserialize, Clone, Default)] +pub struct Config { + pub influxdb: InfluxConfig, + pub sources: SourcesConfig, +} + +pub fn load_config(filename: &Path) -> Result> { + let config = Figment::from(Serialized::defaults(Config::default())) + .merge(Toml::file(filename)) + .extract()?; + Ok(config) +} + +pub fn datapoint_from_tracking( + t: &reply::Tracking, + config: &Config, +) -> Result> { + let now = SystemTime::now().duration_since(UNIX_EPOCH)?; + let measurement = config.sources.chrony.measurement_prefix.to_owned() + + &config.sources.chrony.tracking_measurement; + let mut builder = + DataPoint::builder(&measurement).timestamp(now.as_nanos().try_into().unwrap()); + for (key, value) in &config.influxdb.tags { + builder = builder.tag(key, value); + } + + let point = builder + .field("ref_id", t.ref_id as i64) + .field("ref_ip_addr", t.ip_addr.to_string()) + .field("stratum", t.stratum as i64) + .field("leap_status", t.leap_status as i64) + .field("current_correction", f64::from(t.current_correction)) + .field("last_offset", f64::from(t.last_offset)) + .field("rms_offset", f64::from(t.rms_offset)) + .field("freq_ppm", f64::from(t.freq_ppm)) + .field("resid_freq_ppm", f64::from(t.resid_freq_ppm)) + .field("skew_ppm", f64::from(t.skew_ppm)) + .field("root_delay", f64::from(t.root_delay)) + .field("root_dispersion", f64::from(t.root_dispersion)) + .field("last_update_interval", f64::from(t.last_update_interval)) + .build()?; + + Ok(point) +} + +pub fn datapoint_from_sourcedata( + d: &reply::SourceData, + config: &Config, +) -> Result> { + let now = SystemTime::now().duration_since(UNIX_EPOCH)?; + let measurement = config.sources.chrony.measurement_prefix.to_owned() + + &config.sources.chrony.sources_measurement; + let mut builder = + DataPoint::builder(&measurement).timestamp(now.as_nanos().try_into().unwrap()); + for (key, value) in &config.influxdb.tags { + builder = builder.tag(key, value) + } + builder = builder + .tag("ip", d.ip_addr.to_string()) + .field("poll", d.poll as i64) + .field("stratum", d.stratum as i64) + .field("state", d.state as i64) + .field("mode", d.mode as i64) + .field("flags", d.flags.bits() as i64) + .field("reachability", d.reachability as i64) + .field("since_sample", d.since_sample as i64) + .field("orig_latest_meas", f64::from(d.orig_latest_meas)) + .field("latest_meas", f64::from(d.latest_meas)) + .field("latest_meas_err", f64::from(d.latest_meas_err)); + let point = builder.build()?; + + Ok(point) +} diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..2f0e485 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,118 @@ +mod chrony; + +use clap::Parser; +use env_logger; +use futures::{future::join_all, prelude::*}; +use log::{error, info, warn}; +use std::{path::Path, time::Duration}; +use std::net::SocketAddr; +use tokio::runtime::Handle; + +use crate::chrony::*; +use chimemon::*; + +const PROGRAM_NAME: &str = "chimemon"; +const VERSION: &str = "0.0.1"; + +const DEFAULT_RETRY_DELAY: Duration = Duration::new(60, 0); +const MAX_RETRY_DELAY: Duration = Duration::new(1800, 0); + +#[derive(Parser)] +struct Args { + /// TOML configuration to load + #[arg(short, long, default_value_t = String::from("config.toml"))] + config_file: String, +} + +async fn do_chrony_poll( + config: &Config, + chrony: &ChronyClient, + influx: &influxdb2::Client, +) -> Result<(), Box> { + let tracking = chrony.get_tracking().await?; + let sources = chrony.get_sources().await?; + + let mut frame = Vec::new(); + + let tracking_data = datapoint_from_tracking(&tracking, &config)?; + + info!(target: "chrony", "Writing tracking data: {:?}", tracking_data); + + frame.push(tracking_data); + for ds in sources { + let source_data = datapoint_from_sourcedata(&ds, &config)?; + info!(target: "chrony", "Writing source data: {:?}", source_data); + frame.push(source_data); + } + + influx + .write(&config.influxdb.bucket, stream::iter(frame)) + .await?; + Ok(()) +} + +#[tokio::main(flavor = "current_thread")] +async fn main() -> Result<(), Box> { + env_logger::init(); + let args = Args::parse(); + info!("{} v{} starting...", PROGRAM_NAME, VERSION); + let config = load_config(Path::new(&args.config_file))?; + let mut tasks = Vec::new(); + + info!( + "Connecting to influxdb {} org: {} using token", + &config.influxdb.url, &config.influxdb.org + ); + let influx = influxdb2::Client::new( + &config.influxdb.url, + &config.influxdb.org, + &config.influxdb.token, + ); + + if config.sources.chrony.enabled { + let chrony = ChronyClient::new( + chrony_candm::Client::spawn(&Handle::current(), Default::default()), + SocketAddr::new( + config.sources.chrony.host, + config.sources.chrony.port, + ), + Duration::from_secs(config.sources.chrony.timeout), + ); + + let mut retry_delay = DEFAULT_RETRY_DELAY; + let mut fail_count = 0; + + let interval = Duration::new(config.sources.chrony.poll_interval, 0); + let mut interval = tokio::time::interval(interval); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + tasks.push(tokio::spawn(async move { + info!(target: "chrony", "Chrony task started"); + loop { + interval.tick().await; + match do_chrony_poll(&config, &chrony, &influx.to_owned()).await { + Ok(_) => { + fail_count = 0; + retry_delay = DEFAULT_RETRY_DELAY; + } + Err(e) => { + warn!(target: "chrony", "Error in chrony task: {}", e.to_string()); + if fail_count % 4 == 0 { + // exponential backoff, every 4 failures double the retry timer, cap at max + retry_delay = std::cmp::min(MAX_RETRY_DELAY, retry_delay * 2); + } + fail_count += 1; + } + } + if fail_count > 0 { + tokio::time::sleep(retry_delay).await; + interval.reset(); + }; + } + })); + } + + join_all(tasks).await; + + Ok(()) +}