refactor metric tags & sources config

This commit is contained in:
2026-02-04 02:04:51 -08:00
parent d464cf8ee6
commit 156df9ae86
8 changed files with 326 additions and 302 deletions

View File

@@ -4,7 +4,7 @@ pub mod targets;
use async_trait::async_trait; use async_trait::async_trait;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use figment::{ use figment::{
Figment, Figment, Provider,
providers::{Format, Serialized, Toml}, providers::{Format, Serialized, Toml},
util::map, util::map,
value::Map, value::Map,
@@ -43,6 +43,7 @@ impl Default for InfluxConfig {
#[serde_as] #[serde_as]
#[derive(Serialize, Deserialize, Clone)] #[derive(Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct ChronyConfig { pub struct ChronyConfig {
pub enabled: bool, pub enabled: bool,
#[serde_as(as = "DurationSeconds<u64>")] #[serde_as(as = "DurationSeconds<u64>")]
@@ -73,6 +74,7 @@ impl Default for ChronyConfig {
} }
#[derive(Serialize, Deserialize, Clone)] #[derive(Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct ChronySockConfig { pub struct ChronySockConfig {
pub enabled: bool, pub enabled: bool,
pub sock: String, pub sock: String,
@@ -95,6 +97,7 @@ pub struct HwmonSensorConfig {
#[serde_as] #[serde_as]
#[derive(Serialize, Deserialize, Clone)] #[derive(Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct HwmonConfig { pub struct HwmonConfig {
pub enabled: bool, pub enabled: bool,
#[serde_as(as = "DurationSeconds<u64>")] #[serde_as(as = "DurationSeconds<u64>")]
@@ -116,6 +119,7 @@ impl Default for HwmonConfig {
#[serde_as] #[serde_as]
#[derive(Serialize, Deserialize, Clone, Debug)] #[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(default)]
pub struct GpsdConfig { pub struct GpsdConfig {
pub enabled: bool, pub enabled: bool,
#[serde_as(as = "DurationSeconds<u64>")] #[serde_as(as = "DurationSeconds<u64>")]
@@ -135,6 +139,7 @@ impl Default for GpsdConfig {
#[serde_as] #[serde_as]
#[derive(Serialize, Deserialize, Clone, Debug)] #[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(default)]
pub struct Prs10Config { pub struct Prs10Config {
pub enabled: bool, pub enabled: bool,
pub port: String, pub port: String,
@@ -178,40 +183,43 @@ pub enum SourceStatus {
Unknown, Unknown,
} }
#[derive(Clone, Debug)] #[derive(Copy, Clone, Debug)]
pub enum MetricValue { pub enum MetricValue {
Int(i64), Int(i64),
Float(f64), Float(f64),
Bool(bool), Bool(bool),
} }
type MetricTag = (&'static str, String);
type MetricTags = Vec<MetricTag>;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct SourceMetric { pub struct SourceMetric {
name: String, name: &'static str,
value: MetricValue, value: MetricValue,
tags: Arc<Vec<(String, String)>>, tags: Arc<MetricTags>,
} }
impl SourceMetric { impl SourceMetric {
pub fn new_int(name: &str, value: i64, tags: Arc<Vec<(String, String)>>) -> Self { pub fn new_int(name: &'static str, value: i64, tags: Arc<MetricTags>) -> Self {
Self { Self {
name: name.to_owned(), name: name,
value: MetricValue::Int(value), value: MetricValue::Int(value),
tags, tags,
} }
} }
pub fn new_float(name: &str, value: f64, tags: Arc<Vec<(String, String)>>) -> Self { pub fn new_float(name: &'static str, value: f64, tags: Arc<MetricTags>) -> Self {
Self { Self {
name: name.to_owned(), name: name,
value: MetricValue::Float(value), value: MetricValue::Float(value),
tags, tags,
} }
} }
pub fn new_bool(name: &str, value: bool, tags: Arc<Vec<(String, String)>>) -> Self { pub fn new_bool(name: &'static str, value: bool, tags: Arc<MetricTags>) -> Self {
Self { Self {
name: name.to_owned(), name: name,
value: MetricValue::Bool(value), value: MetricValue::Bool(value),
tags, tags,
} }
@@ -232,6 +240,7 @@ pub struct SourceReport {
#[serde_as] #[serde_as]
#[derive(Serialize, Deserialize, Clone)] #[derive(Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct UCCMConfig { pub struct UCCMConfig {
pub enabled: bool, pub enabled: bool,
pub port: String, pub port: String,
@@ -265,6 +274,23 @@ pub struct SourcesConfig {
pub prs10: Prs10Config, pub prs10: Prs10Config,
} }
#[derive(Serialize, Deserialize, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum SourceConfig {
Chrony(ChronyConfig),
Hwmon(HwmonConfig),
Uccm(UCCMConfig),
Gpsd(GpsdConfig),
Prs10(Prs10Config),
}
#[derive(Serialize, Deserialize, Clone)]
pub struct NamedSourceConfig {
pub name: String,
#[serde(flatten)]
pub source: SourceConfig,
}
#[derive(Serialize, Deserialize, Clone, Default)] #[derive(Serialize, Deserialize, Clone, Default)]
pub struct TargetsConfig { pub struct TargetsConfig {
pub chrony: ChronySockConfig, pub chrony: ChronySockConfig,
@@ -273,10 +299,19 @@ pub struct TargetsConfig {
#[derive(Serialize, Deserialize, Clone, Default)] #[derive(Serialize, Deserialize, Clone, Default)]
pub struct Config { pub struct Config {
pub influxdb: InfluxConfig, pub influxdb: InfluxConfig,
pub sources: SourcesConfig, pub sources: Vec<NamedSourceConfig>,
pub targets: TargetsConfig, pub targets: TargetsConfig,
} }
impl Provider for Config {
fn metadata(&self) -> figment::Metadata {
figment::Metadata::named("Default config")
}
fn data(&self) -> Result<Map<figment::Profile, figment::value::Dict>, figment::Error> {
Serialized::defaults(Config::default()).data()
}
}
pub fn load_config(filename: &Path) -> Figment { pub fn load_config(filename: &Path) -> Figment {
Figment::from(Serialized::defaults(Config::default())).merge(Toml::file(filename)) Figment::from(Serialized::defaults(Config::default())).merge(Toml::file(filename))
} }
@@ -317,6 +352,8 @@ pub type ChimemonTargetChannel = Receiver<ChimemonMessage>;
#[async_trait] #[async_trait]
pub trait ChimemonSource { pub trait ChimemonSource {
type Config;
fn new(name: &str, config: Self::Config) -> Self;
async fn run(self, chan: ChimemonSourceChannel); async fn run(self, chan: ChimemonSourceChannel);
} }

View File

@@ -1,13 +1,13 @@
use clap::{Parser, ValueEnum}; use clap::{Parser, ValueEnum};
use figment::{
Figment,
providers::{Format, Toml},
};
use futures::future::join_all; use futures::future::join_all;
use std::path::Path; use std::path::Path;
use tokio::sync::broadcast; use tokio::{sync::broadcast, task::JoinHandle};
use tracing::{Instrument, debug, error, info, info_span, warn}; use tracing::{Instrument, debug, error, info, info_span, warn};
use tracing_subscriber::{ use tracing_subscriber::{self, EnvFilter, fmt::format::FmtSpan};
self, EnvFilter,
fmt::format::{self, FmtSpan},
prelude::*,
};
use chimemon::*; use chimemon::*;
use sources::{ use sources::{
@@ -36,6 +36,43 @@ struct Args {
log_level: Level, log_level: Level,
} }
fn run_source(config: NamedSourceConfig, chan: ChimemonSourceChannel) -> Option<JoinHandle<()>> {
let NamedSourceConfig { name, source } = config;
match source {
SourceConfig::Chrony(source_config) if source_config.enabled => {
let c = ChronyClient::new(&name, source_config);
Some(tokio::spawn(
c.run(chan).instrument(info_span!("chrony-task")),
))
}
SourceConfig::Gpsd(source_config) if source_config.enabled => {
let c = GpsdSource::new(&name, source_config);
Some(tokio::spawn(
c.run(chan).instrument(info_span!("gpsd-task")),
))
}
SourceConfig::Hwmon(source_config) if source_config.enabled => {
let c = HwmonSource::new(&name, source_config);
Some(tokio::spawn(
c.run(chan).instrument(info_span!("hwmon-task")),
))
}
SourceConfig::Prs10(source_config) if source_config.enabled => {
let c = Prs10Monitor::new(&name, source_config);
Some(tokio::spawn(
c.run(chan).instrument(info_span!("prs10-task")),
))
}
SourceConfig::Uccm(source_config) if source_config.enabled => {
let c = UCCMMonitor::new(&name, source_config);
Some(tokio::spawn(
c.run(chan.clone()).instrument(info_span!("uccm-task")),
))
}
_ => None,
}
}
#[tokio::main(flavor = "current_thread")] #[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt() tracing_subscriber::fmt()
@@ -47,7 +84,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Args::parse(); let args = Args::parse();
info!("{PROGRAM_NAME} v{VERSION} starting..."); info!("{PROGRAM_NAME} v{VERSION} starting...");
let fig = load_config(Path::new(&args.config_file)); let fig = Figment::new()
.merge(Config::default())
.merge(Toml::file(&args.config_file));
debug!("{fig:?}"); debug!("{fig:?}");
let config: Config = fig.extract()?; let config: Config = fig.extract()?;
@@ -88,66 +127,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
)); ));
} }
let chrony = if config.sources.chrony.enabled { for source in config.sources {
Some(ChronyClient::new(config.to_owned())) if let Some(task) = run_source(source, sourcechan.clone()) {
} else { tasks.push(task)
None }
};
if let Some(c) = chrony {
tasks.push(tokio::spawn(
c.run(sourcechan.clone())
.instrument(info_span!("chrony-task")),
));
};
let hwmon = if config.sources.hwmon.enabled {
Some(HwmonSource::new(config.to_owned()))
} else {
None
};
if let Some(hwmon) = hwmon {
tasks.push(tokio::spawn(
hwmon
.run(sourcechan.clone())
.instrument(info_span!("hwmon-task")),
));
};
let uccm = if config.sources.uccm.enabled {
Some(UCCMMonitor::new(config.to_owned()))
} else {
None
};
if let Some(uccm) = uccm {
tasks.push(tokio::spawn(
uccm.run(sourcechan.clone())
.instrument(info_span!("uccm-task")),
));
};
let gpsd = if config.sources.gpsd.enabled {
Some(GpsdSource::new(config.to_owned()).await.unwrap())
} else {
None
};
if let Some(gpsd) = gpsd {
tasks.push(tokio::spawn(
gpsd.run(sourcechan.clone())
.instrument(info_span!("gpsd-task")),
))
}
let prs10 = if config.sources.prs10.enabled {
Some(Prs10Monitor::new(config.sources.prs10))
} else {
None
};
if let Some(prs10) = prs10 {
tasks.push(tokio::spawn(
prs10
.run(sourcechan.clone())
.instrument(info_span!("prs10-task")),
))
} }
let chrony_refclock = if config.targets.chrony.enabled { let chrony_refclock = if config.targets.chrony.enabled {

View File

@@ -1,26 +1,28 @@
use crate::{ use std::net::{SocketAddr, ToSocketAddrs};
ChimemonSource, ChimemonSourceChannel, Config, SourceMetric, SourceReport, SourceReportDetails, use std::sync::Arc;
SourceStatus,
};
use async_trait::async_trait; use async_trait::async_trait;
use chrony_candm::reply::{self, ReplyBody, SourceMode}; use chrony_candm::reply::{self, ReplyBody, SourceMode};
use chrony_candm::request::{self, RequestBody}; use chrony_candm::request::{self, RequestBody};
use chrony_candm::{ClientOptions, blocking_query}; use chrony_candm::{ClientOptions, blocking_query};
use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::Arc;
use std::time::Duration;
use tokio::join; use tokio::join;
use tracing::{info, warn}; use tracing::{info, warn};
use crate::{
ChimemonSource, ChimemonSourceChannel, ChronyConfig, MetricTags, SourceMetric, SourceReport,
SourceReportDetails, SourceStatus,
};
pub struct ChronyClient { pub struct ChronyClient {
pub server: SocketAddr, pub server: SocketAddr,
pub name: String,
client_options: ClientOptions, client_options: ClientOptions,
config: Config, config: ChronyConfig,
} }
#[derive(Debug)] #[derive(Debug)]
pub struct ChronyTrackingReport { pub struct ChronyTrackingReport {
tags: Arc<Vec<(String, String)>>, tags: Arc<MetricTags>,
pub ref_id: i64, pub ref_id: i64,
pub ref_ip_addr: String, pub ref_ip_addr: String,
pub stratum: i64, pub stratum: i64,
@@ -78,9 +80,9 @@ impl SourceReportDetails for ChronySourcesReport {
for source in &self.sources { for source in &self.sources {
let tags = Arc::new(vec![ let tags = Arc::new(vec![
("ref_id".to_owned(), source.ip_addr.to_string()), ("ref_id", source.ip_addr.to_string()),
( (
"mode".to_owned(), "mode",
match source.mode { match source.mode {
SourceMode::Client => String::from("server"), SourceMode::Client => String::from("server"),
SourceMode::Peer => String::from("peer"), SourceMode::Peer => String::from("peer"),
@@ -88,7 +90,7 @@ impl SourceReportDetails for ChronySourcesReport {
}, },
), ),
( (
"state".to_owned(), "state",
match source.state { match source.state {
reply::SourceState::Selected => String::from("best"), reply::SourceState::Selected => String::from("best"),
reply::SourceState::NonSelectable => String::from("unusable"), reply::SourceState::NonSelectable => String::from("unusable"),
@@ -129,7 +131,7 @@ impl SourceReportDetails for ChronySourcesReport {
fn report_from_tracking( fn report_from_tracking(
t: &reply::Tracking, t: &reply::Tracking,
config: &Config, config: &ChronyConfig,
) -> Result<ChronyTrackingReport, Box<dyn std::error::Error>> { ) -> Result<ChronyTrackingReport, Box<dyn std::error::Error>> {
let report = ChronyTrackingReport { let report = ChronyTrackingReport {
tags: Arc::new(vec![]), //TODO: allow configuring tags in the source tags: Arc::new(vec![]), //TODO: allow configuring tags in the source
@@ -151,25 +153,6 @@ fn report_from_tracking(
} }
impl ChronyClient { impl ChronyClient {
pub fn new(config: Config) -> Self {
let server = config
.sources
.chrony
.host
.to_socket_addrs()
.unwrap()
.next()
.expect("Unable to parse host:port:");
let client_options = ClientOptions {
n_tries: 3,
timeout: config.sources.chrony.timeout,
};
ChronyClient {
server,
client_options,
config,
}
}
async fn query(&self, request: RequestBody) -> Result<reply::Reply, std::io::Error> { async fn query(&self, request: RequestBody) -> Result<reply::Reply, std::io::Error> {
let server = self.server; let server = self.server;
let client_options = self.client_options; let client_options = self.client_options;
@@ -265,7 +248,7 @@ impl ChronyClient {
let tracking_data = report_from_tracking(&tracking, &self.config)?; let tracking_data = report_from_tracking(&tracking, &self.config)?;
let report = SourceReport { let report = SourceReport {
name: "chrony-tracking".to_owned(), name: self.name.clone(),
status: SourceStatus::Unknown, status: SourceStatus::Unknown,
details: Arc::new(tracking_data), details: Arc::new(tracking_data),
}; };
@@ -283,7 +266,7 @@ impl ChronyClient {
let sources = self.get_sources().await?; let sources = self.get_sources().await?;
let details = ChronySourcesReport { sources }; let details = ChronySourcesReport { sources };
let report = SourceReport { let report = SourceReport {
name: "chrony-sources".to_owned(), name: self.name.clone(),
status: SourceStatus::Unknown, status: SourceStatus::Unknown,
details: Arc::new(details), details: Arc::new(details),
}; };
@@ -295,11 +278,30 @@ impl ChronyClient {
#[async_trait] #[async_trait]
impl ChimemonSource for ChronyClient { impl ChimemonSource for ChronyClient {
type Config = ChronyConfig;
fn new(name: &str, config: Self::Config) -> Self {
let server = config
.host
.to_socket_addrs()
.unwrap()
.next()
.expect("Unable to parse host:port:");
let client_options = ClientOptions {
n_tries: 3,
timeout: config.timeout,
};
ChronyClient {
name: name.to_owned(),
server,
client_options,
config,
}
}
async fn run(self, chan: ChimemonSourceChannel) { async fn run(self, chan: ChimemonSourceChannel) {
info!("Chrony task started"); info!("Chrony task started");
let mut t_interval = tokio::time::interval(self.config.sources.chrony.tracking_interval); let mut t_interval = tokio::time::interval(self.config.tracking_interval);
let mut s_interval = tokio::time::interval(self.config.sources.chrony.sources_interval); let mut s_interval = tokio::time::interval(self.config.sources_interval);
let t_future = async { let t_future = async {
let lchan = chan.clone(); let lchan = chan.clone();

View File

@@ -1,8 +1,3 @@
use crate::{
ChimemonMessage, ChimemonSource, ChimemonSourceChannel, Config, SourceMetric, SourceReport,
SourceReportDetails, SourceStatus,
};
use std::collections::HashMap; use std::collections::HashMap;
use std::f64; use std::f64;
use std::fmt::Debug; use std::fmt::Debug;
@@ -12,8 +7,7 @@ use std::time::Duration;
use async_trait::async_trait; use async_trait::async_trait;
use backoff::ExponentialBackoff; use backoff::ExponentialBackoff;
use futures::StreamExt; use futures::{SinkExt, Stream, StreamExt};
use futures::{SinkExt, Stream};
use gpsd_proto::{Device, Gst, Mode, Pps, Sky, Tpv, UnifiedResponse, Version}; use gpsd_proto::{Device, Gst, Mode, Pps, Sky, Tpv, UnifiedResponse, Version};
use serde::Serialize; use serde::Serialize;
use serde_json; use serde_json;
@@ -22,8 +16,14 @@ use tokio::time::{interval, timeout};
use tokio_util::codec::{Framed, LinesCodec}; use tokio_util::codec::{Framed, LinesCodec};
use tracing::{debug, debug_span, info, instrument, warn}; use tracing::{debug, debug_span, info, instrument, warn};
use crate::{
ChimemonMessage, ChimemonSource, ChimemonSourceChannel, GpsdConfig, SourceMetric, SourceReport,
SourceReportDetails, SourceStatus,
};
pub struct GpsdSource { pub struct GpsdSource {
pub config: Config, pub name: String,
pub config: GpsdConfig,
conn: GpsdTransport, conn: GpsdTransport,
devices: HashMap<String, Device>, devices: HashMap<String, Device>,
last_gst: Option<Gst>, last_gst: Option<Gst>,
@@ -76,19 +76,20 @@ impl SourceReportDetails for GpsdSourceReport {
self.fix_type != GpsdFixType::Unknown && self.fix_type != GpsdFixType::NoFix self.fix_type != GpsdFixType::Unknown && self.fix_type != GpsdFixType::NoFix
} }
fn to_metrics(&self) -> Vec<SourceMetric> { fn to_metrics(&self) -> Vec<SourceMetric> {
let no_tags = Arc::new(vec![]); let tags = Arc::new(vec![]);
vec![ vec![
SourceMetric::new_int("sats_visible", self.sats_visible as i64, no_tags.clone()), SourceMetric::new_int("sats_visible", self.sats_visible as i64, tags.clone()),
SourceMetric::new_int("sats_tracked", self.sats_tracked as i64, no_tags.clone()), SourceMetric::new_int("sats_tracked", self.sats_tracked as i64, tags.clone()),
SourceMetric::new_float("tdop", self.tdop, no_tags.clone()), SourceMetric::new_float("tdop", self.tdop, tags.clone()),
] ]
} }
} }
impl GpsdSource { impl GpsdSource {
pub async fn new(config: Config) -> Result<Self, std::io::Error> { async fn inner_new(name: &str, config: GpsdConfig) -> Result<Self, std::io::Error> {
let conn = GpsdTransport::new(&config.sources.gpsd.host).await?; let conn = GpsdTransport::new(&config.host).await?;
Ok(Self { Ok(Self {
name: name.to_owned(),
config, config,
conn, conn,
devices: HashMap::new(), devices: HashMap::new(),
@@ -98,9 +99,6 @@ impl GpsdSource {
last_sky: None, last_sky: None,
}) })
} }
}
impl GpsdSource {
async fn send_status(&self, chan: &mut ChimemonSourceChannel) { async fn send_status(&self, chan: &mut ChimemonSourceChannel) {
let sky = self.last_sky.as_ref(); let sky = self.last_sky.as_ref();
let tpv = self.last_tpv.as_ref(); let tpv = self.last_tpv.as_ref();
@@ -118,7 +116,7 @@ impl GpsdSource {
.map_or(f64::INFINITY, |tdop| tdop as f64); .map_or(f64::INFINITY, |tdop| tdop as f64);
chan.send(ChimemonMessage::SourceReport(SourceReport { chan.send(ChimemonMessage::SourceReport(SourceReport {
name: "gpsd".into(), name: self.name.clone(),
status: SourceStatus::Unknown, status: SourceStatus::Unknown,
details: Arc::new(GpsdSourceReport { details: Arc::new(GpsdSourceReport {
fix_type: tpv.map_or(GpsdFixType::Unknown, |tpv| tpv.mode.into()), fix_type: tpv.map_or(GpsdFixType::Unknown, |tpv| tpv.mode.into()),
@@ -163,10 +161,19 @@ impl GpsdSource {
#[async_trait] #[async_trait]
impl ChimemonSource for GpsdSource { impl ChimemonSource for GpsdSource {
type Config = GpsdConfig;
fn new(name: &str, config: Self::Config) -> Self {
// TODO: refactor so this mess isn't necessary
// Should do async setup at the start of run(), not here
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
rt.block_on(Self::inner_new(name, config)).unwrap()
}
async fn run(mut self, mut chan: ChimemonSourceChannel) { async fn run(mut self, mut chan: ChimemonSourceChannel) {
info!("gpsd task started"); info!("gpsd task started");
self.conn.conn().await.unwrap(); self.conn.conn().await.unwrap();
let mut ticker = interval(self.config.sources.gpsd.interval); let mut ticker = interval(self.config.interval);
let mut params = WatchParams::default(); let mut params = WatchParams::default();
params.json = Some(true); params.json = Some(true);

View File

@@ -1,13 +1,15 @@
use crate::{ use std::{fs::File, io::Read, path::PathBuf, sync::Arc};
ChimemonSource, ChimemonSourceChannel, Config, SourceMetric, SourceReport, SourceReportDetails,
SourceStatus,
};
use async_trait::async_trait; use async_trait::async_trait;
use std::{fs::File, io::Read, path::PathBuf, sync::Arc, time::Duration};
use tracing::{debug, error, info, warn}; use tracing::{debug, error, info, warn};
use crate::{
ChimemonSource, ChimemonSourceChannel, HwmonConfig, MetricTags, SourceMetric, SourceReport,
SourceReportDetails, SourceStatus,
};
pub struct HwmonSource { pub struct HwmonSource {
config: Config, name: String,
config: HwmonConfig,
sensors: Vec<Arc<HwmonSensor>>, sensors: Vec<Arc<HwmonSensor>>,
} }
@@ -18,7 +20,7 @@ struct HwmonSensor {
device: String, device: String,
sensor: String, sensor: String,
label: Option<String>, label: Option<String>,
tags: Arc<Vec<(String, String)>>, tags: Arc<MetricTags>,
} }
impl HwmonSensor { impl HwmonSensor {
@@ -40,12 +42,12 @@ impl HwmonSensor {
None None
}; };
let mut tags_vec = vec![ let mut tags_vec = vec![
("name".to_owned(), name.to_owned()), ("name", name.to_owned()),
("device".to_owned(), device.to_owned()), ("device", device.to_owned()),
("sensor".to_owned(), sensor.to_owned()), ("sensor", sensor.to_owned()),
]; ];
if let Some(label) = &label { if let Some(label) = &label {
tags_vec.push(("label".to_owned(), label.clone())) tags_vec.push(("label", label.clone()))
} }
Self { Self {
value_path, value_path,
@@ -92,18 +94,6 @@ impl SourceReportDetails for HwmonReport {
const HWMON_ROOT: &str = "/sys/class/hwmon"; const HWMON_ROOT: &str = "/sys/class/hwmon";
impl HwmonSource { impl HwmonSource {
pub fn new(config: Config) -> Self {
let sensors = config
.sources
.hwmon
.sensors
.iter()
.map(|(k, v)| Arc::new(HwmonSensor::new(k, &v.name, &v.sensor)))
.collect();
HwmonSource { config, sensors }
}
async fn get_raw_value(sensor: &HwmonSensor) -> Result<String, std::io::Error> { async fn get_raw_value(sensor: &HwmonSensor) -> Result<String, std::io::Error> {
tokio::fs::read_to_string(&sensor.value_path).await tokio::fs::read_to_string(&sensor.value_path).await
} }
@@ -111,9 +101,23 @@ impl HwmonSource {
#[async_trait] #[async_trait]
impl ChimemonSource for HwmonSource { impl ChimemonSource for HwmonSource {
type Config = HwmonConfig;
fn new(name: &str, config: Self::Config) -> Self {
let sensors = config
.sensors
.iter()
.map(|(k, v)| Arc::new(HwmonSensor::new(k, &v.name, &v.sensor)))
.collect();
HwmonSource {
name: name.to_owned(),
config,
sensors,
}
}
async fn run(self, chan: ChimemonSourceChannel) { async fn run(self, chan: ChimemonSourceChannel) {
info!("hwmon task started"); info!("hwmon task started");
let mut interval = tokio::time::interval(self.config.sources.hwmon.interval); let mut interval = tokio::time::interval(self.config.interval);
loop { loop {
interval.tick().await; interval.tick().await;
let mut values = Vec::new(); let mut values = Vec::new();
@@ -137,7 +141,7 @@ impl ChimemonSource for HwmonSource {
} }
} }
let report = SourceReport { let report = SourceReport {
name: "hwmon".to_owned(), name: self.name.clone(),
status: SourceStatus::Healthy, status: SourceStatus::Healthy,
details: Arc::new(HwmonReport { values }), details: Arc::new(HwmonReport { values }),
}; };

View File

@@ -2,10 +2,6 @@ use std::any::type_name;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use crate::{
ChimemonMessage, ChimemonSource, ChimemonSourceChannel, Prs10Config, SourceMetric,
SourceReport, SourceReportDetails, SourceStatus,
};
use async_trait::async_trait; use async_trait::async_trait;
use bitflags::bitflags; use bitflags::bitflags;
use itertools::Itertools; use itertools::Itertools;
@@ -13,10 +9,14 @@ use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf};
use tokio::select; use tokio::select;
use tokio::sync::OnceCell; use tokio::sync::OnceCell;
use tokio::time::{interval, timeout}; use tokio::time::{interval, timeout};
use tokio_serial;
use tokio_serial::{SerialPort, SerialStream}; use tokio_serial::{SerialPort, SerialStream};
use tracing::{debug, debug_span, error, info, instrument, warn}; use tracing::{debug, debug_span, error, info, instrument, warn};
use crate::{
ChimemonMessage, ChimemonSource, ChimemonSourceChannel, MetricTags, Prs10Config, SourceMetric,
SourceReport, SourceReportDetails, SourceStatus,
};
#[derive(Debug)] #[derive(Debug)]
pub struct Prs10Info { pub struct Prs10Info {
pub model: String, pub model: String,
@@ -54,7 +54,7 @@ bitflags! {
} }
impl Prs10PowerLampFlags { impl Prs10PowerLampFlags {
pub fn get_metrics(&self, no_tags: Arc<Vec<(String, String)>>) -> Vec<SourceMetric> { pub fn get_metrics(&self, tags: Arc<MetricTags>) -> Vec<SourceMetric> {
// Define the mapping statically // Define the mapping statically
const FLAG_LABELS: [(&Prs10PowerLampFlags, &str); 8] = [ const FLAG_LABELS: [(&Prs10PowerLampFlags, &str); 8] = [
(&Prs10PowerLampFlags::ELEC_VOLTAGE_LOW, "elec_voltage_low"), (&Prs10PowerLampFlags::ELEC_VOLTAGE_LOW, "elec_voltage_low"),
@@ -72,7 +72,7 @@ impl Prs10PowerLampFlags {
.iter() .iter()
.map(|(flag, label)| { .map(|(flag, label)| {
// We track whether each flag is set (true) or not (false) // We track whether each flag is set (true) or not (false)
SourceMetric::new_bool(*label, self.contains(**flag), no_tags.clone()) SourceMetric::new_bool(*label, self.contains(**flag), tags.clone())
}) })
.collect() .collect()
} }
@@ -180,17 +180,17 @@ impl SourceReportDetails for Prs10Status {
} }
fn to_metrics(&self) -> Vec<SourceMetric> { fn to_metrics(&self) -> Vec<SourceMetric> {
let no_tags = Arc::new(vec![]); let tags = Arc::new(vec![]);
vec![ vec![
SourceMetric::new_int( SourceMetric::new_int(
"volt_lamp_flags", "volt_lamp_flags",
self.volt_lamp_flags.bits() as i64, self.volt_lamp_flags.bits() as i64,
no_tags.clone(), tags.clone(),
), ),
SourceMetric::new_int("rf_flags", self.rf_flags.bits() as i64, no_tags.clone()), SourceMetric::new_int("rf_flags", self.rf_flags.bits() as i64, tags.clone()),
SourceMetric::new_int("temp_flags", self.temp_flags.bits() as i64, no_tags.clone()), SourceMetric::new_int("temp_flags", self.temp_flags.bits() as i64, tags.clone()),
SourceMetric::new_int("fll_flags", self.fll_flags.bits() as i64, no_tags.clone()), SourceMetric::new_int("fll_flags", self.fll_flags.bits() as i64, tags.clone()),
SourceMetric::new_int("pps_flags", self.pps_flags.bits() as i64, no_tags.clone()), SourceMetric::new_int("pps_flags", self.pps_flags.bits() as i64, tags.clone()),
// system flags are kind of useless because we can't guarantee they get upstreamed and will only appear once since they are 'event flags' // system flags are kind of useless because we can't guarantee they get upstreamed and will only appear once since they are 'event flags'
] ]
} }
@@ -254,104 +254,69 @@ impl SourceReportDetails for Prs10Stats {
true true
} }
fn to_metrics(&self) -> Vec<SourceMetric> { fn to_metrics(&self) -> Vec<SourceMetric> {
let no_tags = Arc::new(vec![]); let tags = Arc::new(vec![]);
vec![ vec![
// Integer Metrics // Integer Metrics
SourceMetric::new_int("ocxo_efc", self.ocxo_efc as i64, no_tags.clone()), SourceMetric::new_int("ocxo_efc", self.ocxo_efc as i64, tags.clone()),
// Float Metrics // Float Metrics
SourceMetric::new_float( SourceMetric::new_float("error_signal_volts", self.error_signal_volts, tags.clone()),
"error_signal_volts",
self.error_signal_volts,
no_tags.clone(),
),
SourceMetric::new_float( SourceMetric::new_float(
"detect_signal_volts", "detect_signal_volts",
self.detect_signal_volts, self.detect_signal_volts,
no_tags.clone(), tags.clone(),
), ),
SourceMetric::new_float("heat_volts", self.heat_volts, no_tags.clone()), SourceMetric::new_float("heat_volts", self.heat_volts, tags.clone()),
SourceMetric::new_float("elec_volts", self.elec_volts, no_tags.clone()), SourceMetric::new_float("elec_volts", self.elec_volts, tags.clone()),
SourceMetric::new_float( SourceMetric::new_float(
"lamp_fet_drain_volts", "lamp_fet_drain_volts",
self.lamp_fet_drain_volts, self.lamp_fet_drain_volts,
no_tags.clone(), tags.clone(),
), ),
SourceMetric::new_float( SourceMetric::new_float(
"lamp_fet_gate_volts", "lamp_fet_gate_volts",
self.lamp_fet_gate_volts, self.lamp_fet_gate_volts,
no_tags.clone(), tags.clone(),
), ),
SourceMetric::new_float("ocxo_heat_volts", self.ocxo_heat_volts, no_tags.clone()), SourceMetric::new_float("ocxo_heat_volts", self.ocxo_heat_volts, tags.clone()),
SourceMetric::new_float("cell_heat_volts", self.cell_heat_volts, no_tags.clone()), SourceMetric::new_float("cell_heat_volts", self.cell_heat_volts, tags.clone()),
SourceMetric::new_float("lamp_heat_volts", self.lamp_heat_volts, no_tags.clone()), SourceMetric::new_float("lamp_heat_volts", self.lamp_heat_volts, tags.clone()),
SourceMetric::new_float("rb_photo", self.rb_photo, no_tags.clone()), SourceMetric::new_float("rb_photo", self.rb_photo, tags.clone()),
SourceMetric::new_float("rb_photo_iv", self.rb_photo_iv, no_tags.clone()), SourceMetric::new_float("rb_photo_iv", self.rb_photo_iv, tags.clone()),
SourceMetric::new_float("case_temp", self.case_temp, no_tags.clone()), SourceMetric::new_float("case_temp", self.case_temp, tags.clone()),
SourceMetric::new_float("ocxo_therm", self.ocxo_therm, no_tags.clone()), SourceMetric::new_float("ocxo_therm", self.ocxo_therm, tags.clone()),
SourceMetric::new_float("cell_therm", self.cell_therm, no_tags.clone()), SourceMetric::new_float("cell_therm", self.cell_therm, tags.clone()),
SourceMetric::new_float("lamp_therm", self.lamp_therm, no_tags.clone()), SourceMetric::new_float("lamp_therm", self.lamp_therm, tags.clone()),
SourceMetric::new_float("ext_cal_volts", self.ext_cal_volts, no_tags.clone()), SourceMetric::new_float("ext_cal_volts", self.ext_cal_volts, tags.clone()),
SourceMetric::new_float("analog_gnd_volts", self.analog_gnd_volts, no_tags.clone()), SourceMetric::new_float("analog_gnd_volts", self.analog_gnd_volts, tags.clone()),
SourceMetric::new_float( SourceMetric::new_float(
"if_vco_varactor_volts", "if_vco_varactor_volts",
self.if_vco_varactor_volts, self.if_vco_varactor_volts,
no_tags.clone(), tags.clone(),
), ),
SourceMetric::new_float( SourceMetric::new_float(
"op_vco_varactor_volts", "op_vco_varactor_volts",
self.op_vco_varactor_volts, self.op_vco_varactor_volts,
no_tags.clone(), tags.clone(),
), ),
SourceMetric::new_float( SourceMetric::new_float("mul_amp_gain_volts", self.mul_amp_gain_volts, tags.clone()),
"mul_amp_gain_volts", SourceMetric::new_float("rf_lock_volts", self.rf_lock_volts, tags.clone()),
self.mul_amp_gain_volts,
no_tags.clone(),
),
SourceMetric::new_float("rf_lock_volts", self.rf_lock_volts, no_tags.clone()),
// U16 Metrics (optional, but can be treated as integers) // U16 Metrics (optional, but can be treated as integers)
SourceMetric::new_int( SourceMetric::new_int("freq_offset_ppt", self.freq_offset_ppt as i64, tags.clone()),
"freq_offset_ppt", SourceMetric::new_int("mag_efc", self.mag_efc as i64, tags.clone()),
self.freq_offset_ppt as i64,
no_tags.clone(),
),
SourceMetric::new_int("mag_efc", self.mag_efc as i64, no_tags.clone()),
] ]
} }
} }
#[derive(Debug)] #[derive(Debug)]
pub struct Prs10Monitor { pub struct Prs10Monitor {
name: String,
config: Prs10Config,
rx: ReadHalf<SerialStream>, rx: ReadHalf<SerialStream>,
tx: WriteHalf<SerialStream>, tx: WriteHalf<SerialStream>,
info: OnceCell<Prs10Info>, info: OnceCell<Prs10Info>,
config: Prs10Config,
} }
impl Prs10Monitor { impl Prs10Monitor {
pub fn new(config: Prs10Config) -> Self {
let builder = tokio_serial::new(&config.port, config.baud)
.timeout(config.timeout)
.data_bits(tokio_serial::DataBits::Eight)
.parity(tokio_serial::Parity::None)
.stop_bits(tokio_serial::StopBits::One)
.flow_control(tokio_serial::FlowControl::None);
let mut port = SerialStream::open(&builder).expect("Must be able to open serial port");
port.set_exclusive(true).expect("Can't lock serial port");
info!(
"Opened serial port {}@{}",
port.name().unwrap(),
port.baud_rate().unwrap()
);
let (rx, tx) = tokio::io::split(port);
Self {
rx,
tx,
config,
info: OnceCell::new(),
}
}
pub fn info(&self) -> &Prs10Info { pub fn info(&self) -> &Prs10Info {
self.info.get().expect("info() used before run()") self.info.get().expect("info() used before run()")
} }
@@ -450,7 +415,7 @@ impl Prs10Monitor {
async fn status_poll(&mut self) -> Result<ChimemonMessage, Box<dyn std::error::Error>> { async fn status_poll(&mut self) -> Result<ChimemonMessage, Box<dyn std::error::Error>> {
let status = self.get_status().await?; let status = self.get_status().await?;
Ok(ChimemonMessage::SourceReport(SourceReport { Ok(ChimemonMessage::SourceReport(SourceReport {
name: "prs10".into(), name: self.name.clone(),
status: if status.is_healthy() { status: if status.is_healthy() {
SourceStatus::Healthy SourceStatus::Healthy
} else { } else {
@@ -480,7 +445,7 @@ impl Prs10Monitor {
drop(stats_guard); drop(stats_guard);
Ok(ChimemonMessage::SourceReport(SourceReport { Ok(ChimemonMessage::SourceReport(SourceReport {
name: "prs10".into(), name: self.name.clone(),
status: SourceStatus::Unknown, status: SourceStatus::Unknown,
details: Arc::new(Prs10Stats { details: Arc::new(Prs10Stats {
ocxo_efc, ocxo_efc,
@@ -514,10 +479,35 @@ impl Prs10Monitor {
#[async_trait] #[async_trait]
impl ChimemonSource for Prs10Monitor { impl ChimemonSource for Prs10Monitor {
type Config = Prs10Config;
fn new(name: &str, config: Self::Config) -> Self {
let builder = tokio_serial::new(&config.port, config.baud)
.timeout(config.timeout)
.data_bits(tokio_serial::DataBits::Eight)
.parity(tokio_serial::Parity::None)
.stop_bits(tokio_serial::StopBits::One)
.flow_control(tokio_serial::FlowControl::None);
let mut port = SerialStream::open(&builder).expect("Must be able to open serial port");
port.set_exclusive(true).expect("Can't lock serial port");
info!(
"Opened serial port {}@{}",
port.name().unwrap(),
port.baud_rate().unwrap()
);
let (rx, tx) = tokio::io::split(port);
Self {
name: name.to_owned(),
config,
rx,
tx,
info: OnceCell::new(),
}
}
async fn run(mut self, chan: ChimemonSourceChannel) { async fn run(mut self, chan: ChimemonSourceChannel) {
info!("PRS10 task starting"); info!("PRS10 task starting");
if let Err(e) = self.set_info().await { if let Err(e) = self.set_info().await {
warn!("Error starting PRS10: {e:?}"); error!("Error starting PRS10: {e:?}");
return; return;
} }
info!( info!(

View File

@@ -1,7 +1,7 @@
use crate::{ use std::io::{BufRead, Cursor};
ChimemonMessage, ChimemonSource, ChimemonSourceChannel, Config, SourceMetric, use std::str;
SourceReportDetails, TimeReport, use std::sync::Arc;
};
use async_trait::async_trait; use async_trait::async_trait;
use bitflags::bitflags; use bitflags::bitflags;
use byteorder::{BigEndian, ReadBytesExt}; use byteorder::{BigEndian, ReadBytesExt};
@@ -11,9 +11,6 @@ use figment::value::Map;
use influxdb2::models::DataPoint; use influxdb2::models::DataPoint;
use influxdb2::models::data_point::DataPointBuilder; use influxdb2::models::data_point::DataPointBuilder;
use itertools::Itertools; use itertools::Itertools;
use std::io::{BufRead, Cursor};
use std::str;
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf};
use tokio::join; use tokio::join;
use tokio::sync::Mutex; use tokio::sync::Mutex;
@@ -21,6 +18,11 @@ use tokio::time::sleep;
use tokio_serial::{SerialPort, SerialStream}; use tokio_serial::{SerialPort, SerialStream};
use tracing::{debug, info, warn}; use tracing::{debug, info, warn};
use crate::{
ChimemonMessage, ChimemonSource, ChimemonSourceChannel, SourceMetric, SourceReportDetails,
TimeReport, UCCMConfig,
};
pub const GPS_EPOCH: i64 = 315964800; // Doesn't seem possible to have a const DateTime object pub const GPS_EPOCH: i64 = 315964800; // Doesn't seem possible to have a const DateTime object
pub type UccmEndian = BigEndian; pub type UccmEndian = BigEndian;
@@ -32,11 +34,11 @@ pub enum UCCMMonitorParseState {
} }
pub struct UCCMMonitor { pub struct UCCMMonitor {
// pub port: SerialStream, pub name: String,
config: UCCMConfig,
rx: ReadHalf<SerialStream>, rx: ReadHalf<SerialStream>,
tx: WriteHalf<SerialStream>, tx: WriteHalf<SerialStream>,
pub info: Option<UCCMInfo>, pub info: Option<UCCMInfo>,
config: Config,
} }
#[derive(Debug)] #[derive(Debug)]
@@ -399,30 +401,6 @@ impl TryFrom<&str> for UCCMGPSSatsReport {
} }
impl UCCMMonitor { impl UCCMMonitor {
pub fn new(config: Config) -> Self {
let builder = tokio_serial::new(&config.sources.uccm.port, config.sources.uccm.baud)
.timeout(config.sources.uccm.timeout)
.data_bits(tokio_serial::DataBits::Eight)
.parity(tokio_serial::Parity::None)
.stop_bits(tokio_serial::StopBits::One)
.flow_control(tokio_serial::FlowControl::None);
let mut port = SerialStream::open(&builder).expect("Must be able to open serial port");
port.set_exclusive(true).expect("Can't lock serial port");
info!(
"Opened serial port {}@{}",
port.name().unwrap(),
port.baud_rate().unwrap()
);
let (rx, tx) = tokio::io::split(port);
UCCMMonitor {
// port,
rx,
tx,
info: None,
config,
}
}
pub async fn send_cmd(&mut self, cmd: &[u8]) -> Result<String, std::io::Error> { pub async fn send_cmd(&mut self, cmd: &[u8]) -> Result<String, std::io::Error> {
debug!("cmd: `{:?}`", String::from_utf8_lossy(cmd)); debug!("cmd: `{:?}`", String::from_utf8_lossy(cmd));
self.tx.write_all(cmd).await.unwrap(); self.tx.write_all(cmd).await.unwrap();
@@ -481,7 +459,7 @@ impl UCCMMonitor {
let mut last_loop_diag: Option<UCCMLoopDiagReport> = None; let mut last_loop_diag: Option<UCCMLoopDiagReport> = None;
let mut last_gps_sats: Option<UCCMGPSSatsReport> = None; let mut last_gps_sats: Option<UCCMGPSSatsReport> = None;
let mut last_sent_report = Utc::now() - self.config.sources.uccm.status_interval; let mut last_sent_report = Utc::now() - self.config.status_interval;
loop { loop {
match tokio::io::AsyncReadExt::read_buf(&mut self.rx, &mut rdbuf).await { match tokio::io::AsyncReadExt::read_buf(&mut self.rx, &mut rdbuf).await {
@@ -525,37 +503,33 @@ impl UCCMMonitor {
})) }))
.expect("Unable to send to channel"); .expect("Unable to send to channel");
if sysnow - last_sent_report if sysnow - last_sent_report
>= Duration::from_std(self.config.sources.uccm.status_interval) >= Duration::from_std(self.config.status_interval).unwrap()
.unwrap()
{ {
let mut points = vec![ // let mut points = vec![
tod.as_builder( // tod.as_builder(&self.config.measurement, &self.config.tags)
&self.config.sources.uccm.measurement, // .build()
&self.config.influxdb.tags, // .unwrap(),
) // ];
.build() // if let Some(loop_diag) = &last_loop_diag {
.unwrap(), // points.push(
]; // loop_diag
if let Some(loop_diag) = &last_loop_diag { // .as_builder(
points.push( // &self.config.measurement,
loop_diag // &self.config.influxdb.tags,
.as_builder( // )
&self.config.sources.uccm.measurement, // .build()
&self.config.influxdb.tags, // .unwrap(),
) // )
.build() // }
.unwrap(), // if let Some(gps_sats) = &last_gps_sats {
) // points.extend(gps_sats.build(
} // &self.config.sources.uccm.measurement,
if let Some(gps_sats) = &last_gps_sats { // &self.config.influxdb.tags,
points.extend(gps_sats.build( // ));
&self.config.sources.uccm.measurement, // }
&self.config.influxdb.tags,
));
}
chan.send(ChimemonMessage::DataPoints(points)) // chan.send(ChimemonMessage::DataPoints(points))
.expect("Unable to send to channel"); // .expect("Unable to send to channel");
last_sent_report = sysnow; last_sent_report = sysnow;
} }
} }
@@ -597,6 +571,31 @@ impl UCCMMonitor {
#[async_trait] #[async_trait]
impl ChimemonSource for UCCMMonitor { impl ChimemonSource for UCCMMonitor {
type Config = UCCMConfig;
fn new(name: &str, config: Self::Config) -> Self {
let builder = tokio_serial::new(&config.port, config.baud)
.timeout(config.timeout)
.data_bits(tokio_serial::DataBits::Eight)
.parity(tokio_serial::Parity::None)
.stop_bits(tokio_serial::StopBits::One)
.flow_control(tokio_serial::FlowControl::None);
let mut port = SerialStream::open(&builder).expect("Must be able to open serial port");
port.set_exclusive(true).expect("Can't lock serial port");
info!(
"Opened serial port {}@{}",
port.name().unwrap(),
port.baud_rate().unwrap()
);
let (rx, tx) = tokio::io::split(port);
UCCMMonitor {
name: name.to_owned(),
config,
rx,
tx,
info: None,
}
}
async fn run(mut self, chan: ChimemonSourceChannel) { async fn run(mut self, chan: ChimemonSourceChannel) {
info!("UCCM task starting"); info!("UCCM task starting");
if self.get_info().await.is_err() { if self.get_info().await.is_err() {

View File

@@ -1,11 +1,13 @@
use crate::{ChimemonMessage, ChimemonTarget, ChimemonTargetChannel, ChronySockConfig};
use async_trait::async_trait;
use libc::{c_double, c_int, timeval};
use std::mem; use std::mem;
use std::os::unix::net::UnixDatagram; use std::os::unix::net::UnixDatagram;
use std::path::PathBuf; use std::path::PathBuf;
use async_trait::async_trait;
use libc::{c_double, c_int, timeval};
use tracing::debug; use tracing::debug;
use crate::{ChimemonMessage, ChimemonTarget, ChimemonTargetChannel, ChronySockConfig};
const CHRONY_MAGIC: c_int = 0x534f434b; const CHRONY_MAGIC: c_int = 0x534f434b;
pub struct ChronySockServer { pub struct ChronySockServer {