Initial commit

This commit is contained in:
Keenan Tims 2022-10-19 08:38:51 -07:00
commit 1809701a82
Signed by: ktims
GPG Key ID: 11230674D69038D4
6 changed files with 384 additions and 0 deletions

3
.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
/target
Cargo.lock
config.toml

20
Cargo.toml Normal file
View File

@ -0,0 +1,20 @@
[package]
name = "chimemon"
version = "0.1.0"
edition = "2021"
[dependencies]
serde = "1.0"
serde_derive = "1.0"
influxdb2 = "0.3"
tokio = { version = "1", features = ["rt"] }
clap = { version = "4.0", features = ["derive"] }
log = "0.4"
figment = { version = "0.10", features = ["toml"] }
gethostname = "0.3"
env_logger = "0.9.1"
futures = "0.3.24"
[dependencies.chrony-candm]
git = "https://github.com/aws/chrony-candm"
branch = "main"

16
config.toml.dist Normal file
View File

@ -0,0 +1,16 @@
[sources.chrony]
enabled = true
poll_interval = 10
timeout = 1
measurement_prefix = "chrony."
tracking_measurement = "tracking"
sources_measurement = "sources"
[influxdb]
# url = "http://localhost:8086" # defaults to localhost
org = "default"
bucket = "default"
token = ""
[influxdb.tags]
# host = "localhost" # defaults to gethostname()

84
src/chrony.rs Normal file
View File

@ -0,0 +1,84 @@
use chrony_candm::reply::{self, ReplyBody};
use chrony_candm::request::{self, RequestBody};
use std::net::SocketAddr;
use tokio::time::timeout;
pub struct ChronyClient {
pub server: SocketAddr,
client: chrony_candm::Client,
timeout: std::time::Duration,
}
impl ChronyClient {
pub fn new(client: chrony_candm::Client, server: SocketAddr, timeout: std::time::Duration) -> Self {
Self {
server,
client,
timeout,
}
}
pub async fn get_tracking(&self) -> Result<reply::Tracking, std::io::Error> {
let reply = timeout(
self.timeout,
self.client.query(RequestBody::Tracking, self.server),
)
.await??;
match reply.body {
ReplyBody::Tracking(tracking) => Ok(tracking),
_ => Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"Unexpected response type",
)),
}
}
pub async fn get_sources(&self) -> Result<Vec<reply::SourceData>, std::io::Error> {
let reply = timeout(
self.timeout,
self.client.query(RequestBody::NSources, self.server),
)
.await??;
let nsources = match reply.body {
ReplyBody::NSources(ns) => Ok(i32::try_from(ns.n_sources).unwrap()),
_ => Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"Unexpected response type",
)),
}?;
let mut res = Vec::with_capacity(
nsources
.try_into()
.expect("Ridiculously unconvertible number of sources"),
);
for x in 0..nsources {
res.push(self.get_source(x).await?);
}
Ok(res)
}
async fn get_source(&self, index: i32) -> Result<reply::SourceData, std::io::Error> {
let reply = timeout(
self.timeout,
self.client.query(
RequestBody::SourceData(request::SourceData { index: index }),
self.server,
),
)
.await??;
let sourcedata = match reply.body {
ReplyBody::SourceData(sourcedata) => Ok(sourcedata),
_ => Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"Invalid response",
)),
}?;
Ok(sourcedata)
}
}

143
src/lib.rs Normal file
View File

