Initial generalization of source workers

This commit is contained in:
Keenan Tims 2022-11-01 01:36:14 -07:00
parent 82906adca2
commit ed3cb89325
Signed by: ktims
GPG Key ID: 11230674D69038D4
7 changed files with 452 additions and 164 deletions

43
Cargo.lock generated
View File

@ -120,6 +120,12 @@ version = "3.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "572f695136211188308f16ad2ca5c851a712c464060ae6974944458eb83880ba"
[[package]]
name = "byteorder"
version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]]
name = "bytes"
version = "1.2.1"
@ -142,6 +148,7 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
name = "chimemon"
version = "0.1.0"
dependencies = [
"async-trait",
"chrony-candm",
"clap",
"env_logger",
@ -152,7 +159,9 @@ dependencies = [
"log",
"serde",
"serde_derive",
"sysctl",
"tokio",
"tokio-stream",
]
[[package]]
@ -1331,6 +1340,15 @@ version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4501abdff3ae82a1c1b477a17252eb69cee9e66eb915c1abaa4f44d873df9f09"
[[package]]
name = "same-file"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502"
dependencies = [
"winapi-util",
]
[[package]]
name = "schannel"
version = "0.1.20"
@ -1514,6 +1532,19 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "sysctl"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f99d037b2bef227ab8963f4b0acc33ecbb1f9a2e7365add7789372b387ec19e1"
dependencies = [
"bitflags",
"byteorder",
"libc",
"thiserror",
"walkdir",
]
[[package]]
name = "tap"
version = "1.0.1"
@ -1645,6 +1676,7 @@ dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
"tokio-util",
]
[[package]]
@ -1812,6 +1844,17 @@ version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]]
name = "walkdir"
version = "2.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56"
dependencies = [
"same-file",
"winapi",
"winapi-util",
]
[[package]]
name = "want"
version = "0.3.0"

View File

@ -6,7 +6,7 @@ edition = "2021"
[dependencies]
serde = "1.0"
serde_derive = "1.0"
influxdb2 = "0.3"
influxdb2 = "0.3.3"
tokio = { version = "1", features = ["rt"] }
clap = { version = "4.0", features = ["derive"] }
log = "0.4"
@ -14,6 +14,9 @@ figment = { version = "0.10", features = ["toml"] }
gethostname = "0.3"
env_logger = "0.9.1"
futures = "0.3.24"
sysctl = "0.5.2"
async-trait = "0.1.58"
tokio-stream = { version = "0.1.11", features = ["sync"] }
[dependencies.chrony-candm]
git = "https://github.com/aws/chrony-candm"

View File

@ -1,16 +1,29 @@
[sources.chrony]
enabled = true
poll_interval = 10
timeout = 1
measurement_prefix = "chrony."
tracking_measurement = "tracking"
sources_measurement = "sources"
[sources]
[sources.chrony]
enabled = true
# host = "127.0.0.1:323" # default; port is required
tracking_interval = 10
sources_interval = 60
timeout = 1
measurement_prefix = "chrony."
tracking_measurement = "tracking"
sources_measurement = "sources"
[sources.hwmon]
enabled = true
interval = 60
measurement_prefix = "hwmon."
[sources.hwmon.sensors.temp1]
name = "hwmon0"
sensor = "temp1_input"
[influxdb]
# url = "http://localhost:8086" # defaults to localhost
url = "http://localhost:8086"
org = "default"
bucket = "default"
token = ""
[influxdb.tags]
# host = "localhost" # defaults to gethostname()
# host = "qubit" # default is the local hostname
# arbitrary = "tags" # are allowed

View File

