refactor config

This commit is contained in:
2026-02-04 14:25:17 -08:00
parent d31bf8d39e
commit 50894fd0e1
10 changed files with 333 additions and 369 deletions

219
src/config.rs Normal file
View File

@@ -0,0 +1,219 @@
use std::{collections::BTreeMap, path::Path};
use figment::{
Figment, Provider,
providers::{Format, Serialized, Toml},
util::map,
value::Map,
};
use gethostname::gethostname;
use serde_derive::{Deserialize, Serialize};
use serde_with::{DurationSeconds, serde_as};
#[derive(Serialize, Deserialize, Clone)]
pub struct InfluxConfig {
pub enabled: bool,
pub url: String,
pub org: String,
pub bucket: String,
pub token: String,
pub tags: Map<String, String>,
}
impl Default for InfluxConfig {
fn default() -> Self {
let host = gethostname().into_string().unwrap();
InfluxConfig {
enabled: false,
url: "http://localhost:8086".into(),
org: "default".into(),
bucket: "default".into(),
token: "".into(),
tags: map! { "host".into() => host },
}
}
}
#[serde_as]
#[derive(Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct ChronyConfig {
pub enabled: bool,
#[serde_as(as = "DurationSeconds<u64>")]
pub timeout: std::time::Duration,
#[serde_as(as = "DurationSeconds<u64>")]
pub tracking_interval: std::time::Duration,
#[serde_as(as = "DurationSeconds<u64>")]
pub sources_interval: std::time::Duration,
pub measurement_prefix: String,
pub tracking_measurement: String,
pub sources_measurement: String,
pub host: String,
}
impl Default for ChronyConfig {
fn default() -> Self {
ChronyConfig {
enabled: false,
timeout: std::time::Duration::from_secs(5),
tracking_interval: std::time::Duration::from_secs(60),
sources_interval: std::time::Duration::from_secs(300),
measurement_prefix: "chrony.".into(),
tracking_measurement: "tracking".into(),
sources_measurement: "sources".into(),
host: "127.0.0.1:323".into(),
}
}
}
#[derive(Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct ChronySockConfig {
pub enabled: bool,
pub sock: String,
}
impl Default for ChronySockConfig {
fn default() -> Self {
ChronySockConfig {
enabled: false,
sock: "".into(),
}
}
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct HwmonSensorConfig {
pub device: String,
pub sensor: String,
}
#[serde_as]
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(default)]
pub struct HwmonConfig {
pub enabled: bool,
#[serde_as(as = "DurationSeconds<u64>")]
pub interval: std::time::Duration,
pub measurement: String,
pub sensors: Map<String, HwmonSensorConfig>,
}
impl Default for HwmonConfig {
fn default() -> Self {
HwmonConfig {
enabled: false,
interval: std::time::Duration::from_secs(60),
measurement: "hwmon".into(),
sensors: map! {},
}
}
}
#[serde_as]
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(default)]
pub struct GpsdConfig {
pub enabled: bool,
#[serde_as(as = "DurationSeconds<u64>")]
pub interval: std::time::Duration,
pub host: String,
}
impl Default for GpsdConfig {
fn default() -> Self {
GpsdConfig {
enabled: false,
interval: std::time::Duration::from_secs(60),
host: "localhost:2947".into(),
}
}
}
#[serde_as]
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(default)]
pub struct Prs10Config {
pub enabled: bool,
pub port: String,
pub baud: u32,
#[serde_as(as = "DurationSeconds<u64>")]
pub timeout: std::time::Duration,
#[serde_as(as = "DurationSeconds<u64>")]
pub status_interval: std::time::Duration,
#[serde_as(as = "DurationSeconds<u64>")]
pub stats_interval: std::time::Duration,
}
impl Default for Prs10Config {
fn default() -> Self {
Prs10Config {
enabled: false,
port: "/dev/ttyS0".into(),
baud: 9600,
timeout: std::time::Duration::from_secs(1),
status_interval: std::time::Duration::from_secs(10),
stats_interval: std::time::Duration::from_secs(30),
}
}
}
#[serde_as]
#[derive(Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct UCCMConfig {
pub enabled: bool,
pub port: String,
pub baud: u32,
#[serde_as(as = "DurationSeconds<u64>")]
pub status_interval: std::time::Duration,
#[serde_as(as = "DurationSeconds<u64>")]
pub timeout: std::time::Duration,
pub measurement: String,
}
impl Default for UCCMConfig {
fn default() -> Self {
UCCMConfig {
enabled: false,
port: "/dev/ttyS0".into(),
baud: 57600,
status_interval: std::time::Duration::from_secs(10),
timeout: std::time::Duration::from_secs(1),
measurement: "uccm_gpsdo".into(),
}
}
}
#[derive(Serialize, Deserialize, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum SourceConfig {
Chrony(ChronyConfig),
Hwmon(HwmonConfig),
Uccm(UCCMConfig),
Gpsd(GpsdConfig),
Prs10(Prs10Config),
}
#[derive(Serialize, Deserialize, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum TargetConfig {
ChronySock(ChronySockConfig),
InfluxDb(InfluxConfig),
}
#[derive(Serialize, Deserialize, Clone, Default)]
pub struct Config {
pub influxdb: InfluxConfig,
pub sources: BTreeMap<String, SourceConfig>,
pub targets: BTreeMap<String, TargetConfig>,
}
impl Provider for Config {
fn metadata(&self) -> figment::Metadata {
figment::Metadata::named("Default config")
}
fn data(&self) -> Result<Map<figment::Profile, figment::value::Dict>, figment::Error> {
Serialized::defaults(Config::default()).data()
}
}