@ -0,0 +1,143 @@
use chrony_candm::reply;
use figment::{
providers::{Format, Serialized, Toml},
util::map,
value::Map,
Figment,
};
use gethostname::gethostname;
use influxdb2::models::DataPoint;
use serde_derive::{Deserialize, Serialize};
use std::time::{SystemTime, UNIX_EPOCH};
use std::{
net::{IpAddr, Ipv4Addr},
path::Path,
};
#[derive(Serialize, Deserialize, Clone)]
pub struct InfluxConfig {
pub url: String,
pub org: String,
pub bucket: String,
pub token: String,
pub tags: Map<String, String>,
}
impl Default for InfluxConfig {
fn default() -> Self {
let host = gethostname().into_string().unwrap();
InfluxConfig {
url: "http://localhost:8086".into(),
org: "default".into(),
bucket: "default".into(),
token: "".into(),
tags: map! { "host".into() => host },
}
}
}
#[derive(Serialize, Deserialize, Clone)]
pub struct ChronyConfig {
pub enabled: bool,
pub timeout: u64,
pub poll_interval: u64,
pub measurement_prefix: String,
pub tracking_measurement: String,
pub sources_measurement: String,
pub host: IpAddr,
pub port: u16,
}
impl Default for ChronyConfig {
fn default() -> Self {
ChronyConfig {
enabled: false,
timeout: 5,
poll_interval: 60,
measurement_prefix: "chrony.".into(),
tracking_measurement: "tracking".into(),
sources_measurement: "sources".into(),
host: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
port: 323,
}
}
}
#[derive(Serialize, Deserialize, Clone, Default)]
pub struct SourcesConfig {
pub chrony: ChronyConfig,
}
#[derive(Serialize, Deserialize, Clone, Default)]
pub struct Config {
pub influxdb: InfluxConfig,
pub sources: SourcesConfig,
}
pub fn load_config(filename: &Path) -> Result<Config, Box<dyn std::error::Error>> {
let config = Figment::from(Serialized::defaults(Config::default()))
.merge(Toml::file(filename))
.extract()?;
Ok(config)
}
pub fn datapoint_from_tracking(
t: &reply::Tracking,
config: &Config,
) -> Result<DataPoint, Box<dyn std::error::Error>> {
let now = SystemTime::now().duration_since(UNIX_EPOCH)?;
let measurement = config.sources.chrony.measurement_prefix.to_owned()
+ &config.sources.chrony.tracking_measurement;
let mut builder =
DataPoint::builder(&measurement).timestamp(now.as_nanos().try_into().unwrap());
for (key, value) in &config.influxdb.tags {
builder = builder.tag(key, value);
}
let point = builder
.field("ref_id", t.ref_id as i64)
.field("ref_ip_addr", t.ip_addr.to_string())
.field("stratum", t.stratum as i64)
.field("leap_status", t.leap_status as i64)
.field("current_correction", f64::from(t.current_correction))
.field("last_offset", f64::from(t.last_offset))
.field("rms_offset", f64::from(t.rms_offset))
.field("freq_ppm", f64::from(t.freq_ppm))
.field("resid_freq_ppm", f64::from(t.resid_freq_ppm))
.field("skew_ppm", f64::from(t.skew_ppm))
.field("root_delay", f64::from(t.root_delay))
.field("root_dispersion", f64::from(t.root_dispersion))
.field("last_update_interval", f64::from(t.last_update_interval))
.build()?;
Ok(point)
}
pub fn datapoint_from_sourcedata(
d: &reply::SourceData,
config: &Config,
) -> Result<DataPoint, Box<dyn std::error::Error>> {
let now = SystemTime::now().duration_since(UNIX_EPOCH)?;
let measurement = config.sources.chrony.measurement_prefix.to_owned()
+ &config.sources.chrony.sources_measurement;
let mut builder =
DataPoint::builder(&measurement).timestamp(now.as_nanos().try_into().unwrap());
for (key, value) in &config.influxdb.tags {
builder = builder.tag(key, value)
}
builder = builder
.tag("ip", d.ip_addr.to_string())
.field("poll", d.poll as i64)
.field("stratum", d.stratum as i64)
.field("state", d.state as i64)
.field("mode", d.mode as i64)
.field("flags", d.flags.bits() as i64)
.field("reachability", d.reachability as i64)
.field("since_sample", d.since_sample as i64)
.field("orig_latest_meas", f64::from(d.orig_latest_meas))
.field("latest_meas", f64::from(d.latest_meas))
.field("latest_meas_err", f64::from(d.latest_meas_err));
let point = builder.build()?;
Ok(point)
}

118
src/main.rs Normal file
View File

@ -0,0 +1,118 @@
mod chrony;
use clap::Parser;
use env_logger;
use futures::{future::join_all, prelude::*};
use log::{error, info, warn};
use std::{path::Path, time::Duration};
use std::net::SocketAddr;
use tokio::runtime::Handle;
use crate::chrony::*;
use chimemon::*;
const PROGRAM_NAME: &str = "chimemon";
const VERSION: &str = "0.0.1";
const DEFAULT_RETRY_DELAY: Duration = Duration::new(60, 0);
const MAX_RETRY_DELAY: Duration = Duration::new(1800, 0);
#[derive(Parser)]
struct Args {
/// TOML configuration to load
#[arg(short, long, default_value_t = String::from("config.toml"))]
config_file: String,
}
async fn do_chrony_poll(
config: &Config,
chrony: &ChronyClient,
influx: &influxdb2::Client,
) -> Result<(), Box<dyn std::error::Error>> {
let tracking = chrony.get_tracking().await?;
let sources = chrony.get_sources().await?;
let mut frame = Vec::new();
let tracking_data = datapoint_from_tracking(&tracking, &config)?;
info!(target: "chrony", "Writing tracking data: {:?}", tracking_data);
frame.push(tracking_data);
for ds in sources {
let source_data = datapoint_from_sourcedata(&ds, &config)?;
info!(target: "chrony", "Writing source data: {:?}", source_data);
frame.push(source_data);
}
influx
.write(&config.influxdb.bucket, stream::iter(frame))
.await?;
Ok(())
}
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
let args = Args::parse();
info!("{} v{} starting...", PROGRAM_NAME, VERSION);
let config = load_config(Path::new(&args.config_file))?;
let mut tasks = Vec::new();
info!(
"Connecting to influxdb {} org: {} using token",
&config.influxdb.url, &config.influxdb.org
);
let influx = influxdb2::Client::new(
&config.influxdb.url,
&config.influxdb.org,
&config.influxdb.token,
);
if config.sources.chrony.enabled {
let chrony = ChronyClient::new(
chrony_candm::Client::spawn(&Handle::current(), Default::default()),
SocketAddr::new(
config.sources.chrony.host,
config.sources.chrony.port,
),
Duration::from_secs(config.sources.chrony.timeout),
);
let mut retry_delay = DEFAULT_RETRY_DELAY;
let mut fail_count = 0;
let interval = Duration::new(config.sources.chrony.poll_interval, 0);
let mut interval = tokio::time::interval(interval);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
tasks.push(tokio::spawn(async move {
info!(target: "chrony", "Chrony task started");
loop {
interval.tick().await;
match do_chrony_poll(&config, &chrony, &influx.to_owned()).await {
Ok(_) => {
fail_count = 0;
retry_delay = DEFAULT_RETRY_DELAY;
}
Err(e) => {
warn!(target: "chrony", "Error in chrony task: {}", e.to_string());
if fail_count % 4 == 0 {
// exponential backoff, every 4 failures double the retry timer, cap at max
retry_delay = std::cmp::min(MAX_RETRY_DELAY, retry_delay * 2);
}
fail_count += 1;
}
}
if fail_count > 0 {
tokio::time::sleep(retry_delay).await;
interval.reset();
};
}
}));
}
join_all(tasks).await;
Ok(())
}