@ -1,18 +1,114 @@
use chrony_candm::reply::{self, ReplyBody};
use async_trait::async_trait;
use chimemon::{ChimemonSource, ChimemonSourceChannel, Config};
use chrony_candm::reply::{self, ReplyBody, SourceMode};
use chrony_candm::request::{self, RequestBody};
use std::net::SocketAddr;
use tokio::time::timeout;
use influxdb2::models::DataPoint;
use log::{info, warn};
use std::net::{SocketAddr, ToSocketAddrs};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::runtime::Handle;
use tokio::{join, time::timeout};
pub struct ChronyClient {
pub server: SocketAddr,
config: Config,
client: chrony_candm::Client,
timeout: std::time::Duration,
}
fn datapoint_from_tracking(
t: &reply::Tracking,
config: &Config,
) -> Result<DataPoint, Box<dyn std::error::Error>> {
let now = SystemTime::now().duration_since(UNIX_EPOCH)?;
let measurement = config.sources.chrony.measurement_prefix.to_owned()
+ &config.sources.chrony.tracking_measurement;
let mut builder =
DataPoint::builder(&measurement).timestamp(now.as_nanos().try_into().unwrap());
for (key, value) in &config.influxdb.tags {
builder = builder.tag(key, value);
}
let point = builder
.field("ref_id", t.ref_id as i64)
.field("ref_ip_addr", t.ip_addr.to_string())
.field("stratum", t.stratum as i64)
.field("leap_status", t.leap_status as i64)
.field("current_correction", f64::from(t.current_correction))
.field("last_offset", f64::from(t.last_offset))
.field("rms_offset", f64::from(t.rms_offset))
.field("freq_ppm", f64::from(t.freq_ppm))
.field("resid_freq_ppm", f64::from(t.resid_freq_ppm))
.field("skew_ppm", f64::from(t.skew_ppm))
.field("root_delay", f64::from(t.root_delay))
.field("root_dispersion", f64::from(t.root_dispersion))
.field("last_update_interval", f64::from(t.last_update_interval))
.build()?;
Ok(point)
}
pub fn datapoint_from_sourcedata(
d: &reply::SourceData,
config: &Config,
) -> Result<DataPoint, Box<dyn std::error::Error>> {
let now = SystemTime::now().duration_since(UNIX_EPOCH)?;
let measurement = config.sources.chrony.measurement_prefix.to_owned()
+ &config.sources.chrony.sources_measurement;
let mut builder =
DataPoint::builder(&measurement).timestamp(now.as_nanos().try_into().unwrap());
for (key, value) in &config.influxdb.tags {
builder = builder.tag(key, value)
}
builder = builder
.tag("ref_id", d.ip_addr.to_string())
.tag(
"mode",
match d.mode {
SourceMode::Client => String::from("server"),
SourceMode::Peer => String::from("peer"),
SourceMode::Ref => String::from("refclock"),
},
)
.tag(
"state",
match d.state {
reply::SourceState::Selected => String::from("best"),
reply::SourceState::NonSelectable => String::from("unusable"),
reply::SourceState::Falseticker => String::from("falseticker"),
reply::SourceState::Jittery => String::from("jittery"),
reply::SourceState::Unselected => String::from("unused"),
reply::SourceState::Selectable => String::from("combined"),
},
)
.field("poll", d.poll as i64)
.field("stratum", d.stratum as i64)
.field("flags", d.flags.bits() as i64)
.field("reachability", d.reachability.count_ones() as i64)
.field("since_sample", d.since_sample as i64)
.field("orig_latest_meas", f64::from(d.orig_latest_meas))
.field("latest_meas", f64::from(d.latest_meas))
.field("latest_meas_err", f64::from(d.latest_meas_err));
let point = builder.build()?;
Ok(point)
}
impl ChronyClient {
pub fn new(client: chrony_candm::Client, server: SocketAddr, timeout: std::time::Duration) -> Self {
Self {
pub fn new(config: Config) -> Self {
let server = config
.sources
.chrony
.host
.to_socket_addrs()
.unwrap()
.next()
.expect("Unable to parse host:port:");
let client = chrony_candm::Client::spawn(&Handle::current(), Default::default());
let timeout = Duration::from_secs(config.sources.chrony.timeout);
ChronyClient {
server,
config,
client,
timeout,
}
@ -79,6 +175,100 @@ impl ChronyClient {
)),
}?;
// if sourcedata.mode == SourceMode::Ref {
// // Get the name if it's a refclock
// let reply = timeout(
// self.timeout,
// self.client.query(
// RequestBody::NtpSourceName(request::NtpSourceName { ip_addr: sourcedata.ip_addr }),
// self.server,
// ),
// )
// .await??;
// let sourcename = match reply.body {
// ReplyBody::NtpSourceName(sourcename) => Ok(sourcename),
// _ => Err(std::io::Error::new(
// std::io::ErrorKind::InvalidData,
// "Invalid response",
// )),
// }?;
// sourcedata.ip_addr = sourcename;
// }
Ok(sourcedata)
}
async fn tracking_poll(
&self,
chan: &ChimemonSourceChannel,
) -> Result<(), Box<dyn std::error::Error>> {
let tracking = self.get_tracking().await?;
let tracking_data = datapoint_from_tracking(&tracking, &self.config)?;
info!("Writing tracking data: {:?}", tracking_data);
chan.send(tracking_data)
.expect("Unable to send tracking data to targets");
Ok(())
}
async fn sources_poll(
&self,
chan: &ChimemonSourceChannel,
) -> Result<(), Box<dyn std::error::Error>> {
let sources = self.get_sources().await?;
for ds in sources {
let source_data = datapoint_from_sourcedata(&ds, &self.config)?;
info!("Writing source data: {:?}", source_data);
chan.send(source_data)
.expect("Unable to send source data to targets");
}
Ok(())
}
}
#[async_trait]
impl ChimemonSource for ChronyClient {
async fn run(self, chan: tokio::sync::broadcast::Sender<DataPoint>) {
info!("Chrony task started");
let mut t_interval = tokio::time::interval(Duration::from_secs(
self.config.sources.chrony.tracking_interval,
));
let mut s_interval = tokio::time::interval(Duration::from_secs(
self.config.sources.chrony.sources_interval,
));
let t_future = async {
let lchan = chan.clone();
loop {
t_interval.tick().await;
match self.tracking_poll(&lchan).await {
Ok(_) => (),
Err(e) => {
warn!("Error in chrony task: {}", e.to_string());
}
}
}
};
let s_future = async {
let lchan = chan.clone();
loop {
s_interval.tick().await;
match self.sources_poll(&lchan).await {
Ok(_) => (),
Err(e) => {
warn!("Error in chrony task: {}", e.to_string());
}
}
}
};
join!(t_future, s_future);
}
}