View File

@@ -1,3 +1,4 @@
pub mod config;
pub mod sources; pub mod sources;
pub mod targets; pub mod targets;
@@ -11,168 +12,11 @@ macro_rules! fatal {
use async_trait::async_trait; use async_trait::async_trait;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use figment::{
Figment, Provider,
providers::{Format, Serialized, Toml},
util::map,
value::Map,
};
use gethostname::gethostname;
use influxdb2::models::DataPoint;
use serde_derive::{Deserialize, Serialize};
use serde_with::{DurationSeconds, serde_as};
use tokio::sync::broadcast::*; use tokio::sync::broadcast::*;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use std::{fmt::Debug, path::Path, sync::Arc}; use std::{fmt::Debug, sync::Arc};
#[derive(Serialize, Deserialize, Clone)]
pub struct InfluxConfig {
pub enabled: bool,
pub url: String,
pub org: String,
pub bucket: String,
pub token: String,
pub tags: Map<String, String>,
}
impl Default for InfluxConfig {
fn default() -> Self {
let host = gethostname().into_string().unwrap();
InfluxConfig {
enabled: false,
url: "http://localhost:8086".into(),
org: "default".into(),
bucket: "default".into(),
token: "".into(),
tags: map! { "host".into() => host },
}
}
}
#[serde_as]
#[derive(Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct ChronyConfig {
pub enabled: bool,
#[serde_as(as = "DurationSeconds<u64>")]
pub timeout: std::time::Duration,
#[serde_as(as = "DurationSeconds<u64>")]
pub tracking_interval: std::time::Duration,
#[serde_as(as = "DurationSeconds<u64>")]
pub sources_interval: std::time::Duration,
pub measurement_prefix: String,
pub tracking_measurement: String,
pub sources_measurement: String,
pub host: String,
}
impl Default for ChronyConfig {
fn default() -> Self {
ChronyConfig {
enabled: false,
timeout: std::time::Duration::from_secs(5),
tracking_interval: std::time::Duration::from_secs(60),
sources_interval: std::time::Duration::from_secs(300),
measurement_prefix: "chrony.".into(),
tracking_measurement: "tracking".into(),
sources_measurement: "sources".into(),
host: "127.0.0.1:323".into(),
}
}
}
#[derive(Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct ChronySockConfig {
pub enabled: bool,
pub sock: String,
}
impl Default for ChronySockConfig {
fn default() -> Self {
ChronySockConfig {
enabled: false,
sock: "".into(),
}
}
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct HwmonSensorConfig {
pub device: String,
pub sensor: String,
}
#[serde_as]
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(default)]
pub struct HwmonConfig {
pub enabled: bool,
#[serde_as(as = "DurationSeconds<u64>")]
pub interval: std::time::Duration,
pub measurement: String,
pub sensors: Map<String, HwmonSensorConfig>,
}
impl Default for HwmonConfig {
fn default() -> Self {
HwmonConfig {
enabled: false,
interval: std::time::Duration::from_secs(60),
measurement: "hwmon".into(),
sensors: map! {},
}
}
}
#[serde_as]
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(default)]
pub struct GpsdConfig {
pub enabled: bool,
#[serde_as(as = "DurationSeconds<u64>")]
pub interval: std::time::Duration,
pub host: String,
}
impl Default for GpsdConfig {
fn default() -> Self {
GpsdConfig {
enabled: false,
interval: std::time::Duration::from_secs(60),
host: "localhost:2947".into(),
}
}
}
#[serde_as]
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(default)]
pub struct Prs10Config {
pub enabled: bool,
pub port: String,
pub baud: u32,
#[serde_as(as = "DurationSeconds<u64>")]
pub timeout: std::time::Duration,
#[serde_as(as = "DurationSeconds<u64>")]
pub status_interval: std::time::Duration,
#[serde_as(as = "DurationSeconds<u64>")]
pub stats_interval: std::time::Duration,
}
impl Default for Prs10Config {
fn default() -> Self {
Prs10Config {
enabled: false,
port: "/dev/ttyS0".into(),
baud: 9600,
timeout: std::time::Duration::from_secs(1),
status_interval: std::time::Duration::from_secs(10),
stats_interval: std::time::Duration::from_secs(30),
}
}
}
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct TimeReport { pub struct TimeReport {
@@ -247,103 +91,12 @@ pub struct SourceReport {
pub details: Arc<dyn SourceReportDetails>, pub details: Arc<dyn SourceReportDetails>,
} }
#[serde_as]
#[derive(Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct UCCMConfig {
pub enabled: bool,
pub port: String,
pub baud: u32,
#[serde_as(as = "DurationSeconds<u64>")]
pub status_interval: std::time::Duration,
#[serde_as(as = "DurationSeconds<u64>")]
pub timeout: std::time::Duration,
pub measurement: String,
}
impl Default for UCCMConfig {
fn default() -> Self {
UCCMConfig {
enabled: false,
port: "/dev/ttyS0".into(),
baud: 57600,
status_interval: std::time::Duration::from_secs(10),
timeout: std::time::Duration::from_secs(1),
measurement: "uccm_gpsdo".into(),
}
}
}
#[derive(Serialize, Deserialize, Clone, Default)]
pub struct SourcesConfig {
pub chrony: ChronyConfig,
pub hwmon: HwmonConfig,
pub uccm: UCCMConfig,
pub gpsd: GpsdConfig,
pub prs10: Prs10Config,
}
#[derive(Serialize, Deserialize, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum SourceConfig {
Chrony(ChronyConfig),
Hwmon(HwmonConfig),
Uccm(UCCMConfig),
Gpsd(GpsdConfig),
Prs10(Prs10Config),
}
#[derive(Serialize, Deserialize, Clone)]
pub struct NamedSourceConfig {
pub name: String,
#[serde(flatten)]
pub source: SourceConfig,
}
#[derive(Serialize, Deserialize, Clone, Default)]
pub struct TargetsConfig {
pub chrony: ChronySockConfig,
}
#[derive(Serialize, Deserialize, Clone, Default)]
pub struct Config {
pub influxdb: InfluxConfig,
pub sources: Vec<NamedSourceConfig>,
pub targets: TargetsConfig,
}
impl Provider for Config {
fn metadata(&self) -> figment::Metadata {
figment::Metadata::named("Default config")
}
fn data(&self) -> Result<Map<figment::Profile, figment::value::Dict>, figment::Error> {
Serialized::defaults(Config::default()).data()
}
}
pub fn load_config(filename: &Path) -> Figment {
Figment::from(Serialized::defaults(Config::default())).merge(Toml::file(filename))
}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum ChimemonMessage { pub enum ChimemonMessage {
DataPoint(DataPoint),
DataPoints(Vec<DataPoint>),
TimeReport(TimeReport), TimeReport(TimeReport),
SourceReport(SourceReport), SourceReport(SourceReport),
} }
impl From<DataPoint> for ChimemonMessage {
fn from(dp: DataPoint) -> Self {
ChimemonMessage::DataPoint(dp)
}
}
impl From<Vec<DataPoint>> for ChimemonMessage {
fn from(dps: Vec<DataPoint>) -> Self {
ChimemonMessage::DataPoints(dps)
}
}
impl From<TimeReport> for ChimemonMessage { impl From<TimeReport> for ChimemonMessage {
fn from(tr: TimeReport) -> Self { fn from(tr: TimeReport) -> Self {
ChimemonMessage::TimeReport(tr) ChimemonMessage::TimeReport(tr)
@@ -368,5 +121,7 @@ pub trait ChimemonSource {
#[async_trait] #[async_trait]
pub trait ChimemonTarget { pub trait ChimemonTarget {
type Config;
fn new(name: &str, config: Self::Config) -> Self;
async fn run(self, chan: ChimemonTargetChannel, cancel: CancellationToken); async fn run(self, chan: ChimemonTargetChannel, cancel: CancellationToken);
} }

View File

@@ -9,7 +9,11 @@ use tokio_util::sync::CancellationToken;
use tracing::{Instrument, debug, error, info, info_span, warn}; use tracing::{Instrument, debug, error, info, info_span, warn};
use tracing_subscriber::{self, EnvFilter, fmt::format::FmtSpan}; use tracing_subscriber::{self, EnvFilter, fmt::format::FmtSpan};
use chimemon::*; use chimemon::{
config::{SourceConfig, TargetConfig},
*,
};
use config::Config;
use sources::{ use sources::{
chrony::ChronyClient, gpsd::GpsdSource, hwmon::HwmonSource, prs10::Prs10Monitor, chrony::ChronyClient, gpsd::GpsdSource, hwmon::HwmonSource, prs10::Prs10Monitor,
uccm::UCCMMonitor, uccm::UCCMMonitor,
@@ -54,11 +58,11 @@ struct Args {
} }
fn run_source( fn run_source(
config: NamedSourceConfig, name: &str,
source: SourceConfig,
chan: ChimemonSourceChannel, chan: ChimemonSourceChannel,
shutdown: CancellationToken, shutdown: CancellationToken,
) -> Option<JoinHandle<()>> { ) -> Option<JoinHandle<()>> {
let NamedSourceConfig { name, source } = config;
match source { match source {
SourceConfig::Chrony(source_config) if source_config.enabled => { SourceConfig::Chrony(source_config) if source_config.enabled => {
let c = ChronyClient::new(&name, source_config); let c = ChronyClient::new(&name, source_config);
@@ -94,6 +98,28 @@ fn run_source(
} }
} }
fn run_target(
name: &str,
target: TargetConfig,
chan: ChimemonTargetChannel,
cancel: CancellationToken,
) -> Option<JoinHandle<()>> {
match target {
TargetConfig::ChronySock(source_config) if source_config.enabled => {
let c = ChronySockServer::new(name, source_config);
Some(tokio::spawn(
c.run(chan, cancel)
.instrument(info_span!("chronysock-task")),
))
}
TargetConfig::InfluxDb(source_config) if source_config.enabled => {
warn!("influx not implemented");
None
}
_ => None,
}
}
async fn dummy_consumer(mut chan: ChimemonTargetChannel, cancel: CancellationToken) { async fn dummy_consumer(mut chan: ChimemonTargetChannel, cancel: CancellationToken) {
info!("Dummy receiver task started"); info!("Dummy receiver task started");
loop { loop {
@@ -141,72 +167,22 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let shutdown_token = CancellationToken::new(); let shutdown_token = CancellationToken::new();
if config.influxdb.enabled { for (name, source) in config.sources {
info!( if let Some(task) = run_source(&name, source, sourcechan.clone(), shutdown_token.clone()) {
"Connecting to influxdb {} org: {} using token",
&config.influxdb.url, &config.influxdb.org
);
let config = config.clone();
let influx = influxdb2::Client::new(
&config.influxdb.url,
&config.influxdb.org,
&config.influxdb.token,
);
let mut influx_rx = sourcechan.subscribe();
let influx_cancel = shutdown_token.clone();
tasks.push(tokio::spawn(
async move {
let stream = async_stream::stream! {
while let Ok(msg) = influx_rx.recv().await {
match msg {
ChimemonMessage::DataPoint(dp) => {
yield dp
},
ChimemonMessage::DataPoints(dps) => {
for p in dps {
yield p
}
},
_ => {}
}
}
};
select! {
_ = influx_cancel.cancelled() => {
return
},
res = influx.write(&config.influxdb.bucket, stream) => {
match res {
Err(e) => error!("Error writing to influx: {}", e.to_string()),
_ => warn!("Unexpectedly shutting down influx task"),
}
},
}
}
.instrument(info_span!("influx-task")),
));
}
for source in config.sources {
if let Some(task) = run_source(source, sourcechan.clone(), shutdown_token.clone()) {
tasks.push(task) tasks.push(task)
} }
} }
let chrony_refclock = if config.targets.chrony.enabled { for (name, target) in config.targets {
Some(ChronySockServer::new(config.targets.chrony.to_owned())) if let Some(task) = run_target(
} else { &name,
None target,
}; sourcechan.subscribe(),
if let Some(chrony_refclock) = chrony_refclock { shutdown_token.clone(),
tasks.push(tokio::spawn( ) {
chrony_refclock tasks.push(task)
.run(sourcechan.subscribe(), shutdown_token.clone()) }
.instrument(info_span!("chrony-refclock-task")), }
));
};
if tasks.len() == 0 { if tasks.len() == 0 {
error!("No tasks configured, exiting."); error!("No tasks configured, exiting.");

View File

@@ -5,14 +5,13 @@ use async_trait::async_trait;
use chrony_candm::reply::{self, ReplyBody, SourceMode}; use chrony_candm::reply::{self, ReplyBody, SourceMode};
use chrony_candm::request::{self, RequestBody}; use chrony_candm::request::{self, RequestBody};
use chrony_candm::{ClientOptions, blocking_query}; use chrony_candm::{ClientOptions, blocking_query};
use futures::future::join; use tokio::select;
use tokio::{join, select};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::{info, warn}; use tracing::{info, warn};
use crate::{ use crate::{
ChimemonSource, ChimemonSourceChannel, ChronyConfig, MetricTags, SourceMetric, SourceReport, ChimemonSource, ChimemonSourceChannel, MetricTags, SourceMetric, SourceReport,
SourceReportDetails, SourceStatus, SourceReportDetails, SourceStatus, config::ChronyConfig,
}; };
pub struct ChronyClient { pub struct ChronyClient {
@@ -313,7 +312,7 @@ impl ChimemonSource for ChronyClient {
match self.tracking_poll(&chan).await { match self.tracking_poll(&chan).await {
Ok(_) => (), Ok(_) => (),
Err(e) => { Err(e) => {
warn!("Error in chrony task: {}", e.to_string()); warn!(error = ?e, "Error in chrony tracking task");
} }
} }
}, },
@@ -321,7 +320,7 @@ impl ChimemonSource for ChronyClient {
match self.sources_poll(&chan).await { match self.sources_poll(&chan).await {
Ok(_) => (), Ok(_) => (),
Err(e) => { Err(e) => {
warn!("Error in chrony task: {}", e.to_string()); warn!(error = ?e, "Error in chrony sources task");
} }
} }
} }

View File

@@ -15,11 +15,11 @@ use tokio::net::{TcpStream, ToSocketAddrs, lookup_host};
use tokio::time::{interval, timeout}; use tokio::time::{interval, timeout};
use tokio_util::codec::{Framed, LinesCodec}; use tokio_util::codec::{Framed, LinesCodec};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::{debug, debug_span, info, instrument, warn}; use tracing::{debug, debug_span, error, info, instrument, warn};
use crate::{ use crate::{
ChimemonMessage, ChimemonSource, ChimemonSourceChannel, GpsdConfig, SourceMetric, SourceReport, ChimemonMessage, ChimemonSource, ChimemonSourceChannel, SourceMetric, SourceReport,
SourceReportDetails, SourceStatus, SourceReportDetails, SourceStatus, config::GpsdConfig,
}; };
pub struct GpsdSource { pub struct GpsdSource {
@@ -116,17 +116,21 @@ impl GpsdSource {
.and_then(|sky| sky.tdop) .and_then(|sky| sky.tdop)
.map_or(f64::INFINITY, |tdop| tdop as f64); .map_or(f64::INFINITY, |tdop| tdop as f64);
chan.send(ChimemonMessage::SourceReport(SourceReport { if let Err(e) = chan.send(
name: self.name.clone(), SourceReport {
status: SourceStatus::Unknown, name: self.name.clone(),
details: Arc::new(GpsdSourceReport { status: SourceStatus::Unknown,
fix_type: tpv.map_or(GpsdFixType::Unknown, |tpv| tpv.mode.into()), details: Arc::new(GpsdSourceReport {
sats_tracked, fix_type: tpv.map_or(GpsdFixType::Unknown, |tpv| tpv.mode.into()),
sats_visible, sats_tracked,
tdop, sats_visible,
}), tdop,
})) }),
.unwrap(); }
.into(),
) {
error!(error = ?e, "Unable to send to channel")
}
} }
fn handle_msg(&mut self, msg: String) -> Result<(), Box<dyn std::error::Error>> { fn handle_msg(&mut self, msg: String) -> Result<(), Box<dyn std::error::Error>> {

View File

@@ -6,9 +6,10 @@ use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn}; use tracing::{debug, error, info, warn};
use crate::{ use crate::{
ChimemonSource, ChimemonSourceChannel, HwmonConfig, MetricTags, SourceMetric, SourceReport, ChimemonSource, ChimemonSourceChannel, MetricTags, SourceMetric, SourceReport,
SourceReportDetails, SourceStatus, SourceReportDetails, SourceStatus, config::HwmonConfig,
}; };
pub struct HwmonSource { pub struct HwmonSource {
name: String, name: String,
config: HwmonConfig, config: HwmonConfig,

View File

@@ -17,8 +17,8 @@ use tokio_util::sync::CancellationToken;
use tracing::{debug, debug_span, error, info, instrument, warn}; use tracing::{debug, debug_span, error, info, instrument, warn};
use crate::{ use crate::{
ChimemonMessage, ChimemonSource, ChimemonSourceChannel, MetricTags, Prs10Config, SourceMetric, ChimemonMessage, ChimemonSource, ChimemonSourceChannel, MetricTags, SourceMetric, SourceReport,
SourceReport, SourceReportDetails, SourceStatus, fatal, SourceReportDetails, SourceStatus, config::Prs10Config, fatal,
}; };
#[derive(Debug)] #[derive(Debug)]
@@ -429,7 +429,7 @@ impl Prs10Monitor {
#[instrument(skip_all)] #[instrument(skip_all)]
async fn status_poll(&mut self) -> Result<ChimemonMessage, Box<dyn std::error::Error>> { async fn status_poll(&mut self) -> Result<ChimemonMessage, Box<dyn std::error::Error>> {
let status = self.get_status().await?; let status = self.get_status().await?;
Ok(ChimemonMessage::SourceReport(SourceReport { Ok(SourceReport {
name: self.name.clone(), name: self.name.clone(),
status: if status.is_healthy() { status: if status.is_healthy() {
SourceStatus::Healthy SourceStatus::Healthy
@@ -437,7 +437,8 @@ impl Prs10Monitor {
SourceStatus::Unknown SourceStatus::Unknown
}, },
details: Arc::new(status), details: Arc::new(status),
})) }
.into())
} }
#[instrument(skip_all)] #[instrument(skip_all)]
@@ -459,7 +460,7 @@ impl Prs10Monitor {
} }
drop(stats_guard); drop(stats_guard);
Ok(ChimemonMessage::SourceReport(SourceReport { Ok(SourceReport {
name: self.name.clone(), name: self.name.clone(),
status: SourceStatus::Unknown, status: SourceStatus::Unknown,
details: Arc::new(Prs10Stats { details: Arc::new(Prs10Stats {
@@ -488,7 +489,8 @@ impl Prs10Monitor {
mul_amp_gain_volts: analog_values[18], mul_amp_gain_volts: analog_values[18],
rf_lock_volts: analog_values[19], rf_lock_volts: analog_values[19],
}), }),
})) }
.into())
} }
async fn reset_rx_state(&mut self) -> Result<(), Box<dyn std::error::Error>> { async fn reset_rx_state(&mut self) -> Result<(), Box<dyn std::error::Error>> {

View File

@@ -7,7 +7,6 @@ use bitflags::bitflags;
use byteorder::{BigEndian, ReadBytesExt}; use byteorder::{BigEndian, ReadBytesExt};
use bytes::{Buf, BytesMut}; use bytes::{Buf, BytesMut};
use chrono::{DateTime, Duration, NaiveDateTime, Utc}; use chrono::{DateTime, Duration, NaiveDateTime, Utc};
use figment::value::Map;
use itertools::Itertools; use itertools::Itertools;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf};
use tokio::select; use tokio::select;
@@ -19,7 +18,7 @@ use tracing::{debug, error, info, warn};
use crate::{ use crate::{
ChimemonMessage, ChimemonSource, ChimemonSourceChannel, SourceMetric, SourceReport, ChimemonMessage, ChimemonSource, ChimemonSourceChannel, SourceMetric, SourceReport,
SourceReportDetails, SourceStatus, TimeReport, UCCMConfig, SourceReportDetails, SourceStatus, TimeReport, config::UCCMConfig,
}; };
pub const GPS_EPOCH: i64 = 315964800; // Doesn't seem possible to have a const DateTime object pub const GPS_EPOCH: i64 = 315964800; // Doesn't seem possible to have a const DateTime object
@@ -465,36 +464,41 @@ impl UCCMMonitor {
&& tod && tod
.flags .flags
.contains(UCCMFlags::OSC_LOCK | UCCMFlags::HAVE_GPS_TIME); .contains(UCCMFlags::OSC_LOCK | UCCMFlags::HAVE_GPS_TIME);
chan.send(ChimemonMessage::TimeReport(TimeReport { chan.send(
system_time: sysnow, TimeReport {
offset, system_time: sysnow,
leaps: tod.leaps as isize, offset,
leap_flag: tod.flags.contains(UCCMFlags::LEAP_FLAG), leaps: tod.leaps as isize,
valid, leap_flag: tod.flags.contains(UCCMFlags::LEAP_FLAG),
})) valid,
}
.into(),
)
.expect("Unable to send to channel"); .expect("Unable to send to channel");
if sysnow - last_sent_report if sysnow - last_sent_report
>= Duration::from_std(self.config.status_interval).unwrap() >= Duration::from_std(self.config.status_interval).unwrap()
{ {
if let Some(loop_diag) = &last_loop_diag { if let Some(loop_diag) = &last_loop_diag {
if let Err(e) = if let Err(e) = chan.send(
chan.send(ChimemonMessage::SourceReport(SourceReport { SourceReport {
name: "uccm".to_owned(), name: "uccm".to_owned(),
status: SourceStatus::Unknown, status: SourceStatus::Unknown,
details: loop_diag.clone(), details: loop_diag.clone(),
})) }
{ .into(),
) {
error!(error = ?e, "Unable to send message to channel"); error!(error = ?e, "Unable to send message to channel");
} }
} }
if let Some(gps_sats) = &last_gps_sats { if let Some(gps_sats) = &last_gps_sats {
if let Err(e) = if let Err(e) = chan.send(
chan.send(ChimemonMessage::SourceReport(SourceReport { SourceReport {
name: "uccm".to_owned(), name: "uccm".to_owned(),
status: SourceStatus::Unknown, status: SourceStatus::Unknown,
details: gps_sats.clone(), details: gps_sats.clone(),
})) }
{ .into(),
) {
error!(error = ?e, "Unable to send message to channel"); error!(error = ?e, "Unable to send message to channel");
} }
} }

View File

@@ -6,13 +6,14 @@ use async_trait::async_trait;
use libc::{c_double, c_int, timeval}; use libc::{c_double, c_int, timeval};
use tokio::select; use tokio::select;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::{debug, warn}; use tracing::{debug, info, warn};
use crate::{ChimemonMessage, ChimemonTarget, ChimemonTargetChannel, ChronySockConfig}; use crate::{ChimemonMessage, ChimemonTarget, ChimemonTargetChannel, config::ChronySockConfig};
const CHRONY_MAGIC: c_int = 0x534f434b; const CHRONY_MAGIC: c_int = 0x534f434b;
pub struct ChronySockServer { pub struct ChronySockServer {
name: String,
sock_path: PathBuf, sock_path: PathBuf,
} }
@@ -27,17 +28,19 @@ pub struct ChronyTimeReport {
magic: c_int, magic: c_int,
} }
impl ChronySockServer { impl ChronySockServer {}
pub fn new(config: ChronySockConfig) -> Self {
ChronySockServer {
sock_path: config.sock.into(),
}
}
}
#[async_trait] #[async_trait]
impl ChimemonTarget for ChronySockServer { impl ChimemonTarget for ChronySockServer {
type Config = ChronySockConfig;
fn new(name: &str, config: ChronySockConfig) -> Self {
ChronySockServer {
name: name.to_owned(),
sock_path: config.sock.into(),
}
}
async fn run(mut self, mut chan: ChimemonTargetChannel, cancel: CancellationToken) { async fn run(mut self, mut chan: ChimemonTargetChannel, cancel: CancellationToken) {
info!("Chrony refclock task started");
loop { loop {
select! { select! {
_ = cancel.cancelled() => { return } _ = cancel.cancelled() => { return }

View File

@@ -1 +1,2 @@
pub mod chrony_refclock; pub mod chrony_refclock;
// pub mod influx;