diff --git a/Cargo.toml b/Cargo.toml index 17fd708..8c89e5a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ release-logs = ["tracing/max_level_info"] [dependencies] serde = "1.0" serde_derive = "1.0" -tokio = { version = "1", features = ["rt", "io-util"] } +tokio = { version = "1", features = ["rt-multi-thread", "io-util"] } clap = { version = "4.0", features = ["derive"] } figment = { version = "0.10", features = ["toml"] } futures = "0.3.24" diff --git a/src/config.rs b/src/config.rs index fa34932..27238a1 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,18 +1,13 @@ -use std::{collections::BTreeMap, path::Path}; - -use figment::{ - Figment, Provider, - providers::{Format, Serialized, Toml}, - util::map, - value::Map, -}; +use figment::{Provider, providers::Serialized, util::map, value::Map}; use gethostname::gethostname; use serde_derive::{Deserialize, Serialize}; use serde_with::{DurationSeconds, serde_as}; #[derive(Serialize, Deserialize, Clone)] +#[serde(default)] pub struct InfluxConfig { pub enabled: bool, + pub timeout: std::time::Duration, pub url: String, pub org: String, pub bucket: String, @@ -25,6 +20,7 @@ impl Default for InfluxConfig { let host = gethostname().into_string().unwrap(); InfluxConfig { enabled: false, + timeout: std::time::Duration::from_secs(10), url: "http://localhost:8086".into(), org: "default".into(), bucket: "default".into(), @@ -199,14 +195,13 @@ pub enum SourceConfig { #[serde(tag = "type", rename_all = "snake_case")] pub enum TargetConfig { ChronySock(ChronySockConfig), - InfluxDb(InfluxConfig), + Influxdb(InfluxConfig), } #[derive(Serialize, Deserialize, Clone, Default)] pub struct Config { - pub influxdb: InfluxConfig, - pub sources: BTreeMap, - pub targets: BTreeMap, + pub sources: Map, + pub targets: Map, } impl Provider for Config { diff --git a/src/lib.rs b/src/lib.rs index 5fe807b..2e0122a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,7 +13,7 @@ macro_rules! fatal { use async_trait::async_trait; use chrono::{DateTime, Utc}; -use tokio::sync::broadcast::*; +use tokio::sync::broadcast; use tokio_util::sync::CancellationToken; use std::{fmt::Debug, sync::Arc}; @@ -50,37 +50,39 @@ type MetricTags = Vec; pub struct SourceMetric { name: &'static str, value: MetricValue, - tags: Arc, } impl SourceMetric { - pub fn new_int(name: &'static str, value: i64, tags: Arc) -> Self { + pub fn new_int(name: &'static str, value: i64) -> Self { Self { name: name, value: MetricValue::Int(value), - tags, } } - pub fn new_float(name: &'static str, value: f64, tags: Arc) -> Self { + pub fn new_float(name: &'static str, value: f64) -> Self { Self { name: name, value: MetricValue::Float(value), - tags, } } - pub fn new_bool(name: &'static str, value: bool, tags: Arc) -> Self { + pub fn new_bool(name: &'static str, value: bool) -> Self { Self { name: name, value: MetricValue::Bool(value), - tags, } } } +#[derive(Debug)] +pub struct SourceMetricSet { + metrics: Vec, + tags: Arc, +} + pub trait SourceReportDetails: Debug + Send + Sync { - fn to_metrics(&self) -> Vec; + fn to_metrics(&self) -> Vec; fn is_healthy(&self) -> bool; } @@ -88,7 +90,7 @@ pub trait SourceReportDetails: Debug + Send + Sync { pub struct SourceReport { pub name: String, pub status: SourceStatus, - pub details: Arc, + pub details: Arc, } #[derive(Debug, Clone)] @@ -109,12 +111,13 @@ impl From for ChimemonMessage { } } -pub type ChimemonSourceChannel = Sender; -pub type ChimemonTargetChannel = Receiver; +pub type ChimemonSourceChannel = broadcast::Sender; +pub type ChimemonTargetChannel = broadcast::Receiver; #[async_trait] pub trait ChimemonSource { type Config; + const TASK_NAME: &'static str; fn new(name: &str, config: Self::Config) -> Self; async fn run(self, chan: ChimemonSourceChannel, cancel: CancellationToken); } @@ -122,6 +125,7 @@ pub trait ChimemonSource { #[async_trait] pub trait ChimemonTarget { type Config; + const TASK_NAME: &'static str; fn new(name: &str, config: Self::Config) -> Self; - async fn run(self, chan: ChimemonTargetChannel, cancel: CancellationToken); + async fn run(self, mut chan: ChimemonTargetChannel, cancel: CancellationToken); } diff --git a/src/main.rs b/src/main.rs index 086bd4f..6720cc6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,6 @@ +use std::sync::Arc; + +use async_trait::async_trait; use clap::{Parser, ValueEnum}; use figment::{ Figment, @@ -11,6 +14,7 @@ use tracing_subscriber::{self, EnvFilter, fmt::format::FmtSpan}; use chimemon::{ config::{SourceConfig, TargetConfig}, + targets::influx::InfluxTarget, *, }; use config::Config; @@ -55,46 +59,36 @@ struct Args { config_file: String, #[arg(value_enum, default_value_t = Level::Info)] log_level: Level, + #[arg(short, long, default_value_t = false)] + echo_task: bool, } fn run_source( name: &str, source: SourceConfig, chan: ChimemonSourceChannel, - shutdown: CancellationToken, + cancel: CancellationToken, ) -> Option> { match source { - SourceConfig::Chrony(source_config) if source_config.enabled => { - let c = ChronyClient::new(&name, source_config); - Some(tokio::spawn( - c.run(chan, shutdown).instrument(info_span!("chrony-task")), - )) + SourceConfig::Chrony(cfg) if cfg.enabled => { + spawn_source::(name, cfg, chan, cancel) } - SourceConfig::Gpsd(source_config) if source_config.enabled => { - let c = GpsdSource::new(&name, source_config); - Some(tokio::spawn( - c.run(chan, shutdown).instrument(info_span!("gpsd-task")), - )) + SourceConfig::Gpsd(cfg) if cfg.enabled => { + spawn_source::(name, cfg, chan, cancel) } - SourceConfig::Hwmon(source_config) if source_config.enabled => { - let c = HwmonSource::new(&name, source_config); - Some(tokio::spawn( - c.run(chan, shutdown).instrument(info_span!("hwmon-task")), - )) + SourceConfig::Hwmon(cfg) if cfg.enabled => { + spawn_source::(name, cfg, chan, cancel) } - SourceConfig::Prs10(source_config) if source_config.enabled => { - let c = Prs10Monitor::new(&name, source_config); - Some(tokio::spawn( - c.run(chan, shutdown).instrument(info_span!("prs10-task")), - )) + SourceConfig::Prs10(cfg) if cfg.enabled => { + spawn_source::(name, cfg, chan, cancel) } - SourceConfig::Uccm(source_config) if source_config.enabled => { - let c = UCCMMonitor::new(&name, source_config); - Some(tokio::spawn( - c.run(chan, shutdown).instrument(info_span!("uccm-task")), - )) + SourceConfig::Uccm(cfg) if cfg.enabled => { + spawn_source::(name, cfg, chan, cancel) + } + _ => { + debug!("Disabled source {name} skipped"); + None } - _ => None, } } @@ -105,44 +99,79 @@ fn run_target( cancel: CancellationToken, ) -> Option> { match target { - TargetConfig::ChronySock(source_config) if source_config.enabled => { - let c = ChronySockServer::new(name, source_config); - Some(tokio::spawn( - c.run(chan, cancel) - .instrument(info_span!("chronysock-task")), - )) + TargetConfig::ChronySock(cfg) if cfg.enabled => { + spawn_target::(name, cfg, chan, cancel) } - TargetConfig::InfluxDb(source_config) if source_config.enabled => { - warn!("influx not implemented"); + TargetConfig::Influxdb(cfg) if cfg.enabled => { + spawn_target::(name, cfg, chan, cancel) + } + _ => { + debug!("Disabled target {name} skipped"); None } - _ => None, } } -async fn dummy_consumer(mut chan: ChimemonTargetChannel, cancel: CancellationToken) { - info!("Dummy receiver task started"); - loop { - select! { - _ = cancel.cancelled() => { - return - }, - Ok(m) = chan.recv() => { - match m { - ChimemonMessage::SourceReport(report) => { - let metrics = report.details.to_metrics(); - info!("instance: {} metrics: {metrics:?}", report.name); - } - msg => { - info!("message: {msg:?}"); - } +fn spawn_source( + name: &str, + config: T::Config, + chan: ChimemonSourceChannel, + cancel: CancellationToken, +) -> Option> { + let span = info_span!("source", task = name); + let s = T::new(name, config); + Some(tokio::spawn(s.run(chan, cancel).instrument(span))) +} + +fn spawn_target( + name: &str, + config: T::Config, + chan: ChimemonTargetChannel, + cancel: CancellationToken, +) -> Option> { + let span = info_span!("target", task = name); + let t = T::new(name, config); + Some(tokio::spawn(async move { + t.run(chan, cancel).instrument(span).await + })) +} + +struct EchoTarget {} +struct EchoTargetConfig {} +#[async_trait] +impl ChimemonTarget for EchoTarget { + type Config = EchoTargetConfig; + const TASK_NAME: &'static str = "echo-task"; + + fn new(_name: &str, _config: Self::Config) -> Self { + EchoTarget {} + } + async fn run(self, mut chan: ChimemonTargetChannel, cancel: CancellationToken) { + info!("Dummy receiver task started"); + loop { + let msg = select! { + _ = cancel.cancelled() => { + return + }, + msg = chan.recv() => msg + }; + match msg { + Ok(ChimemonMessage::SourceReport(report)) => { + let metrics = report.details.to_metrics(); + info!("instance: {} metrics: {metrics:?}", report.name); + } + Ok(msg) => { + info!("message: {msg:?}"); + } + Err(e) => { + warn!(error = ?e, "Error receiving message"); } } } } } -#[tokio::main(flavor = "current_thread")] +#[tokio::main(flavor = "multi_thread")] async fn main() -> Result<(), Box> { let args = Args::parse(); tracing_subscriber::fmt() @@ -162,17 +191,10 @@ async fn main() -> Result<(), Box> { let config: Config = fig.extract()?; let mut tasks = Vec::new(); - let (tx, _) = broadcast::channel(16); - let sourcechan: ChimemonSourceChannel = tx; + let (sourcechan, _) = broadcast::channel(16); let shutdown_token = CancellationToken::new(); - for (name, source) in config.sources { - if let Some(task) = run_source(&name, source, sourcechan.clone(), shutdown_token.clone()) { - tasks.push(task) - } - } - for (name, target) in config.targets { if let Some(task) = run_target( &name, @@ -184,21 +206,31 @@ async fn main() -> Result<(), Box> { } } + for (name, source) in config.sources { + if let Some(task) = run_source(&name, source, sourcechan.clone(), shutdown_token.clone()) { + tasks.push(task) + } + } + if tasks.len() == 0 { error!("No tasks configured, exiting."); return Ok(()); // not an error, but exit before starting a dummy task } - + if sourcechan.strong_count() == 0 { + warn!("No sources configured, no events will be generated"); + } if sourcechan.receiver_count() == 0 { - warn!("No consumers configured, events will be discarded"); - tasks.push(tokio::spawn( - dummy_consumer(sourcechan.subscribe(), shutdown_token.clone()) - .instrument(info_span!("dummy-consumer-task")), - )); + warn!("No targets configured, events will be discarded"); + } + if args.echo_task || sourcechan.receiver_count() == 0 { + let c = EchoTargetConfig {}; + tasks.push( + spawn_target::("echo", c, sourcechan.subscribe(), shutdown_token.clone()) + .unwrap(), + ) } debug!("Task setup complete, tasks: {}", tasks.len()); - ctrlc::set_handler(move || { if shutdown_token.is_cancelled() { info!("Forced shutdown"); diff --git a/src/sources/chrony.rs b/src/sources/chrony.rs index a952f28..d1efd0b 100644 --- a/src/sources/chrony.rs +++ b/src/sources/chrony.rs @@ -9,6 +9,7 @@ use tokio::select; use tokio_util::sync::CancellationToken; use tracing::{info, warn}; +use crate::SourceMetricSet; use crate::{ ChimemonSource, ChimemonSourceChannel, MetricTags, SourceMetric, SourceReport, SourceReportDetails, SourceStatus, config::ChronyConfig, @@ -43,26 +44,24 @@ impl SourceReportDetails for ChronyTrackingReport { fn is_healthy(&self) -> bool { true } - fn to_metrics(&self) -> Vec { - let tags = &self.tags; - vec![ - SourceMetric::new_int("ref_id", self.ref_id, tags.clone()), - SourceMetric::new_int("stratum", self.stratum, tags.clone()), - SourceMetric::new_int("leap_status", self.leap_status, tags.clone()), - SourceMetric::new_float("current_correction", self.current_correction, tags.clone()), - SourceMetric::new_float("last_offset", self.last_offset, tags.clone()), - SourceMetric::new_float("rms_offset", self.rms_offset, tags.clone()), - SourceMetric::new_float("freq_ppm", self.freq_ppm, tags.clone()), - SourceMetric::new_float("resid_freq_ppm", self.resid_freq_ppm, tags.clone()), - SourceMetric::new_float("skew_ppm", self.skew_ppm, tags.clone()), - SourceMetric::new_float("root_delay", self.root_delay, tags.clone()), - SourceMetric::new_float("root_dispersion", self.root_dispersion, tags.clone()), - SourceMetric::new_float( - "last_update_interval", - self.last_update_interval, - tags.clone(), - ), - ] + fn to_metrics(&self) -> Vec { + vec![SourceMetricSet { + tags: self.tags.clone(), + metrics: vec![ + SourceMetric::new_int("ref_id", self.ref_id), + SourceMetric::new_int("stratum", self.stratum), + SourceMetric::new_int("leap_status", self.leap_status), + SourceMetric::new_float("current_correction", self.current_correction), + SourceMetric::new_float("last_offset", self.last_offset), + SourceMetric::new_float("rms_offset", self.rms_offset), + SourceMetric::new_float("freq_ppm", self.freq_ppm), + SourceMetric::new_float("resid_freq_ppm", self.resid_freq_ppm), + SourceMetric::new_float("skew_ppm", self.skew_ppm), + SourceMetric::new_float("root_delay", self.root_delay), + SourceMetric::new_float("root_dispersion", self.root_dispersion), + SourceMetric::new_float("last_update_interval", self.last_update_interval), + ], + }] } } @@ -76,8 +75,8 @@ impl SourceReportDetails for ChronySourcesReport { //TODO: think about whether there is an idea of unhealthy sources true } - fn to_metrics(&self) -> Vec { - let mut metrics = Vec::with_capacity(8 * self.sources.len()); + fn to_metrics(&self) -> Vec { + let mut metrics = Vec::with_capacity(self.sources.len()); for source in &self.sources { let tags = Arc::new(vec![ @@ -102,28 +101,19 @@ impl SourceReportDetails for ChronySourcesReport { }, ), ]); - metrics.extend([ - SourceMetric::new_int("poll", source.poll as i64, tags.clone()), - SourceMetric::new_int("stratum", source.stratum as i64, tags.clone()), - SourceMetric::new_int("flags", source.flags.bits() as i64, tags.clone()), - SourceMetric::new_int( - "reachability", - source.reachability.count_ones() as i64, - tags.clone(), - ), - SourceMetric::new_int("since_sample", source.since_sample as i64, tags.clone()), - SourceMetric::new_float( - "orig_latest_meas", - source.orig_latest_meas.into(), - tags.clone(), - ), - SourceMetric::new_float("latest_meas", source.latest_meas.into(), tags.clone()), - SourceMetric::new_float( - "latest_meas_err", - source.latest_meas_err.into(), - tags.clone(), - ), - ]); + metrics.push(SourceMetricSet { + tags: tags, + metrics: vec![ + SourceMetric::new_int("poll", source.poll as i64), + SourceMetric::new_int("stratum", source.stratum as i64), + SourceMetric::new_int("flags", source.flags.bits() as i64), + SourceMetric::new_int("reachability", source.reachability.count_ones() as i64), + SourceMetric::new_int("since_sample", source.since_sample as i64), + SourceMetric::new_float("orig_latest_meas", source.orig_latest_meas.into()), + SourceMetric::new_float("latest_meas", source.latest_meas.into()), + SourceMetric::new_float("latest_meas_err", source.latest_meas_err.into()), + ], + }); } metrics @@ -280,6 +270,7 @@ impl ChronyClient { #[async_trait] impl ChimemonSource for ChronyClient { type Config = ChronyConfig; + const TASK_NAME: &'static str = "chrony-task"; fn new(name: &str, config: Self::Config) -> Self { let server = config .host diff --git a/src/sources/gpsd.rs b/src/sources/gpsd.rs index 1871e20..c00fc70 100644 --- a/src/sources/gpsd.rs +++ b/src/sources/gpsd.rs @@ -17,9 +17,10 @@ use tokio_util::codec::{Framed, LinesCodec}; use tokio_util::sync::CancellationToken; use tracing::{debug, debug_span, error, info, instrument, warn}; +use crate::SourceMetricSet; use crate::{ - ChimemonMessage, ChimemonSource, ChimemonSourceChannel, SourceMetric, SourceReport, - SourceReportDetails, SourceStatus, config::GpsdConfig, + ChimemonSource, ChimemonSourceChannel, SourceMetric, SourceReport, SourceReportDetails, + SourceStatus, config::GpsdConfig, }; pub struct GpsdSource { @@ -76,13 +77,16 @@ impl SourceReportDetails for GpsdSourceReport { fn is_healthy(&self) -> bool { self.fix_type != GpsdFixType::Unknown && self.fix_type != GpsdFixType::NoFix } - fn to_metrics(&self) -> Vec { + fn to_metrics(&self) -> Vec { let tags = Arc::new(vec![]); - vec![ - SourceMetric::new_int("sats_visible", self.sats_visible as i64, tags.clone()), - SourceMetric::new_int("sats_tracked", self.sats_tracked as i64, tags.clone()), - SourceMetric::new_float("tdop", self.tdop, tags.clone()), - ] + vec![SourceMetricSet { + tags, + metrics: vec![ + SourceMetric::new_int("sats_visible", self.sats_visible as i64), + SourceMetric::new_int("sats_tracked", self.sats_tracked as i64), + SourceMetric::new_float("tdop", self.tdop), + ], + }] } } @@ -167,6 +171,7 @@ impl GpsdSource { #[async_trait] impl ChimemonSource for GpsdSource { type Config = GpsdConfig; + const TASK_NAME: &'static str = "gpsd-task"; fn new(name: &str, config: Self::Config) -> Self { // TODO: refactor so this mess isn't necessary // Should do async setup at the start of run(), not here diff --git a/src/sources/hwmon.rs b/src/sources/hwmon.rs index de3caf7..650a486 100644 --- a/src/sources/hwmon.rs +++ b/src/sources/hwmon.rs @@ -6,7 +6,7 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; use crate::{ - ChimemonSource, ChimemonSourceChannel, MetricTags, SourceMetric, SourceReport, + ChimemonSource, ChimemonSourceChannel, MetricTags, SourceMetric, SourceMetricSet, SourceReport, SourceReportDetails, SourceStatus, config::HwmonConfig, }; @@ -73,14 +73,13 @@ impl SourceReportDetails for HwmonReport { //self.alarms.iter().any(|(_sensor, alarm)| *alarm) true } - fn to_metrics(&self) -> Vec { + fn to_metrics(&self) -> Vec { let mut metrics = Vec::new(); for (sensor, value) in &self.values { - metrics.push(SourceMetric::new_float( - "hwmon_value", - *value, - sensor.tags.clone(), - )) + metrics.push(SourceMetricSet { + tags: sensor.tags.clone(), + metrics: vec![SourceMetric::new_float("hwmon_value", *value)], + }) } // for (sensor, alarm) in &self.alarms { // metrics.push(SourceMetric::new_bool( @@ -105,6 +104,7 @@ impl HwmonSource { #[async_trait] impl ChimemonSource for HwmonSource { type Config = HwmonConfig; + const TASK_NAME: &'static str = "hwmon-task"; fn new(name: &str, config: Self::Config) -> Self { let sensors = config .sensors diff --git a/src/sources/prs10.rs b/src/sources/prs10.rs index 3ce697b..ea52f86 100644 --- a/src/sources/prs10.rs +++ b/src/sources/prs10.rs @@ -16,6 +16,7 @@ use tokio_serial::{SerialPort, SerialStream}; use tokio_util::sync::CancellationToken; use tracing::{debug, debug_span, error, info, instrument, warn}; +use crate::SourceMetricSet; use crate::{ ChimemonMessage, ChimemonSource, ChimemonSourceChannel, MetricTags, SourceMetric, SourceReport, SourceReportDetails, SourceStatus, config::Prs10Config, fatal, @@ -76,7 +77,7 @@ impl Prs10PowerLampFlags { .iter() .map(|(flag, label)| { // We track whether each flag is set (true) or not (false) - SourceMetric::new_bool(*label, self.contains(**flag), tags.clone()) + SourceMetric::new_bool(*label, self.contains(**flag)) }) .collect() } @@ -184,20 +185,19 @@ impl SourceReportDetails for Prs10Status { && self.pps_flags == HEALTHY_PPS } - fn to_metrics(&self) -> Vec { + fn to_metrics(&self) -> Vec { let tags = Arc::new(vec![]); - vec![ - SourceMetric::new_int( - "volt_lamp_flags", - self.volt_lamp_flags.bits() as i64, - tags.clone(), - ), - SourceMetric::new_int("rf_flags", self.rf_flags.bits() as i64, tags.clone()), - SourceMetric::new_int("temp_flags", self.temp_flags.bits() as i64, tags.clone()), - SourceMetric::new_int("fll_flags", self.fll_flags.bits() as i64, tags.clone()), - SourceMetric::new_int("pps_flags", self.pps_flags.bits() as i64, tags.clone()), - // system flags are kind of useless because we can't guarantee they get upstreamed and will only appear once since they are 'event flags' - ] + vec![SourceMetricSet { + tags, + metrics: vec![ + SourceMetric::new_int("volt_lamp_flags", self.volt_lamp_flags.bits() as i64), + SourceMetric::new_int("rf_flags", self.rf_flags.bits() as i64), + SourceMetric::new_int("temp_flags", self.temp_flags.bits() as i64), + SourceMetric::new_int("fll_flags", self.fll_flags.bits() as i64), + SourceMetric::new_int("pps_flags", self.pps_flags.bits() as i64), + // system flags are kind of useless because we can't guarantee they get upstreamed and will only appear once since they are 'event flags' + ], + }] } } @@ -267,57 +267,40 @@ impl SourceReportDetails for Prs10Stats { fn is_healthy(&self) -> bool { true } - fn to_metrics(&self) -> Vec { + fn to_metrics(&self) -> Vec { let tags = Arc::new(vec![]); - vec![ - // Integer Metrics - SourceMetric::new_int("ocxo_efc", self.ocxo_efc as i64, tags.clone()), - // Float Metrics - SourceMetric::new_float("error_signal_volts", self.error_signal_volts, tags.clone()), - SourceMetric::new_float( - "detect_signal_volts", - self.detect_signal_volts, - tags.clone(), - ), - SourceMetric::new_float("heat_volts", self.heat_volts, tags.clone()), - SourceMetric::new_float("elec_volts", self.elec_volts, tags.clone()), - SourceMetric::new_float( - "lamp_fet_drain_volts", - self.lamp_fet_drain_volts, - tags.clone(), - ), - SourceMetric::new_float( - "lamp_fet_gate_volts", - self.lamp_fet_gate_volts, - tags.clone(), - ), - SourceMetric::new_float("ocxo_heat_volts", self.ocxo_heat_volts, tags.clone()), - SourceMetric::new_float("cell_heat_volts", self.cell_heat_volts, tags.clone()), - SourceMetric::new_float("lamp_heat_volts", self.lamp_heat_volts, tags.clone()), - SourceMetric::new_float("rb_photo", self.rb_photo, tags.clone()), - SourceMetric::new_float("rb_photo_iv", self.rb_photo_iv, tags.clone()), - SourceMetric::new_float("case_temp", self.case_temp, tags.clone()), - SourceMetric::new_float("ocxo_therm", self.ocxo_therm, tags.clone()), - SourceMetric::new_float("cell_therm", self.cell_therm, tags.clone()), - SourceMetric::new_float("lamp_therm", self.lamp_therm, tags.clone()), - SourceMetric::new_float("ext_cal_volts", self.ext_cal_volts, tags.clone()), - SourceMetric::new_float("analog_gnd_volts", self.analog_gnd_volts, tags.clone()), - SourceMetric::new_float( - "if_vco_varactor_volts", - self.if_vco_varactor_volts, - tags.clone(), - ), - SourceMetric::new_float( - "op_vco_varactor_volts", - self.op_vco_varactor_volts, - tags.clone(), - ), - SourceMetric::new_float("mul_amp_gain_volts", self.mul_amp_gain_volts, tags.clone()), - SourceMetric::new_float("rf_lock_volts", self.rf_lock_volts, tags.clone()), - // U16 Metrics (optional, but can be treated as integers) - SourceMetric::new_int("freq_offset_ppt", self.freq_offset_ppt as i64, tags.clone()), - SourceMetric::new_int("mag_efc", self.mag_efc as i64, tags.clone()), - ] + vec![SourceMetricSet { + tags, + metrics: vec![ + // Integer Metrics + SourceMetric::new_int("ocxo_efc", self.ocxo_efc as i64), + // Float Metrics + SourceMetric::new_float("error_signal_volts", self.error_signal_volts), + SourceMetric::new_float("detect_signal_volts", self.detect_signal_volts), + SourceMetric::new_float("heat_volts", self.heat_volts), + SourceMetric::new_float("elec_volts", self.elec_volts), + SourceMetric::new_float("lamp_fet_drain_volts", self.lamp_fet_drain_volts), + SourceMetric::new_float("lamp_fet_gate_volts", self.lamp_fet_gate_volts), + SourceMetric::new_float("ocxo_heat_volts", self.ocxo_heat_volts), + SourceMetric::new_float("cell_heat_volts", self.cell_heat_volts), + SourceMetric::new_float("lamp_heat_volts", self.lamp_heat_volts), + SourceMetric::new_float("rb_photo", self.rb_photo), + SourceMetric::new_float("rb_photo_iv", self.rb_photo_iv), + SourceMetric::new_float("case_temp", self.case_temp), + SourceMetric::new_float("ocxo_therm", self.ocxo_therm), + SourceMetric::new_float("cell_therm", self.cell_therm), + SourceMetric::new_float("lamp_therm", self.lamp_therm), + SourceMetric::new_float("ext_cal_volts", self.ext_cal_volts), + SourceMetric::new_float("analog_gnd_volts", self.analog_gnd_volts), + SourceMetric::new_float("if_vco_varactor_volts", self.if_vco_varactor_volts), + SourceMetric::new_float("op_vco_varactor_volts", self.op_vco_varactor_volts), + SourceMetric::new_float("mul_amp_gain_volts", self.mul_amp_gain_volts), + SourceMetric::new_float("rf_lock_volts", self.rf_lock_volts), + // U16 Metrics (optional, but can be treated as integers) + SourceMetric::new_int("freq_offset_ppt", self.freq_offset_ppt as i64), + SourceMetric::new_int("mag_efc", self.mag_efc as i64), + ], + }] } } @@ -511,6 +494,7 @@ impl Prs10Monitor { #[async_trait] impl ChimemonSource for Prs10Monitor { type Config = Prs10Config; + const TASK_NAME: &'static str = "prs10-task"; fn new(name: &str, config: Self::Config) -> Self { let builder = tokio_serial::new(&config.port, config.baud) .timeout(config.timeout) diff --git a/src/sources/uccm.rs b/src/sources/uccm.rs index 8591e6a..0e3b218 100644 --- a/src/sources/uccm.rs +++ b/src/sources/uccm.rs @@ -17,9 +17,10 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; use crate::{ - ChimemonMessage, ChimemonSource, ChimemonSourceChannel, SourceMetric, SourceReport, - SourceReportDetails, SourceStatus, TimeReport, config::UCCMConfig, + ChimemonSource, ChimemonSourceChannel, SourceMetric, SourceReport, SourceReportDetails, + SourceStatus, TimeReport, config::UCCMConfig, }; +use crate::{SourceMetricSet, fatal}; pub const GPS_EPOCH: i64 = 315964800; // Doesn't seem possible to have a const DateTime object pub type UccmEndian = BigEndian; @@ -59,61 +60,33 @@ impl SourceReportDetails for UCCMTODReport { && !self.flags.contains(UCCMFlags::GPS_LOS) } - fn to_metrics(&self) -> Vec { - let no_tags = Arc::new(vec![]); - vec![ - SourceMetric::new_int("leaps", self.leaps as i64, no_tags.clone()), - SourceMetric::new_bool( - "osc_lock", - self.flags.contains(UCCMFlags::OSC_LOCK), - no_tags.clone(), - ), - SourceMetric::new_bool( - "leap_flag", - self.flags.contains(UCCMFlags::LEAP_FLAG), - no_tags.clone(), - ), - SourceMetric::new_bool( - "init_unlock", - self.flags.contains(UCCMFlags::INIT_UNLOCK), - no_tags.clone(), - ), - SourceMetric::new_bool( - "init_no_sats", - self.flags.contains(UCCMFlags::INIT_NO_SATS), - no_tags.clone(), - ), - SourceMetric::new_bool( - "have_gps_time", - self.flags.contains(UCCMFlags::HAVE_GPS_TIME), - no_tags.clone(), - ), - SourceMetric::new_bool( - "power_fail", - self.flags.contains(UCCMFlags::POWER_FAIL), - no_tags.clone(), - ), - SourceMetric::new_bool( - "no_gps_sync", - self.flags.contains(UCCMFlags::NO_GPS_SYNC), - no_tags.clone(), - ), - SourceMetric::new_bool( - "no_gps_sync2", - self.flags.contains(UCCMFlags::NO_GPS_SYNC2), - no_tags.clone(), - ), - SourceMetric::new_bool( - "ant_fault", - self.flags.contains(UCCMFlags::NO_ANT), - no_tags.clone(), - ), - SourceMetric::new_bool( - "gps_los", - self.flags.contains(UCCMFlags::GPS_LOS), - no_tags.clone(), - ), - ] + fn to_metrics(&self) -> Vec { + let tags = Arc::new(vec![]); + vec![SourceMetricSet { + tags, + metrics: vec![ + SourceMetric::new_int("leaps", self.leaps as i64), + SourceMetric::new_bool("osc_lock", self.flags.contains(UCCMFlags::OSC_LOCK)), + SourceMetric::new_bool("leap_flag", self.flags.contains(UCCMFlags::LEAP_FLAG)), + SourceMetric::new_bool("init_unlock", self.flags.contains(UCCMFlags::INIT_UNLOCK)), + SourceMetric::new_bool( + "init_no_sats", + self.flags.contains(UCCMFlags::INIT_NO_SATS), + ), + SourceMetric::new_bool( + "have_gps_time", + self.flags.contains(UCCMFlags::HAVE_GPS_TIME), + ), + SourceMetric::new_bool("power_fail", self.flags.contains(UCCMFlags::POWER_FAIL)), + SourceMetric::new_bool("no_gps_sync", self.flags.contains(UCCMFlags::NO_GPS_SYNC)), + SourceMetric::new_bool( + "no_gps_sync2", + self.flags.contains(UCCMFlags::NO_GPS_SYNC2), + ), + SourceMetric::new_bool("ant_fault", self.flags.contains(UCCMFlags::NO_ANT)), + SourceMetric::new_bool("gps_los", self.flags.contains(UCCMFlags::GPS_LOS)), + ], + }] } } @@ -126,13 +99,12 @@ impl SourceReportDetails for UCCMLoopDiagReport { fn is_healthy(&self) -> bool { true } - fn to_metrics(&self) -> Vec { + fn to_metrics(&self) -> Vec { let tags = Arc::new(vec![]); - vec![SourceMetric::new_float( - "ocxo_offset", - self.ocxo as f64, + vec![SourceMetricSet { tags, - )] + metrics: vec![SourceMetric::new_float("ocxo_offset", self.ocxo as f64)], + }] } } @@ -144,11 +116,7 @@ pub struct UCCMGpsSvTracking { impl From<&UCCMGpsSvTracking> for SourceMetric { fn from(value: &UCCMGpsSvTracking) -> Self { - SourceMetric::new_int( - "sv_cno", - value.cno as i64, - Arc::new(vec![("sv_id", value.sv.to_string())]), - ) + SourceMetric::new_int("sv_cno", value.cno as i64) } } @@ -161,8 +129,11 @@ impl SourceReportDetails for UCCMGPSSatsReport { fn is_healthy(&self) -> bool { self.tracked_svs.len() >= 4 } - fn to_metrics(&self) -> Vec { - self.tracked_svs.iter().map(|sv| sv.into()).collect() + fn to_metrics(&self) -> Vec { + vec![SourceMetricSet { + tags: Arc::new(vec![]), + metrics: self.tracked_svs.iter().map(|sv| sv.into()).collect(), + }] } } @@ -245,20 +216,23 @@ impl SourceReportDetails for UCCMStatusReport { fn is_healthy(&self) -> bool { self.gps_pps_valid } - fn to_metrics(&self) -> Vec { - let no_tags = Arc::new(vec![]); - vec![ - SourceMetric::new_int("tfom", self.tfom as i64, no_tags.clone()), - SourceMetric::new_int("ffom", self.ffom as i64, no_tags.clone()), - SourceMetric::new_float("gps_phase", self.gps_phase as f64, no_tags.clone()), - // TODO: sv info - // TOOD: timestamp - SourceMetric::new_float("ant_voltage", self.ant_voltage as f64, no_tags.clone()), - SourceMetric::new_float("ant_current", self.ant_current as f64, no_tags.clone()), - SourceMetric::new_float("temp", self.temp as f64, no_tags.clone()), - SourceMetric::new_int("efc_dac", self.efc_dac as i64, no_tags.clone()), - SourceMetric::new_float("freq_error", self.freq_error as f64, no_tags.clone()), - ] + fn to_metrics(&self) -> Vec { + let tags = Arc::new(vec![]); + vec![SourceMetricSet { + tags, + metrics: vec![ + SourceMetric::new_int("tfom", self.tfom as i64), + SourceMetric::new_int("ffom", self.ffom as i64), + SourceMetric::new_float("gps_phase", self.gps_phase as f64), + // TODO: sv info + // TOOD: timestamp + SourceMetric::new_float("ant_voltage", self.ant_voltage as f64), + SourceMetric::new_float("ant_current", self.ant_current as f64), + SourceMetric::new_float("temp", self.temp as f64), + SourceMetric::new_int("efc_dac", self.efc_dac as i64), + SourceMetric::new_float("freq_error", self.freq_error as f64), + ], + }] } } @@ -545,6 +519,7 @@ impl UCCMMonitor { #[async_trait] impl ChimemonSource for UCCMMonitor { type Config = UCCMConfig; + const TASK_NAME: &'static str = "uccm-task"; fn new(name: &str, config: Self::Config) -> Self { let builder = tokio_serial::new(&config.port, config.baud) .timeout(config.timeout) @@ -552,8 +527,13 @@ impl ChimemonSource for UCCMMonitor { .parity(tokio_serial::Parity::None) .stop_bits(tokio_serial::StopBits::One) .flow_control(tokio_serial::FlowControl::None); - let mut port = SerialStream::open(&builder).expect("Must be able to open serial port"); - port.set_exclusive(true).expect("Can't lock serial port"); + let mut port = match SerialStream::open(&builder) { + Ok(port) => port, + Err(e) => fatal!(error = ?e, "Error opening port {}", &config.port), + }; + if let Err(e) = port.set_exclusive(true) { + fatal!(error= ?e, "Can't lock serial port"); + }; info!( "Opened serial port {}@{}", port.name().unwrap(), diff --git a/src/targets/chrony_refclock.rs b/src/targets/chrony_refclock.rs index f8a695d..bed0e7f 100644 --- a/src/targets/chrony_refclock.rs +++ b/src/targets/chrony_refclock.rs @@ -33,6 +33,7 @@ impl ChronySockServer {} #[async_trait] impl ChimemonTarget for ChronySockServer { type Config = ChronySockConfig; + const TASK_NAME: &'static str = "chrony-refclock-task"; fn new(name: &str, config: ChronySockConfig) -> Self { ChronySockServer { name: name.to_owned(), @@ -43,10 +44,11 @@ impl ChimemonTarget for ChronySockServer { info!("Chrony refclock task started"); loop { select! { - _ = cancel.cancelled() => { return } - msg = chan.recv() => { + _ = cancel.cancelled() => { return } + msg = chan.recv() => { match msg { Ok(ChimemonMessage::TimeReport(tr)) => { + debug!(tr = ?tr, "Got timereport"); if tr.valid { { let frame = ChronyTimeReport { diff --git a/src/targets/influx.rs b/src/targets/influx.rs new file mode 100644 index 0000000..71f3ed6 --- /dev/null +++ b/src/targets/influx.rs @@ -0,0 +1,110 @@ +use async_trait::async_trait; +use futures::stream; +use influxdb2::{ + Client, + models::{DataPoint, FieldValue}, +}; +use tokio::{select, sync::broadcast, time::timeout}; +use tokio_util::sync::CancellationToken; +use tracing::{debug, error, info, instrument}; + +use crate::{ + ChimemonMessage, ChimemonTarget, ChimemonTargetChannel, MetricValue, SourceReport, + config::InfluxConfig, fatal, +}; + +pub struct InfluxTarget { + name: String, + config: InfluxConfig, + influx: Client, +} + +impl From for FieldValue { + fn from(value: MetricValue) -> Self { + match value { + MetricValue::Bool(b) => FieldValue::Bool(b), + MetricValue::Float(f) => FieldValue::F64(f), + MetricValue::Int(i) => FieldValue::I64(i), + } + } +} + +#[async_trait] +impl ChimemonTarget for InfluxTarget { + type Config = InfluxConfig; + const TASK_NAME: &'static str = "influx-task"; + fn new(name: &str, config: Self::Config) -> Self { + let influx = Client::new(&config.url, &config.org, &config.token); + Self { + name: name.to_owned(), + config: config, + influx, + } + } + async fn run(self, mut chan: ChimemonTargetChannel, cancel: CancellationToken) { + info!("Influx task started"); + + loop { + let msg = select! { + _ = cancel.cancelled() => { return }, + msg = chan.recv() => msg + }; + debug!(msg = ?msg, "Got msg"); + let msg = match msg { + Ok(msg) => msg, + Err(broadcast::error::RecvError::Closed) => { + fatal!("Permanent channel closed, terminating") + } + Err(broadcast::error::RecvError::Lagged(_)) => { + error!("Channel lagged"); + continue; + } + }; + if let Err(e) = self.handle_msg(&msg).await { + error!(error = ?e, msg=?&msg, "Error handling message"); + } + } + } +} + +impl InfluxTarget { + #[instrument(skip_all)] + async fn handle_source_report( + &self, + sr: &SourceReport, + ) -> Result<(), Box> { + debug!("Handling source report {}", sr.name); + let mut dps = Vec::new(); + for metric_set in &sr.details.to_metrics() { + let mut builder = DataPoint::builder(&sr.name); + builder = self + .config + .tags + .iter() + .fold(builder, |builder, (k, v)| builder.tag(k, v)); + builder = metric_set + .tags + .iter() + .fold(builder, |builder, (k, v)| builder.tag(*k, v)); + builder = metric_set.metrics.iter().fold(builder, |builder, metric| { + builder.field(metric.name, metric.value) + }); + dps.push(builder.build()?); + } + debug!("Sending {} datapoints to influx", dps.len()); + timeout( + self.config.timeout, + self.influx.write(&self.config.bucket, stream::iter(dps)), + ) + .await??; + debug!("All datapoints sent"); + Ok(()) + } + async fn handle_msg(&self, msg: &ChimemonMessage) -> Result<(), Box> { + debug!(msg = ?msg, "Handling msg"); + match msg { + ChimemonMessage::TimeReport(_tr) => Ok(()), + ChimemonMessage::SourceReport(sr) => self.handle_source_report(sr).await, + } + } +} diff --git a/src/targets/mod.rs b/src/targets/mod.rs index e3bfe56..8fc431b 100644 --- a/src/targets/mod.rs +++ b/src/targets/mod.rs @@ -1,2 +1,2 @@ pub mod chrony_refclock; -// pub mod influx; +pub mod influx;