75
src/hwmon.rs Normal file
View File

@ -0,0 +1,75 @@
use async_trait::async_trait;
use chimemon::{ChimemonSource, ChimemonSourceChannel, Config};
use futures::{stream, StreamExt};
use influxdb2::models::DataPoint;
use log::{info, debug};
use std::{
path::{Path, PathBuf},
time::{Duration, SystemTime, UNIX_EPOCH},
};
pub struct HwmonSource {
config: Config,
sensors: Vec<HwmonSensor>,
}
struct HwmonSensor {
path: PathBuf,
device: String,
sensor: String,
name: String,
}
const HWMON_ROOT: &str = "/sys/class/hwmon";
impl HwmonSource {
pub fn new(config: Config) -> Self {
let sensors = config
.sources
.hwmon
.sensors
.iter()
.map(|(k, v)| HwmonSensor {
name: k.into(),
device: v.name.clone(),
sensor: v.sensor.clone(),
path: PathBuf::from(HWMON_ROOT).join(&v.name).join(&v.sensor),
})
.collect();
HwmonSource { config, sensors }
}
async fn get_raw_value(sensor: &HwmonSensor) -> Result<String, std::io::Error> {
tokio::fs::read_to_string(&sensor.path).await
}
}
#[async_trait]
impl ChimemonSource for HwmonSource {
async fn run(self, chan: tokio::sync::broadcast::Sender<DataPoint>) {
info!("hwmon task started");
let mut interval =
tokio::time::interval(Duration::from_secs(self.config.sources.hwmon.interval));
loop {
interval.tick().await;
stream::iter(&self.sensors).for_each_concurrent(None, |s| async {
let sensor_val = HwmonSource::get_raw_value(s)
.await
.expect("Unable to read sensor");
debug!("hwmon {} raw value {}", s.path.to_string_lossy(), sensor_val);
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
let mut builder =
DataPoint::builder(&s.device).timestamp(now.as_nanos().try_into().unwrap());
for (key, value) in &self.config.influxdb.tags {
builder = builder.tag(key, value)
}
builder = builder.field(&s.name, sensor_val.trim().parse::<f64>().unwrap());
let dp = builder.build().unwrap();
info!("Writing hwmon data: {:?}", dp);
chan.send(dp).unwrap();
}).await;
}
}
}

