From 156df9ae8680de8dd7fa5701032c573ffcc30379 Mon Sep 17 00:00:00 2001 From: Keenan Tims Date: Wed, 4 Feb 2026 02:04:51 -0800 Subject: [PATCH] refactor metric tags & sources config --- src/lib.rs | 59 ++++++++++--- src/main.rs | 117 +++++++++++--------------- src/sources/chrony.rs | 74 +++++++++-------- src/sources/gpsd.rs | 45 +++++----- src/sources/hwmon.rs | 54 ++++++------ src/sources/prs10.rs | 146 +++++++++++++++------------------ src/sources/uccm.rs | 125 ++++++++++++++-------------- src/targets/chrony_refclock.rs | 8 +- 8 files changed, 326 insertions(+), 302 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 5c2f2ea..39d02f8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,7 +4,7 @@ pub mod targets; use async_trait::async_trait; use chrono::{DateTime, Utc}; use figment::{ - Figment, + Figment, Provider, providers::{Format, Serialized, Toml}, util::map, value::Map, @@ -43,6 +43,7 @@ impl Default for InfluxConfig { #[serde_as] #[derive(Serialize, Deserialize, Clone)] +#[serde(default)] pub struct ChronyConfig { pub enabled: bool, #[serde_as(as = "DurationSeconds")] @@ -73,6 +74,7 @@ impl Default for ChronyConfig { } #[derive(Serialize, Deserialize, Clone)] +#[serde(default)] pub struct ChronySockConfig { pub enabled: bool, pub sock: String, @@ -95,6 +97,7 @@ pub struct HwmonSensorConfig { #[serde_as] #[derive(Serialize, Deserialize, Clone)] +#[serde(default)] pub struct HwmonConfig { pub enabled: bool, #[serde_as(as = "DurationSeconds")] @@ -116,6 +119,7 @@ impl Default for HwmonConfig { #[serde_as] #[derive(Serialize, Deserialize, Clone, Debug)] +#[serde(default)] pub struct GpsdConfig { pub enabled: bool, #[serde_as(as = "DurationSeconds")] @@ -135,6 +139,7 @@ impl Default for GpsdConfig { #[serde_as] #[derive(Serialize, Deserialize, Clone, Debug)] +#[serde(default)] pub struct Prs10Config { pub enabled: bool, pub port: String, @@ -178,40 +183,43 @@ pub enum SourceStatus { Unknown, } -#[derive(Clone, Debug)] +#[derive(Copy, Clone, Debug)] pub enum MetricValue { Int(i64), Float(f64), Bool(bool), } +type MetricTag = (&'static str, String); +type MetricTags = Vec; + #[derive(Clone, Debug)] pub struct SourceMetric { - name: String, + name: &'static str, value: MetricValue, - tags: Arc>, + tags: Arc, } impl SourceMetric { - pub fn new_int(name: &str, value: i64, tags: Arc>) -> Self { + pub fn new_int(name: &'static str, value: i64, tags: Arc) -> Self { Self { - name: name.to_owned(), + name: name, value: MetricValue::Int(value), tags, } } - pub fn new_float(name: &str, value: f64, tags: Arc>) -> Self { + pub fn new_float(name: &'static str, value: f64, tags: Arc) -> Self { Self { - name: name.to_owned(), + name: name, value: MetricValue::Float(value), tags, } } - pub fn new_bool(name: &str, value: bool, tags: Arc>) -> Self { + pub fn new_bool(name: &'static str, value: bool, tags: Arc) -> Self { Self { - name: name.to_owned(), + name: name, value: MetricValue::Bool(value), tags, } @@ -232,6 +240,7 @@ pub struct SourceReport { #[serde_as] #[derive(Serialize, Deserialize, Clone)] +#[serde(default)] pub struct UCCMConfig { pub enabled: bool, pub port: String, @@ -265,6 +274,23 @@ pub struct SourcesConfig { pub prs10: Prs10Config, } +#[derive(Serialize, Deserialize, Clone)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum SourceConfig { + Chrony(ChronyConfig), + Hwmon(HwmonConfig), + Uccm(UCCMConfig), + Gpsd(GpsdConfig), + Prs10(Prs10Config), +} + +#[derive(Serialize, Deserialize, Clone)] +pub struct NamedSourceConfig { + pub name: String, + #[serde(flatten)] + pub source: SourceConfig, +} + #[derive(Serialize, Deserialize, Clone, Default)] pub struct TargetsConfig { pub chrony: ChronySockConfig, @@ -273,10 +299,19 @@ pub struct TargetsConfig { #[derive(Serialize, Deserialize, Clone, Default)] pub struct Config { pub influxdb: InfluxConfig, - pub sources: SourcesConfig, + pub sources: Vec, pub targets: TargetsConfig, } +impl Provider for Config { + fn metadata(&self) -> figment::Metadata { + figment::Metadata::named("Default config") + } + fn data(&self) -> Result, figment::Error> { + Serialized::defaults(Config::default()).data() + } +} + pub fn load_config(filename: &Path) -> Figment { Figment::from(Serialized::defaults(Config::default())).merge(Toml::file(filename)) } @@ -317,6 +352,8 @@ pub type ChimemonTargetChannel = Receiver; #[async_trait] pub trait ChimemonSource { + type Config; + fn new(name: &str, config: Self::Config) -> Self; async fn run(self, chan: ChimemonSourceChannel); } diff --git a/src/main.rs b/src/main.rs index f9d4db7..4dfda5a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,13 +1,13 @@ use clap::{Parser, ValueEnum}; +use figment::{ + Figment, + providers::{Format, Toml}, +}; use futures::future::join_all; use std::path::Path; -use tokio::sync::broadcast; +use tokio::{sync::broadcast, task::JoinHandle}; use tracing::{Instrument, debug, error, info, info_span, warn}; -use tracing_subscriber::{ - self, EnvFilter, - fmt::format::{self, FmtSpan}, - prelude::*, -}; +use tracing_subscriber::{self, EnvFilter, fmt::format::FmtSpan}; use chimemon::*; use sources::{ @@ -36,6 +36,43 @@ struct Args { log_level: Level, } +fn run_source(config: NamedSourceConfig, chan: ChimemonSourceChannel) -> Option> { + let NamedSourceConfig { name, source } = config; + match source { + SourceConfig::Chrony(source_config) if source_config.enabled => { + let c = ChronyClient::new(&name, source_config); + Some(tokio::spawn( + c.run(chan).instrument(info_span!("chrony-task")), + )) + } + SourceConfig::Gpsd(source_config) if source_config.enabled => { + let c = GpsdSource::new(&name, source_config); + Some(tokio::spawn( + c.run(chan).instrument(info_span!("gpsd-task")), + )) + } + SourceConfig::Hwmon(source_config) if source_config.enabled => { + let c = HwmonSource::new(&name, source_config); + Some(tokio::spawn( + c.run(chan).instrument(info_span!("hwmon-task")), + )) + } + SourceConfig::Prs10(source_config) if source_config.enabled => { + let c = Prs10Monitor::new(&name, source_config); + Some(tokio::spawn( + c.run(chan).instrument(info_span!("prs10-task")), + )) + } + SourceConfig::Uccm(source_config) if source_config.enabled => { + let c = UCCMMonitor::new(&name, source_config); + Some(tokio::spawn( + c.run(chan.clone()).instrument(info_span!("uccm-task")), + )) + } + _ => None, + } +} + #[tokio::main(flavor = "current_thread")] async fn main() -> Result<(), Box> { tracing_subscriber::fmt() @@ -47,7 +84,9 @@ async fn main() -> Result<(), Box> { let args = Args::parse(); info!("{PROGRAM_NAME} v{VERSION} starting..."); - let fig = load_config(Path::new(&args.config_file)); + let fig = Figment::new() + .merge(Config::default()) + .merge(Toml::file(&args.config_file)); debug!("{fig:?}"); let config: Config = fig.extract()?; @@ -88,66 +127,10 @@ async fn main() -> Result<(), Box> { )); } - let chrony = if config.sources.chrony.enabled { - Some(ChronyClient::new(config.to_owned())) - } else { - None - }; - if let Some(c) = chrony { - tasks.push(tokio::spawn( - c.run(sourcechan.clone()) - .instrument(info_span!("chrony-task")), - )); - }; - - let hwmon = if config.sources.hwmon.enabled { - Some(HwmonSource::new(config.to_owned())) - } else { - None - }; - if let Some(hwmon) = hwmon { - tasks.push(tokio::spawn( - hwmon - .run(sourcechan.clone()) - .instrument(info_span!("hwmon-task")), - )); - }; - - let uccm = if config.sources.uccm.enabled { - Some(UCCMMonitor::new(config.to_owned())) - } else { - None - }; - if let Some(uccm) = uccm { - tasks.push(tokio::spawn( - uccm.run(sourcechan.clone()) - .instrument(info_span!("uccm-task")), - )); - }; - - let gpsd = if config.sources.gpsd.enabled { - Some(GpsdSource::new(config.to_owned()).await.unwrap()) - } else { - None - }; - if let Some(gpsd) = gpsd { - tasks.push(tokio::spawn( - gpsd.run(sourcechan.clone()) - .instrument(info_span!("gpsd-task")), - )) - } - - let prs10 = if config.sources.prs10.enabled { - Some(Prs10Monitor::new(config.sources.prs10)) - } else { - None - }; - if let Some(prs10) = prs10 { - tasks.push(tokio::spawn( - prs10 - .run(sourcechan.clone()) - .instrument(info_span!("prs10-task")), - )) + for source in config.sources { + if let Some(task) = run_source(source, sourcechan.clone()) { + tasks.push(task) + } } let chrony_refclock = if config.targets.chrony.enabled { diff --git a/src/sources/chrony.rs b/src/sources/chrony.rs index 80e08f9..7c4769e 100644 --- a/src/sources/chrony.rs +++ b/src/sources/chrony.rs @@ -1,26 +1,28 @@ -use crate::{ - ChimemonSource, ChimemonSourceChannel, Config, SourceMetric, SourceReport, SourceReportDetails, - SourceStatus, -}; +use std::net::{SocketAddr, ToSocketAddrs}; +use std::sync::Arc; + use async_trait::async_trait; use chrony_candm::reply::{self, ReplyBody, SourceMode}; use chrony_candm::request::{self, RequestBody}; use chrony_candm::{ClientOptions, blocking_query}; -use std::net::{SocketAddr, ToSocketAddrs}; -use std::sync::Arc; -use std::time::Duration; use tokio::join; use tracing::{info, warn}; +use crate::{ + ChimemonSource, ChimemonSourceChannel, ChronyConfig, MetricTags, SourceMetric, SourceReport, + SourceReportDetails, SourceStatus, +}; + pub struct ChronyClient { pub server: SocketAddr, + pub name: String, client_options: ClientOptions, - config: Config, + config: ChronyConfig, } #[derive(Debug)] pub struct ChronyTrackingReport { - tags: Arc>, + tags: Arc, pub ref_id: i64, pub ref_ip_addr: String, pub stratum: i64, @@ -78,9 +80,9 @@ impl SourceReportDetails for ChronySourcesReport { for source in &self.sources { let tags = Arc::new(vec![ - ("ref_id".to_owned(), source.ip_addr.to_string()), + ("ref_id", source.ip_addr.to_string()), ( - "mode".to_owned(), + "mode", match source.mode { SourceMode::Client => String::from("server"), SourceMode::Peer => String::from("peer"), @@ -88,7 +90,7 @@ impl SourceReportDetails for ChronySourcesReport { }, ), ( - "state".to_owned(), + "state", match source.state { reply::SourceState::Selected => String::from("best"), reply::SourceState::NonSelectable => String::from("unusable"), @@ -129,7 +131,7 @@ impl SourceReportDetails for ChronySourcesReport { fn report_from_tracking( t: &reply::Tracking, - config: &Config, + config: &ChronyConfig, ) -> Result> { let report = ChronyTrackingReport { tags: Arc::new(vec![]), //TODO: allow configuring tags in the source @@ -151,25 +153,6 @@ fn report_from_tracking( } impl ChronyClient { - pub fn new(config: Config) -> Self { - let server = config - .sources - .chrony - .host - .to_socket_addrs() - .unwrap() - .next() - .expect("Unable to parse host:port:"); - let client_options = ClientOptions { - n_tries: 3, - timeout: config.sources.chrony.timeout, - }; - ChronyClient { - server, - client_options, - config, - } - } async fn query(&self, request: RequestBody) -> Result { let server = self.server; let client_options = self.client_options; @@ -265,7 +248,7 @@ impl ChronyClient { let tracking_data = report_from_tracking(&tracking, &self.config)?; let report = SourceReport { - name: "chrony-tracking".to_owned(), + name: self.name.clone(), status: SourceStatus::Unknown, details: Arc::new(tracking_data), }; @@ -283,7 +266,7 @@ impl ChronyClient { let sources = self.get_sources().await?; let details = ChronySourcesReport { sources }; let report = SourceReport { - name: "chrony-sources".to_owned(), + name: self.name.clone(), status: SourceStatus::Unknown, details: Arc::new(details), }; @@ -295,11 +278,30 @@ impl ChronyClient { #[async_trait] impl ChimemonSource for ChronyClient { + type Config = ChronyConfig; + fn new(name: &str, config: Self::Config) -> Self { + let server = config + .host + .to_socket_addrs() + .unwrap() + .next() + .expect("Unable to parse host:port:"); + let client_options = ClientOptions { + n_tries: 3, + timeout: config.timeout, + }; + ChronyClient { + name: name.to_owned(), + server, + client_options, + config, + } + } async fn run(self, chan: ChimemonSourceChannel) { info!("Chrony task started"); - let mut t_interval = tokio::time::interval(self.config.sources.chrony.tracking_interval); - let mut s_interval = tokio::time::interval(self.config.sources.chrony.sources_interval); + let mut t_interval = tokio::time::interval(self.config.tracking_interval); + let mut s_interval = tokio::time::interval(self.config.sources_interval); let t_future = async { let lchan = chan.clone(); diff --git a/src/sources/gpsd.rs b/src/sources/gpsd.rs index 4fc2600..9f272c5 100644 --- a/src/sources/gpsd.rs +++ b/src/sources/gpsd.rs @@ -1,8 +1,3 @@ -use crate::{ - ChimemonMessage, ChimemonSource, ChimemonSourceChannel, Config, SourceMetric, SourceReport, - SourceReportDetails, SourceStatus, -}; - use std::collections::HashMap; use std::f64; use std::fmt::Debug; @@ -12,8 +7,7 @@ use std::time::Duration; use async_trait::async_trait; use backoff::ExponentialBackoff; -use futures::StreamExt; -use futures::{SinkExt, Stream}; +use futures::{SinkExt, Stream, StreamExt}; use gpsd_proto::{Device, Gst, Mode, Pps, Sky, Tpv, UnifiedResponse, Version}; use serde::Serialize; use serde_json; @@ -22,8 +16,14 @@ use tokio::time::{interval, timeout}; use tokio_util::codec::{Framed, LinesCodec}; use tracing::{debug, debug_span, info, instrument, warn}; +use crate::{ + ChimemonMessage, ChimemonSource, ChimemonSourceChannel, GpsdConfig, SourceMetric, SourceReport, + SourceReportDetails, SourceStatus, +}; + pub struct GpsdSource { - pub config: Config, + pub name: String, + pub config: GpsdConfig, conn: GpsdTransport, devices: HashMap, last_gst: Option, @@ -76,19 +76,20 @@ impl SourceReportDetails for GpsdSourceReport { self.fix_type != GpsdFixType::Unknown && self.fix_type != GpsdFixType::NoFix } fn to_metrics(&self) -> Vec { - let no_tags = Arc::new(vec![]); + let tags = Arc::new(vec![]); vec![ - SourceMetric::new_int("sats_visible", self.sats_visible as i64, no_tags.clone()), - SourceMetric::new_int("sats_tracked", self.sats_tracked as i64, no_tags.clone()), - SourceMetric::new_float("tdop", self.tdop, no_tags.clone()), + SourceMetric::new_int("sats_visible", self.sats_visible as i64, tags.clone()), + SourceMetric::new_int("sats_tracked", self.sats_tracked as i64, tags.clone()), + SourceMetric::new_float("tdop", self.tdop, tags.clone()), ] } } impl GpsdSource { - pub async fn new(config: Config) -> Result { - let conn = GpsdTransport::new(&config.sources.gpsd.host).await?; + async fn inner_new(name: &str, config: GpsdConfig) -> Result { + let conn = GpsdTransport::new(&config.host).await?; Ok(Self { + name: name.to_owned(), config, conn, devices: HashMap::new(), @@ -98,9 +99,6 @@ impl GpsdSource { last_sky: None, }) } -} - -impl GpsdSource { async fn send_status(&self, chan: &mut ChimemonSourceChannel) { let sky = self.last_sky.as_ref(); let tpv = self.last_tpv.as_ref(); @@ -118,7 +116,7 @@ impl GpsdSource { .map_or(f64::INFINITY, |tdop| tdop as f64); chan.send(ChimemonMessage::SourceReport(SourceReport { - name: "gpsd".into(), + name: self.name.clone(), status: SourceStatus::Unknown, details: Arc::new(GpsdSourceReport { fix_type: tpv.map_or(GpsdFixType::Unknown, |tpv| tpv.mode.into()), @@ -163,10 +161,19 @@ impl GpsdSource { #[async_trait] impl ChimemonSource for GpsdSource { + type Config = GpsdConfig; + fn new(name: &str, config: Self::Config) -> Self { + // TODO: refactor so this mess isn't necessary + // Should do async setup at the start of run(), not here + let rt = tokio::runtime::Builder::new_current_thread() + .build() + .unwrap(); + rt.block_on(Self::inner_new(name, config)).unwrap() + } async fn run(mut self, mut chan: ChimemonSourceChannel) { info!("gpsd task started"); self.conn.conn().await.unwrap(); - let mut ticker = interval(self.config.sources.gpsd.interval); + let mut ticker = interval(self.config.interval); let mut params = WatchParams::default(); params.json = Some(true); diff --git a/src/sources/hwmon.rs b/src/sources/hwmon.rs index 2dfccd6..943bbc8 100644 --- a/src/sources/hwmon.rs +++ b/src/sources/hwmon.rs @@ -1,13 +1,15 @@ -use crate::{ - ChimemonSource, ChimemonSourceChannel, Config, SourceMetric, SourceReport, SourceReportDetails, - SourceStatus, -}; +use std::{fs::File, io::Read, path::PathBuf, sync::Arc}; + use async_trait::async_trait; -use std::{fs::File, io::Read, path::PathBuf, sync::Arc, time::Duration}; use tracing::{debug, error, info, warn}; +use crate::{ + ChimemonSource, ChimemonSourceChannel, HwmonConfig, MetricTags, SourceMetric, SourceReport, + SourceReportDetails, SourceStatus, +}; pub struct HwmonSource { - config: Config, + name: String, + config: HwmonConfig, sensors: Vec>, } @@ -18,7 +20,7 @@ struct HwmonSensor { device: String, sensor: String, label: Option, - tags: Arc>, + tags: Arc, } impl HwmonSensor { @@ -40,12 +42,12 @@ impl HwmonSensor { None }; let mut tags_vec = vec![ - ("name".to_owned(), name.to_owned()), - ("device".to_owned(), device.to_owned()), - ("sensor".to_owned(), sensor.to_owned()), + ("name", name.to_owned()), + ("device", device.to_owned()), + ("sensor", sensor.to_owned()), ]; if let Some(label) = &label { - tags_vec.push(("label".to_owned(), label.clone())) + tags_vec.push(("label", label.clone())) } Self { value_path, @@ -92,18 +94,6 @@ impl SourceReportDetails for HwmonReport { const HWMON_ROOT: &str = "/sys/class/hwmon"; impl HwmonSource { - pub fn new(config: Config) -> Self { - let sensors = config - .sources - .hwmon - .sensors - .iter() - .map(|(k, v)| Arc::new(HwmonSensor::new(k, &v.name, &v.sensor))) - .collect(); - - HwmonSource { config, sensors } - } - async fn get_raw_value(sensor: &HwmonSensor) -> Result { tokio::fs::read_to_string(&sensor.value_path).await } @@ -111,9 +101,23 @@ impl HwmonSource { #[async_trait] impl ChimemonSource for HwmonSource { + type Config = HwmonConfig; + fn new(name: &str, config: Self::Config) -> Self { + let sensors = config + .sensors + .iter() + .map(|(k, v)| Arc::new(HwmonSensor::new(k, &v.name, &v.sensor))) + .collect(); + + HwmonSource { + name: name.to_owned(), + config, + sensors, + } + } async fn run(self, chan: ChimemonSourceChannel) { info!("hwmon task started"); - let mut interval = tokio::time::interval(self.config.sources.hwmon.interval); + let mut interval = tokio::time::interval(self.config.interval); loop { interval.tick().await; let mut values = Vec::new(); @@ -137,7 +141,7 @@ impl ChimemonSource for HwmonSource { } } let report = SourceReport { - name: "hwmon".to_owned(), + name: self.name.clone(), status: SourceStatus::Healthy, details: Arc::new(HwmonReport { values }), }; diff --git a/src/sources/prs10.rs b/src/sources/prs10.rs index ceb2f6c..a6e4a1f 100644 --- a/src/sources/prs10.rs +++ b/src/sources/prs10.rs @@ -2,10 +2,6 @@ use std::any::type_name; use std::str::FromStr; use std::sync::Arc; -use crate::{ - ChimemonMessage, ChimemonSource, ChimemonSourceChannel, Prs10Config, SourceMetric, - SourceReport, SourceReportDetails, SourceStatus, -}; use async_trait::async_trait; use bitflags::bitflags; use itertools::Itertools; @@ -13,10 +9,14 @@ use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf}; use tokio::select; use tokio::sync::OnceCell; use tokio::time::{interval, timeout}; -use tokio_serial; use tokio_serial::{SerialPort, SerialStream}; use tracing::{debug, debug_span, error, info, instrument, warn}; +use crate::{ + ChimemonMessage, ChimemonSource, ChimemonSourceChannel, MetricTags, Prs10Config, SourceMetric, + SourceReport, SourceReportDetails, SourceStatus, +}; + #[derive(Debug)] pub struct Prs10Info { pub model: String, @@ -54,7 +54,7 @@ bitflags! { } impl Prs10PowerLampFlags { - pub fn get_metrics(&self, no_tags: Arc>) -> Vec { + pub fn get_metrics(&self, tags: Arc) -> Vec { // Define the mapping statically const FLAG_LABELS: [(&Prs10PowerLampFlags, &str); 8] = [ (&Prs10PowerLampFlags::ELEC_VOLTAGE_LOW, "elec_voltage_low"), @@ -72,7 +72,7 @@ impl Prs10PowerLampFlags { .iter() .map(|(flag, label)| { // We track whether each flag is set (true) or not (false) - SourceMetric::new_bool(*label, self.contains(**flag), no_tags.clone()) + SourceMetric::new_bool(*label, self.contains(**flag), tags.clone()) }) .collect() } @@ -180,17 +180,17 @@ impl SourceReportDetails for Prs10Status { } fn to_metrics(&self) -> Vec { - let no_tags = Arc::new(vec![]); + let tags = Arc::new(vec![]); vec![ SourceMetric::new_int( "volt_lamp_flags", self.volt_lamp_flags.bits() as i64, - no_tags.clone(), + tags.clone(), ), - SourceMetric::new_int("rf_flags", self.rf_flags.bits() as i64, no_tags.clone()), - SourceMetric::new_int("temp_flags", self.temp_flags.bits() as i64, no_tags.clone()), - SourceMetric::new_int("fll_flags", self.fll_flags.bits() as i64, no_tags.clone()), - SourceMetric::new_int("pps_flags", self.pps_flags.bits() as i64, no_tags.clone()), + SourceMetric::new_int("rf_flags", self.rf_flags.bits() as i64, tags.clone()), + SourceMetric::new_int("temp_flags", self.temp_flags.bits() as i64, tags.clone()), + SourceMetric::new_int("fll_flags", self.fll_flags.bits() as i64, tags.clone()), + SourceMetric::new_int("pps_flags", self.pps_flags.bits() as i64, tags.clone()), // system flags are kind of useless because we can't guarantee they get upstreamed and will only appear once since they are 'event flags' ] } @@ -254,104 +254,69 @@ impl SourceReportDetails for Prs10Stats { true } fn to_metrics(&self) -> Vec { - let no_tags = Arc::new(vec![]); + let tags = Arc::new(vec![]); vec![ // Integer Metrics - SourceMetric::new_int("ocxo_efc", self.ocxo_efc as i64, no_tags.clone()), + SourceMetric::new_int("ocxo_efc", self.ocxo_efc as i64, tags.clone()), // Float Metrics - SourceMetric::new_float( - "error_signal_volts", - self.error_signal_volts, - no_tags.clone(), - ), + SourceMetric::new_float("error_signal_volts", self.error_signal_volts, tags.clone()), SourceMetric::new_float( "detect_signal_volts", self.detect_signal_volts, - no_tags.clone(), + tags.clone(), ), - SourceMetric::new_float("heat_volts", self.heat_volts, no_tags.clone()), - SourceMetric::new_float("elec_volts", self.elec_volts, no_tags.clone()), + SourceMetric::new_float("heat_volts", self.heat_volts, tags.clone()), + SourceMetric::new_float("elec_volts", self.elec_volts, tags.clone()), SourceMetric::new_float( "lamp_fet_drain_volts", self.lamp_fet_drain_volts, - no_tags.clone(), + tags.clone(), ), SourceMetric::new_float( "lamp_fet_gate_volts", self.lamp_fet_gate_volts, - no_tags.clone(), + tags.clone(), ), - SourceMetric::new_float("ocxo_heat_volts", self.ocxo_heat_volts, no_tags.clone()), - SourceMetric::new_float("cell_heat_volts", self.cell_heat_volts, no_tags.clone()), - SourceMetric::new_float("lamp_heat_volts", self.lamp_heat_volts, no_tags.clone()), - SourceMetric::new_float("rb_photo", self.rb_photo, no_tags.clone()), - SourceMetric::new_float("rb_photo_iv", self.rb_photo_iv, no_tags.clone()), - SourceMetric::new_float("case_temp", self.case_temp, no_tags.clone()), - SourceMetric::new_float("ocxo_therm", self.ocxo_therm, no_tags.clone()), - SourceMetric::new_float("cell_therm", self.cell_therm, no_tags.clone()), - SourceMetric::new_float("lamp_therm", self.lamp_therm, no_tags.clone()), - SourceMetric::new_float("ext_cal_volts", self.ext_cal_volts, no_tags.clone()), - SourceMetric::new_float("analog_gnd_volts", self.analog_gnd_volts, no_tags.clone()), + SourceMetric::new_float("ocxo_heat_volts", self.ocxo_heat_volts, tags.clone()), + SourceMetric::new_float("cell_heat_volts", self.cell_heat_volts, tags.clone()), + SourceMetric::new_float("lamp_heat_volts", self.lamp_heat_volts, tags.clone()), + SourceMetric::new_float("rb_photo", self.rb_photo, tags.clone()), + SourceMetric::new_float("rb_photo_iv", self.rb_photo_iv, tags.clone()), + SourceMetric::new_float("case_temp", self.case_temp, tags.clone()), + SourceMetric::new_float("ocxo_therm", self.ocxo_therm, tags.clone()), + SourceMetric::new_float("cell_therm", self.cell_therm, tags.clone()), + SourceMetric::new_float("lamp_therm", self.lamp_therm, tags.clone()), + SourceMetric::new_float("ext_cal_volts", self.ext_cal_volts, tags.clone()), + SourceMetric::new_float("analog_gnd_volts", self.analog_gnd_volts, tags.clone()), SourceMetric::new_float( "if_vco_varactor_volts", self.if_vco_varactor_volts, - no_tags.clone(), + tags.clone(), ), SourceMetric::new_float( "op_vco_varactor_volts", self.op_vco_varactor_volts, - no_tags.clone(), + tags.clone(), ), - SourceMetric::new_float( - "mul_amp_gain_volts", - self.mul_amp_gain_volts, - no_tags.clone(), - ), - SourceMetric::new_float("rf_lock_volts", self.rf_lock_volts, no_tags.clone()), + SourceMetric::new_float("mul_amp_gain_volts", self.mul_amp_gain_volts, tags.clone()), + SourceMetric::new_float("rf_lock_volts", self.rf_lock_volts, tags.clone()), // U16 Metrics (optional, but can be treated as integers) - SourceMetric::new_int( - "freq_offset_ppt", - self.freq_offset_ppt as i64, - no_tags.clone(), - ), - SourceMetric::new_int("mag_efc", self.mag_efc as i64, no_tags.clone()), + SourceMetric::new_int("freq_offset_ppt", self.freq_offset_ppt as i64, tags.clone()), + SourceMetric::new_int("mag_efc", self.mag_efc as i64, tags.clone()), ] } } #[derive(Debug)] pub struct Prs10Monitor { + name: String, + config: Prs10Config, rx: ReadHalf, tx: WriteHalf, info: OnceCell, - config: Prs10Config, } impl Prs10Monitor { - pub fn new(config: Prs10Config) -> Self { - let builder = tokio_serial::new(&config.port, config.baud) - .timeout(config.timeout) - .data_bits(tokio_serial::DataBits::Eight) - .parity(tokio_serial::Parity::None) - .stop_bits(tokio_serial::StopBits::One) - .flow_control(tokio_serial::FlowControl::None); - let mut port = SerialStream::open(&builder).expect("Must be able to open serial port"); - port.set_exclusive(true).expect("Can't lock serial port"); - info!( - "Opened serial port {}@{}", - port.name().unwrap(), - port.baud_rate().unwrap() - ); - let (rx, tx) = tokio::io::split(port); - - Self { - rx, - tx, - config, - info: OnceCell::new(), - } - } - pub fn info(&self) -> &Prs10Info { self.info.get().expect("info() used before run()") } @@ -450,7 +415,7 @@ impl Prs10Monitor { async fn status_poll(&mut self) -> Result> { let status = self.get_status().await?; Ok(ChimemonMessage::SourceReport(SourceReport { - name: "prs10".into(), + name: self.name.clone(), status: if status.is_healthy() { SourceStatus::Healthy } else { @@ -480,7 +445,7 @@ impl Prs10Monitor { drop(stats_guard); Ok(ChimemonMessage::SourceReport(SourceReport { - name: "prs10".into(), + name: self.name.clone(), status: SourceStatus::Unknown, details: Arc::new(Prs10Stats { ocxo_efc, @@ -514,10 +479,35 @@ impl Prs10Monitor { #[async_trait] impl ChimemonSource for Prs10Monitor { + type Config = Prs10Config; + fn new(name: &str, config: Self::Config) -> Self { + let builder = tokio_serial::new(&config.port, config.baud) + .timeout(config.timeout) + .data_bits(tokio_serial::DataBits::Eight) + .parity(tokio_serial::Parity::None) + .stop_bits(tokio_serial::StopBits::One) + .flow_control(tokio_serial::FlowControl::None); + let mut port = SerialStream::open(&builder).expect("Must be able to open serial port"); + port.set_exclusive(true).expect("Can't lock serial port"); + info!( + "Opened serial port {}@{}", + port.name().unwrap(), + port.baud_rate().unwrap() + ); + let (rx, tx) = tokio::io::split(port); + + Self { + name: name.to_owned(), + config, + rx, + tx, + info: OnceCell::new(), + } + } async fn run(mut self, chan: ChimemonSourceChannel) { info!("PRS10 task starting"); if let Err(e) = self.set_info().await { - warn!("Error starting PRS10: {e:?}"); + error!("Error starting PRS10: {e:?}"); return; } info!( diff --git a/src/sources/uccm.rs b/src/sources/uccm.rs index 9421dcd..37f1568 100644 --- a/src/sources/uccm.rs +++ b/src/sources/uccm.rs @@ -1,7 +1,7 @@ -use crate::{ - ChimemonMessage, ChimemonSource, ChimemonSourceChannel, Config, SourceMetric, - SourceReportDetails, TimeReport, -}; +use std::io::{BufRead, Cursor}; +use std::str; +use std::sync::Arc; + use async_trait::async_trait; use bitflags::bitflags; use byteorder::{BigEndian, ReadBytesExt}; @@ -11,9 +11,6 @@ use figment::value::Map; use influxdb2::models::DataPoint; use influxdb2::models::data_point::DataPointBuilder; use itertools::Itertools; -use std::io::{BufRead, Cursor}; -use std::str; -use std::sync::Arc; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf}; use tokio::join; use tokio::sync::Mutex; @@ -21,6 +18,11 @@ use tokio::time::sleep; use tokio_serial::{SerialPort, SerialStream}; use tracing::{debug, info, warn}; +use crate::{ + ChimemonMessage, ChimemonSource, ChimemonSourceChannel, SourceMetric, SourceReportDetails, + TimeReport, UCCMConfig, +}; + pub const GPS_EPOCH: i64 = 315964800; // Doesn't seem possible to have a const DateTime object pub type UccmEndian = BigEndian; @@ -32,11 +34,11 @@ pub enum UCCMMonitorParseState { } pub struct UCCMMonitor { - // pub port: SerialStream, + pub name: String, + config: UCCMConfig, rx: ReadHalf, tx: WriteHalf, pub info: Option, - config: Config, } #[derive(Debug)] @@ -399,30 +401,6 @@ impl TryFrom<&str> for UCCMGPSSatsReport { } impl UCCMMonitor { - pub fn new(config: Config) -> Self { - let builder = tokio_serial::new(&config.sources.uccm.port, config.sources.uccm.baud) - .timeout(config.sources.uccm.timeout) - .data_bits(tokio_serial::DataBits::Eight) - .parity(tokio_serial::Parity::None) - .stop_bits(tokio_serial::StopBits::One) - .flow_control(tokio_serial::FlowControl::None); - let mut port = SerialStream::open(&builder).expect("Must be able to open serial port"); - port.set_exclusive(true).expect("Can't lock serial port"); - info!( - "Opened serial port {}@{}", - port.name().unwrap(), - port.baud_rate().unwrap() - ); - let (rx, tx) = tokio::io::split(port); - UCCMMonitor { - // port, - rx, - tx, - info: None, - config, - } - } - pub async fn send_cmd(&mut self, cmd: &[u8]) -> Result { debug!("cmd: `{:?}`", String::from_utf8_lossy(cmd)); self.tx.write_all(cmd).await.unwrap(); @@ -481,7 +459,7 @@ impl UCCMMonitor { let mut last_loop_diag: Option = None; let mut last_gps_sats: Option = None; - let mut last_sent_report = Utc::now() - self.config.sources.uccm.status_interval; + let mut last_sent_report = Utc::now() - self.config.status_interval; loop { match tokio::io::AsyncReadExt::read_buf(&mut self.rx, &mut rdbuf).await { @@ -525,37 +503,33 @@ impl UCCMMonitor { })) .expect("Unable to send to channel"); if sysnow - last_sent_report - >= Duration::from_std(self.config.sources.uccm.status_interval) - .unwrap() + >= Duration::from_std(self.config.status_interval).unwrap() { - let mut points = vec![ - tod.as_builder( - &self.config.sources.uccm.measurement, - &self.config.influxdb.tags, - ) - .build() - .unwrap(), - ]; - if let Some(loop_diag) = &last_loop_diag { - points.push( - loop_diag - .as_builder( - &self.config.sources.uccm.measurement, - &self.config.influxdb.tags, - ) - .build() - .unwrap(), - ) - } - if let Some(gps_sats) = &last_gps_sats { - points.extend(gps_sats.build( - &self.config.sources.uccm.measurement, - &self.config.influxdb.tags, - )); - } + // let mut points = vec![ + // tod.as_builder(&self.config.measurement, &self.config.tags) + // .build() + // .unwrap(), + // ]; + // if let Some(loop_diag) = &last_loop_diag { + // points.push( + // loop_diag + // .as_builder( + // &self.config.measurement, + // &self.config.influxdb.tags, + // ) + // .build() + // .unwrap(), + // ) + // } + // if let Some(gps_sats) = &last_gps_sats { + // points.extend(gps_sats.build( + // &self.config.sources.uccm.measurement, + // &self.config.influxdb.tags, + // )); + // } - chan.send(ChimemonMessage::DataPoints(points)) - .expect("Unable to send to channel"); + // chan.send(ChimemonMessage::DataPoints(points)) + // .expect("Unable to send to channel"); last_sent_report = sysnow; } } @@ -597,6 +571,31 @@ impl UCCMMonitor { #[async_trait] impl ChimemonSource for UCCMMonitor { + type Config = UCCMConfig; + fn new(name: &str, config: Self::Config) -> Self { + let builder = tokio_serial::new(&config.port, config.baud) + .timeout(config.timeout) + .data_bits(tokio_serial::DataBits::Eight) + .parity(tokio_serial::Parity::None) + .stop_bits(tokio_serial::StopBits::One) + .flow_control(tokio_serial::FlowControl::None); + let mut port = SerialStream::open(&builder).expect("Must be able to open serial port"); + port.set_exclusive(true).expect("Can't lock serial port"); + info!( + "Opened serial port {}@{}", + port.name().unwrap(), + port.baud_rate().unwrap() + ); + let (rx, tx) = tokio::io::split(port); + UCCMMonitor { + name: name.to_owned(), + config, + rx, + tx, + info: None, + } + } + async fn run(mut self, chan: ChimemonSourceChannel) { info!("UCCM task starting"); if self.get_info().await.is_err() { diff --git a/src/targets/chrony_refclock.rs b/src/targets/chrony_refclock.rs index 58002d2..e828b0d 100644 --- a/src/targets/chrony_refclock.rs +++ b/src/targets/chrony_refclock.rs @@ -1,11 +1,13 @@ -use crate::{ChimemonMessage, ChimemonTarget, ChimemonTargetChannel, ChronySockConfig}; -use async_trait::async_trait; -use libc::{c_double, c_int, timeval}; use std::mem; use std::os::unix::net::UnixDatagram; use std::path::PathBuf; + +use async_trait::async_trait; +use libc::{c_double, c_int, timeval}; use tracing::debug; +use crate::{ChimemonMessage, ChimemonTarget, ChimemonTargetChannel, ChronySockConfig}; + const CHRONY_MAGIC: c_int = 0x534f434b; pub struct ChronySockServer {