View File

@ -1,4 +1,4 @@
use chrony_candm::reply;
use async_trait::async_trait;
use figment::{
providers::{Format, Serialized, Toml},
util::map,
@ -8,11 +8,8 @@ use figment::{
use gethostname::gethostname;
use influxdb2::models::DataPoint;
use serde_derive::{Deserialize, Serialize};
use std::time::{SystemTime, UNIX_EPOCH};
use std::{
net::{IpAddr, Ipv4Addr},
path::Path,
};
use std::path::Path;
use tokio::sync::broadcast::*;
#[derive(Serialize, Deserialize, Clone)]
pub struct InfluxConfig {
@ -40,12 +37,12 @@ impl Default for InfluxConfig {
pub struct ChronyConfig {
pub enabled: bool,
pub timeout: u64,
pub poll_interval: u64,
pub tracking_interval: u64,
pub sources_interval: u64,
pub measurement_prefix: String,
pub tracking_measurement: String,
pub sources_measurement: String,
pub host: IpAddr,
pub port: u16,
pub host: String,
}
impl Default for ChronyConfig {
@ -53,12 +50,37 @@ impl Default for ChronyConfig {
ChronyConfig {
enabled: false,
timeout: 5,
poll_interval: 60,
tracking_interval: 60,
sources_interval: 300,
measurement_prefix: "chrony.".into(),
tracking_measurement: "tracking".into(),
sources_measurement: "sources".into(),
host: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
port: 323,
host: "127.0.0.1:323".into(),
}
}
}
#[derive(Serialize, Deserialize, Clone)]
pub struct HwmonSensorConfig {
pub name: String,
pub sensor: String,
}
#[derive(Serialize, Deserialize, Clone)]
pub struct HwmonConfig {
pub enabled: bool,
pub interval: u64,
pub measurement_prefix: String,
pub sensors: Map<String, HwmonSensorConfig>,
}
impl Default for HwmonConfig {
fn default() -> Self {
HwmonConfig {
enabled: false,
interval: 60,
measurement_prefix: "hwmon.".into(),
sensors: map! {},
}
}
}
@ -66,6 +88,7 @@ impl Default for ChronyConfig {
#[derive(Serialize, Deserialize, Clone, Default)]
pub struct SourcesConfig {
pub chrony: ChronyConfig,
pub hwmon: HwmonConfig,
}
#[derive(Serialize, Deserialize, Clone, Default)]
@ -74,70 +97,19 @@ pub struct Config {
pub sources: SourcesConfig,
}
pub fn load_config(filename: &Path) -> Result<Config, Box<dyn std::error::Error>> {
let config = Figment::from(Serialized::defaults(Config::default()))
.merge(Toml::file(filename))
.extract()?;
Ok(config)
pub fn load_config(filename: &Path) -> Figment {
Figment::from(Serialized::defaults(Config::default())).merge(Toml::file(filename))
}
pub fn datapoint_from_tracking(
t: &reply::Tracking,
config: &Config,
) -> Result<DataPoint, Box<dyn std::error::Error>> {
let now = SystemTime::now().duration_since(UNIX_EPOCH)?;
let measurement = config.sources.chrony.measurement_prefix.to_owned()
+ &config.sources.chrony.tracking_measurement;
let mut builder =
DataPoint::builder(&measurement).timestamp(now.as_nanos().try_into().unwrap());
for (key, value) in &config.influxdb.tags {
builder = builder.tag(key, value);
}
pub type ChimemonSourceChannel = Sender<DataPoint>;
pub type ChimemonTargetChannel = Receiver<DataPoint>;
let point = builder
.field("ref_id", t.ref_id as i64)
.field("ref_ip_addr", t.ip_addr.to_string())
.field("stratum", t.stratum as i64)
.field("leap_status", t.leap_status as i64)
.field("current_correction", f64::from(t.current_correction))
.field("last_offset", f64::from(t.last_offset))
.field("rms_offset", f64::from(t.rms_offset))
.field("freq_ppm", f64::from(t.freq_ppm))
.field("resid_freq_ppm", f64::from(t.resid_freq_ppm))
.field("skew_ppm", f64::from(t.skew_ppm))
.field("root_delay", f64::from(t.root_delay))
.field("root_dispersion", f64::from(t.root_dispersion))
.field("last_update_interval", f64::from(t.last_update_interval))
.build()?;
Ok(point)
#[async_trait]
pub trait ChimemonSource {
async fn run(self, chan: ChimemonSourceChannel);
}
pub fn datapoint_from_sourcedata(
d: &reply::SourceData,
config: &Config,
) -> Result<DataPoint, Box<dyn std::error::Error>> {
let now = SystemTime::now().duration_since(UNIX_EPOCH)?;
let measurement = config.sources.chrony.measurement_prefix.to_owned()
+ &config.sources.chrony.sources_measurement;
let mut builder =
DataPoint::builder(&measurement).timestamp(now.as_nanos().try_into().unwrap());
for (key, value) in &config.influxdb.tags {
builder = builder.tag(key, value)
}
builder = builder
.tag("ref_id", d.ip_addr.to_string())
.tag("mode", (d.mode as i64).to_string())
.tag("state", (d.state as i64).to_string())
.field("poll", d.poll as i64)
.field("stratum", d.stratum as i64)
.field("flags", d.flags.bits() as i64)
.field("reachability", d.reachability as i64)
.field("since_sample", d.since_sample as i64)
.field("orig_latest_meas", f64::from(d.orig_latest_meas))
.field("latest_meas", f64::from(d.latest_meas))
.field("latest_meas_err", f64::from(d.latest_meas_err));
let point = builder.build()?;
Ok(point)
#[async_trait]
pub trait ChimemonTarget {
async fn run(self, chan: ChimemonTargetChannel);
}

View File

@ -1,63 +1,55 @@
mod chrony;
mod hwmon;
use clap::Parser;
use env_logger;
use clap::{Parser, ValueEnum};
use env_logger::{self, Env};
use futures::{future::join_all, prelude::*};
use log::{error, info, warn};
use std::{path::Path, time::Duration};
use std::net::SocketAddr;
use tokio::runtime::Handle;
use std::path::Path;
use tokio::sync::broadcast;
use crate::chrony::*;
use crate::{chrony::*, hwmon::HwmonSource};
use chimemon::*;
const PROGRAM_NAME: &str = "chimemon";
const VERSION: &str = "0.0.1";
const DEFAULT_RETRY_DELAY: Duration = Duration::new(60, 0);
const MAX_RETRY_DELAY: Duration = Duration::new(1800, 0);
#[derive(ValueEnum, Clone)]
enum Level {
Debug,
Info,
Warn,
Error,
}
#[derive(Parser)]
struct Args {
/// TOML configuration to load
#[arg(short, long, default_value_t = String::from("config.toml"))]
config_file: String,
}
async fn do_chrony_poll(
config: &Config,
chrony: &ChronyClient,
influx: &influxdb2::Client,
) -> Result<(), Box<dyn std::error::Error>> {
let tracking = chrony.get_tracking().await?;
let sources = chrony.get_sources().await?;
let mut frame = Vec::new();
let tracking_data = datapoint_from_tracking(&tracking, &config)?;
info!(target: "chrony", "Writing tracking data: {:?}", tracking_data);
frame.push(tracking_data);
for ds in sources {
let source_data = datapoint_from_sourcedata(&ds, &config)?;
info!(target: "chrony", "Writing source data: {:?}", source_data);
frame.push(source_data);
}
influx
.write(&config.influxdb.bucket, stream::iter(frame))
.await?;
Ok(())
#[arg(value_enum, default_value_t = Level::Warn)]
log_level: Level,
}
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
let args = Args::parse();
let loglevel = args
.log_level
.to_possible_value()
.unwrap()
.get_name()
.to_owned();
env_logger::Builder::from_env(Env::default().default_filter_or(loglevel)).init();
info!("{} v{} starting...", PROGRAM_NAME, VERSION);
let config = load_config(Path::new(&args.config_file))?;
let fig = load_config(Path::new(&args.config_file));
warn!("{:?}", fig);
let config: Config = fig.extract()?;
let mut tasks = Vec::new();
let (tx, _) = broadcast::channel(16);
let sourcechan: ChimemonSourceChannel = tx;
info!(
"Connecting to influxdb {} org: {} using token",
@ -69,48 +61,48 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
&config.influxdb.token,
);
if config.sources.chrony.enabled {
let chrony = ChronyClient::new(
chrony_candm::Client::spawn(&Handle::current(), Default::default()),
SocketAddr::new(
config.sources.chrony.host,
config.sources.chrony.port,
),
Duration::from_secs(config.sources.chrony.timeout),
);
let chrony = if config.sources.chrony.enabled {
Some(ChronyClient::new(config.to_owned()))
} else {
None
};
match chrony {
Some(c) => {
tasks.push(tokio::spawn(c.run(sourcechan.clone())));
}
None => (),
};
let mut retry_delay = DEFAULT_RETRY_DELAY;
let mut fail_count = 0;
let hwmon = if config.sources.hwmon.enabled {
Some(HwmonSource::new(config.to_owned()))
} else {
None
};
match hwmon {
Some(hwmon) => {
tasks.push(tokio::spawn(hwmon.run(sourcechan.clone())));
}
None => (),
};
let interval = Duration::new(config.sources.chrony.poll_interval, 0);
let mut interval = tokio::time::interval(interval);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let mut influxrx = sourcechan.subscribe();
tasks.push(tokio::spawn(async move {
loop {
let dp = influxrx.recv().await.unwrap();
influx
.write(&config.influxdb.bucket, stream::iter([dp]))
.await
.expect("Error writing to influxdb");
}
}));
tasks.push(tokio::spawn(async move {
info!(target: "chrony", "Chrony task started");
loop {
interval.tick().await;
match do_chrony_poll(&config, &chrony, &influx.to_owned()).await {
Ok(_) => {
fail_count = 0;
retry_delay = DEFAULT_RETRY_DELAY;
}
Err(e) => {
warn!(target: "chrony", "Error in chrony task: {}", e.to_string());
if fail_count % 4 == 0 {
// exponential backoff, every 4 failures double the retry timer, cap at max
retry_delay = std::cmp::min(MAX_RETRY_DELAY, retry_delay * 2);
}
fail_count += 1;
}
}
if fail_count > 0 {
tokio::time::sleep(retry_delay).await;
interval.reset();
};
}
}));
}
// let mut debugrx = sourcechan.subscribe();
// tasks.push(tokio::spawn(async move {
// loop {
// let v = debugrx.recv().await;
// warn!("streamed: {:?}", v.unwrap());
// }
// }));
join_all(tasks).await;