refactor, prs10 stats, etc

This commit is contained in:
2026-02-02 00:41:19 -08:00
parent 7f24bf5a91
commit 7717aa9177
12 changed files with 790 additions and 652 deletions

1079
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -10,17 +10,12 @@ release-logs = ["tracing/max_level_info"]
[dependencies]
serde = "1.0"
serde_derive = "1.0"
influxdb2 = { version = "0.3.3", features = [
"rustls",
], default-features = false }
tokio = { version = "1", features = ["rt", "io-util"] }
clap = { version = "4.0", features = ["derive"] }
figment = { version = "0.10", features = ["toml"] }
gethostname = "0.3"
futures = "0.3.24"
async-trait = "0.1.58"
tokio-stream = { version = "0.1.11", features = ["sync"] }
bitflags = "1.3.2"
byteorder = "1.4.3"
tokio-serial = "5.4.4"
bytes = "1.2.1"
@@ -35,7 +30,10 @@ backoff = { version = "0.4.0", features = ["tokio"] }
serde_repr = "0.1.20"
tracing = "0.1.44"
tracing-subscriber = { version = "0.3.22", features = ["fmt"] }
bit-struct = { version = "0.3.2", default-features = false }
serialport = "4.8.1"
gethostname = "1.1.0"
bitflags = "2.10.0"
influxdb2 = "0.3.9"
[dependencies.chrony-candm]
git = "https://github.com/aws/chrony-candm"

View File

@@ -1,9 +1,5 @@
pub mod chrony;
pub mod chrony_refclock;
pub mod gpsd;
pub mod hwmon;
pub mod prs10;
pub mod uccm;
pub mod sources;
pub mod targets;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
@@ -18,7 +14,7 @@ use influxdb2::models::DataPoint;
use serde_derive::{Deserialize, Serialize};
use tokio::sync::broadcast::*;
use std::{fmt::Debug, net::SocketAddr, path::Path, sync::Arc};
use std::{fmt::Debug, path::Path, sync::Arc};
#[derive(Serialize, Deserialize, Clone)]
pub struct InfluxConfig {
@@ -146,7 +142,7 @@ impl Default for Prs10Config {
baud: 9600,
timeout: std::time::Duration::from_secs(1),
status_interval: std::time::Duration::from_secs(10),
stats_interval: std::time::Duration::from_secs(10),
stats_interval: std::time::Duration::from_secs(30),
}
}
}

View File

@@ -5,14 +5,15 @@ use tokio::sync::broadcast;
use tracing::{Instrument, debug, error, info, info_span, warn};
use tracing_subscriber;
use crate::{
chrony::*, chrony_refclock::ChronySockServer, hwmon::HwmonSource, prs10::Prs10Monitor,
use chimemon::*;
use sources::{
chrony::ChronyClient, gpsd::GpsdSource, hwmon::HwmonSource, prs10::Prs10Monitor,
uccm::UCCMMonitor,
};
use chimemon::{gpsd::GpsdSource, *};
use targets::chrony_refclock::ChronySockServer;
const PROGRAM_NAME: &str = "chimemon";
const VERSION: &str = "0.0.1";
const VERSION: &str = env!("CARGO_PKG_VERSION");
#[derive(ValueEnum, Clone)]
enum Level {
@@ -116,19 +117,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
));
};
let chrony_refclock = if config.targets.chrony.enabled {
Some(ChronySockServer::new(config.targets.chrony.to_owned()))
} else {
None
};
if let Some(chrony_refclock) = chrony_refclock {
tasks.push(tokio::spawn(
chrony_refclock
.run(sourcechan.subscribe())
.instrument(info_span!("chrony-refclock-task")),
));
};
let gpsd = if config.sources.gpsd.enabled {
Some(GpsdSource::new(config.to_owned()).await.unwrap())
} else {
@@ -154,6 +142,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
))
}
let chrony_refclock = if config.targets.chrony.enabled {
Some(ChronySockServer::new(config.targets.chrony.to_owned()))
} else {
None
};
if let Some(chrony_refclock) = chrony_refclock {
tasks.push(tokio::spawn(
chrony_refclock
.run(sourcechan.subscribe())
.instrument(info_span!("chrony-refclock-task")),
));
};
if tasks.len() == 0 {
error!("No tasks configured, exiting.");
return Ok(()); // not an error, but exit before starting a dummy task

View File

@@ -3,13 +3,12 @@ use crate::{
SourceStatus,
};
use async_trait::async_trait;
use chrony_candm::reply::{self, ReplyBody, SourceMode, SourceState};
use chrony_candm::reply::{self, ReplyBody, SourceMode};
use chrony_candm::request::{self, RequestBody};
use chrony_candm::{ClientOptions, blocking_query};
use influxdb2::models::DataPoint;
use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::time::Duration;
use tokio::join;
use tracing::{info, warn};

View File

@@ -12,15 +12,13 @@ use std::time::Duration;
use async_trait::async_trait;
use backoff::ExponentialBackoff;
use futures::StreamExt;
use futures::{SinkExt, Stream};
use futures::{StreamExt, task::Context};
use gpsd_proto::{
Device, Gst, Mode, Pps, ResponseHandshake, Sky, Tpv, UnifiedResponse, Version, Watch,
};
use serde::{Deserialize, Deserializer, Serialize};
use gpsd_proto::{Device, Gst, Mode, Pps, Sky, Tpv, UnifiedResponse, Version};
use serde::Serialize;
use serde_json;
use tokio::net::{TcpStream, ToSocketAddrs, lookup_host};
use tokio::time::{Interval, interval, timeout};
use tokio::time::{interval, timeout};
use tokio_util::codec::{Framed, LinesCodec};
use tracing::{debug, debug_span, info, warn};
@@ -167,7 +165,7 @@ impl GpsdSource {
impl ChimemonSource for GpsdSource {
async fn run(mut self, mut chan: ChimemonSourceChannel) {
info!("gpsd task started");
self.conn.ensure_connection().await.unwrap();
self.conn.conn().await.unwrap();
let mut ticker = interval(Duration::from_secs(self.config.sources.gpsd.interval));
let mut params = WatchParams::default();
@@ -337,51 +335,61 @@ impl GpsdTransport {
conn_backoff: ExponentialBackoff::default(),
})
}
async fn connect(&mut self) -> Result<(), Box<dyn std::error::Error>> {
async fn connect_inner(
&self,
) -> Result<Framed<TcpStream, LinesCodec>, Box<dyn std::error::Error>> {
info!("Connecting to gpsd @ {}", self.host);
let mut framed = backoff::future::retry_notify(
self.conn_backoff.clone(),
async || {
Ok(Framed::new(
TcpStream::connect(self.host).await?,
LinesCodec::new(),
))
},
|e, d| warn!("Failed to connect to {} after {:?}: `{}`", self.host, d, e),
)
.await?;
let mut framed = Framed::new(TcpStream::connect(self.host).await?, LinesCodec::new());
debug!("Waiting for initial VERSION");
if let Ok(Some(Ok(r))) = timeout(Duration::from_secs(5), framed.next()).await {
match timeout(Duration::from_secs(5), framed.next()).await {
Ok(Some(Ok(r))) => {
if let Ok(version) = serde_json::from_str::<Version>(&r) {
info!(
"Connected to gpsd @ {}, release {}",
self.host, version.release
);
} else {
warn!("Got unexpected non-VERSION response after connection (`{r}`)");
}
Ok(framed)
}
_ => Err("Unexpected failure to receive initial handshake response".into()),
}
}
async fn conn(
&mut self,
) -> Result<&mut Framed<TcpStream, LinesCodec>, Box<dyn std::error::Error>> {
if self.framed.is_none() {
let framed = backoff::future::retry_notify(
self.conn_backoff.clone(),
|| async {
self.connect_inner()
.await
.map_err(backoff::Error::transient)
},
|e, d| warn!("Failed to connect to {} after {:?}: `{}`", self.host, d, e),
)
} else {
warn!("Got unexpected non-VERSION response after connection (`{r}`)")
}
.await?;
self.framed = Some(framed);
Ok(())
} else {
Err("Unexpected failure to receive initial handshake response".into())
}
}
async fn ensure_connection(&mut self) -> Result<(), Box<dyn std::error::Error>> {
if let Some(conn) = &self.framed {
Ok(())
} else {
self.connect().await
}
Ok(self.framed.as_mut().unwrap())
}
async fn cmd_response(
&mut self,
cmd: &GpsdCommand,
) -> Result<Vec<UnifiedResponse>, Box<dyn std::error::Error>> {
debug!("Command: `{cmd:?}`");
self.ensure_connection().await?;
let mut responses = Vec::new();
if let Some(conn) = &mut self.framed {
let conn = self.conn().await?;
debug!("Raw command: `{}`", cmd.to_string());
conn.send(cmd.to_string()).await?;
for _ in 0..cmd.expected_responses() {
@@ -394,9 +402,6 @@ impl GpsdTransport {
}
}
}
} else {
return Err("Missing connection despite ensure".into());
}
Ok(responses)
}
@@ -407,12 +412,10 @@ impl GpsdTransport {
impl Stream<Item = Result<UnifiedResponse, Box<dyn std::error::Error>>>,
Box<dyn std::error::Error>,
> {
self.ensure_connection().await?;
if let Some(conn) = &mut self.framed {
Ok(conn.map(|line| Ok(serde_json::from_str::<UnifiedResponse>(&line?)?)))
} else {
Err("No connection after connecting.".into())
}
Ok(self
.conn()
.await?
.map(|line| Ok(serde_json::from_str::<UnifiedResponse>(&line?)?)))
}
}
@@ -420,7 +423,7 @@ mod tests {
use gpsd_proto::{ResponseData, UnifiedResponse};
use tokio_stream::StreamExt;
use crate::gpsd::{GpsdCommand, GpsdTransport, WatchParams};
use crate::sources::gpsd::{GpsdCommand, GpsdTransport, WatchParams};
use std::sync::Once;
static INIT: Once = Once::new();
@@ -433,7 +436,7 @@ mod tests {
async fn test_gpsd() {
init_logger();
let mut gpsd = GpsdTransport::new(&"192.168.65.93:2947").await.unwrap();
gpsd.connect().await.unwrap();
gpsd.conn().await.unwrap();
let mut params = WatchParams::default();
params.enable = Some(true);
params.json = Some(true);

View File

@@ -3,16 +3,8 @@ use crate::{
SourceStatus,
};
use async_trait::async_trait;
use futures::{StreamExt, stream};
use influxdb2::models::DataPoint;
use std::{
fs::File,
io::Read,
path::PathBuf,
sync::Arc,
time::{Duration, SystemTime, UNIX_EPOCH},
};
use tracing::{Instrument, debug, error, info, info_span, warn};
use std::{fs::File, io::Read, path::PathBuf, sync::Arc, time::Duration};
use tracing::{debug, error, info, warn};
pub struct HwmonSource {
config: Config,

5
src/sources/mod.rs Normal file
View File

@@ -0,0 +1,5 @@
pub mod chrony;
pub mod gpsd;
pub mod hwmon;
pub mod prs10;
pub mod uccm;

View File

@@ -1,3 +1,4 @@
use std::str::FromStr;
use std::sync::Arc;
use crate::{
@@ -5,14 +6,12 @@ use crate::{
SourceReport, SourceReportDetails, SourceStatus,
};
use async_trait::async_trait;
use bit_struct::u4;
use bitflags::bitflags;
use itertools::Itertools;
use serde::Deserialize;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf};
use tokio::select;
use tokio::sync::OnceCell;
use tokio::time::{Interval, interval, timeout};
use tokio::time::{interval, timeout};
use tokio_serial;
use tokio_serial::{SerialPort, SerialStream};
use tracing::{debug, error, info, warn};
@@ -40,6 +39,7 @@ impl TryFrom<&[u8]> for Prs10Info {
}
bitflags! {
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct Prs10PowerLampFlags: u8 {
const ELEC_VOLTAGE_LOW = (1<<0);
const ELEC_VOLTAGE_HIGH = (1<<1);
@@ -53,7 +53,7 @@ bitflags! {
}
impl Prs10PowerLampFlags {
pub fn get_metrics(&self, no_tags: Vec<String>) -> Vec<SourceMetric> {
pub fn get_metrics(&self, no_tags: Arc<Vec<(String, String)>>) -> Vec<SourceMetric> {
// Define the mapping statically
const FLAG_LABELS: [(&Prs10PowerLampFlags, &str); 8] = [
(&Prs10PowerLampFlags::ELEC_VOLTAGE_LOW, "elec_voltage_low"),
@@ -66,8 +66,6 @@ impl Prs10PowerLampFlags {
(&Prs10PowerLampFlags::GATE_VOLTAGE_HIGH, "gate_voltage_high"),
];
let no_tags = Arc::new(vec![]);
// Generate metrics based on flag availability
FLAG_LABELS
.iter()
@@ -80,6 +78,7 @@ impl Prs10PowerLampFlags {
}
bitflags! {
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct Prs10RfFlags: u8 {
const PLL_UNLOCK = (1<<0);
const XTAL_VAR_LOW = (1<<1);
@@ -93,6 +92,7 @@ bitflags! {
}
bitflags! {
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct Prs10TempFlags: u8 {
const LAMP_TEMP_LOW = (1<<0);
const LAMP_TEMP_HIGH = (1<<1);
@@ -106,6 +106,7 @@ bitflags! {
}
bitflags! {
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct Prs10FllFlags: u8 {
const FLL_OFF = (1<<0);
const FLL_DISABLED = (1<<1);
@@ -117,6 +118,7 @@ bitflags! {
}
bitflags! {
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct Prs10PpsFlags: u8 {
const PLL_DISABLED = (1<<0);
const PPS_WARMUP = (1<<1);
@@ -130,6 +132,7 @@ bitflags! {
}
bitflags! {
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct Prs10SystemFlags: u8 {
const LAMP_RESTART = (1<<0);
const WDT_RESET = (1<<1);
@@ -142,7 +145,7 @@ bitflags! {
}
}
#[derive(Debug, Eq, PartialEq)]
#[derive(Debug, Eq, PartialEq, Clone)]
pub struct Prs10Status {
pub volt_lamp_flags: Prs10PowerLampFlags,
pub rf_flags: Prs10RfFlags,
@@ -167,7 +170,7 @@ impl Default for Prs10Status {
impl SourceReportDetails for Prs10Status {
fn is_healthy(&self) -> bool {
const HEALTHY_PPS: Prs10PpsFlags = Prs10PpsFlags { bits: 4 };
const HEALTHY_PPS: Prs10PpsFlags = Prs10PpsFlags::from_bits(4).unwrap();
self.volt_lamp_flags.is_empty()
&& self.rf_flags.is_empty()
&& self.temp_flags.is_empty()
@@ -217,6 +220,7 @@ impl TryFrom<&[u8]> for Prs10Status {
}
}
#[derive(Debug, Clone)]
pub struct Prs10Stats {
pub ocxo_efc: u32,
pub error_signal_volts: f64,
@@ -238,6 +242,10 @@ pub struct Prs10Stats {
pub lamp_therm: f64,
pub ext_cal_volts: f64,
pub analog_gnd_volts: f64,
pub if_vco_varactor_volts: f64,
pub op_vco_varactor_volts: f64,
pub mul_amp_gain_volts: f64,
pub rf_lock_volts: f64,
}
impl SourceReportDetails for Prs10Stats {
@@ -283,6 +291,22 @@ impl SourceReportDetails for Prs10Stats {
SourceMetric::new_float("lamp_therm", self.lamp_therm, no_tags.clone()),
SourceMetric::new_float("ext_cal_volts", self.ext_cal_volts, no_tags.clone()),
SourceMetric::new_float("analog_gnd_volts", self.analog_gnd_volts, no_tags.clone()),
SourceMetric::new_float(
"if_vco_varactor_volts",
self.if_vco_varactor_volts,
no_tags.clone(),
),
SourceMetric::new_float(
"op_vco_varactor_volts",
self.op_vco_varactor_volts,
no_tags.clone(),
),
SourceMetric::new_float(
"mul_amp_gain_volts",
self.mul_amp_gain_volts,
no_tags.clone(),
),
SourceMetric::new_float("rf_lock_volts", self.rf_lock_volts, no_tags.clone()),
// U16 Metrics (optional, but can be treated as integers)
SourceMetric::new_int(
"freq_offset_ppt",
@@ -344,7 +368,7 @@ impl Prs10Monitor {
async fn set_info(&mut self) -> Result<(), Box<dyn std::error::Error>> {
let id = self.get_id().await?;
self.info.set(id);
self.info.set(id)?;
debug!("Set info to {:?}", self.info);
Ok(())
}
@@ -365,7 +389,7 @@ impl Prs10Monitor {
id
}
pub async fn get_analog(&mut self, id: u4) -> Result<f64, Box<dyn std::error::Error>> {
pub async fn get_analog(&mut self, id: u16) -> Result<f64, Box<dyn std::error::Error>> {
debug!("Getting analog value {id}");
let mut cmd = b"AD".to_vec();
cmd.extend_from_slice(id.to_string().as_bytes());
@@ -376,6 +400,19 @@ impl Prs10Monitor {
Ok(value)
}
pub async fn get_parsed<T: FromStr>(
&mut self,
cmd: &[u8],
) -> Result<T, Box<dyn std::error::Error>>
where
T::Err: std::error::Error + 'static,
{
debug!("Getting int value for command {cmd:?}");
let resp = self.cmd_response(cmd).await?;
let val = str::from_utf8(&resp)?.parse::<T>()?;
Ok(val)
}
pub async fn get_ocxo_efc(&mut self) -> Result<u32, Box<dyn std::error::Error>> {
debug!("Getting u16,u16 -> u32 for OCXO EFC value");
let resp = self.cmd_response(b"FC?").await?;
@@ -391,18 +428,21 @@ impl Prs10Monitor {
}
}
pub async fn get_float(&mut self, cmd: &[u8]) -> Result<f64, Box<dyn std::error::Error>> {
debug!("Getting float value for command {cmd:?}");
let resp = self.cmd_response(cmd).await?;
let val = str::from_utf8(&resp)?.parse::<f64>()?;
Ok(val)
pub async fn get_detected_signals(&mut self) -> Result<(f64, f64), Box<dyn std::error::Error>> {
debug!("Getting detected signals pair");
let resp = self.cmd_response(b"DS?").await?;
let (error, signal) = resp
.splitn(2, |c| *c == b',')
.map(|s| str::from_utf8(s).unwrap().parse::<u16>())
.collect_tuple()
.ok_or("Not enough values in response to DS?".to_string())?;
Ok((error? as f64 * 0.15e-6, signal? as f64 * 0.001))
}
async fn status_poll(&mut self) -> Option<ChimemonMessage> {
async fn status_poll(&mut self) -> Result<ChimemonMessage, Box<dyn std::error::Error>> {
debug!("polling status");
let status = self.get_status().await;
if let Ok(status) = status {
Some(ChimemonMessage::SourceReport(SourceReport {
let status = self.get_status().await?;
Ok(ChimemonMessage::SourceReport(SourceReport {
name: "prs10".into(),
status: if status.is_healthy() {
SourceStatus::Healthy
@@ -411,20 +451,58 @@ impl Prs10Monitor {
},
details: Arc::new(status),
}))
} else {
None
}
}
async fn stats_poll(&mut self) -> Option<ChimemonMessage> {
async fn stats_poll(&mut self) -> Result<ChimemonMessage, Box<dyn std::error::Error>> {
const ANALOG_SCALING: [f64; 20] = [
0.0, 10.0, 10.0, 10.0, 10.0, 1.0, 1.0, 1.0, 1.0, 4.0, 100.0, 1.0, 1.0, 1.0, 1.0, 1.0,
4.0, 4.0, 4.0, 1.0,
];
debug!("polling stats");
let ocxo_efc = self.get_ocxo_efc().await;
let start = std::time::Instant::now();
let ocxo_efc = self.get_ocxo_efc().await?;
let (error_signal_volts, detect_signal_volts) = self.get_detected_signals().await?;
let freq_offset_ppt = self.get_parsed(b"SF?").await?;
let mag_efc = self.get_parsed(b"MR?").await?;
let mut analog_values = [0.0; 20];
for i in 1u16..=19 {
analog_values[i as usize] = self.get_analog(i).await? * ANALOG_SCALING[i as usize]
}
let duration = std::time::Instant::now() - start;
Some(ChimemonMessage::SourceReport(SourceReport {
debug!("stats polled in {}ms", duration.as_secs_f64() * 1000.0);
Ok(ChimemonMessage::SourceReport(SourceReport {
name: "prs10".into(),
status: SourceStatus::Unknown,
details: Arc::new(Prs10Stats {}),
details: Arc::new(Prs10Stats {
ocxo_efc,
error_signal_volts,
detect_signal_volts,
freq_offset_ppt,
mag_efc,
heat_volts: analog_values[1],
elec_volts: analog_values[2],
lamp_fet_drain_volts: analog_values[3],
lamp_fet_gate_volts: analog_values[4],
ocxo_heat_volts: analog_values[5],
cell_heat_volts: analog_values[6],
lamp_heat_volts: analog_values[7],
rb_photo: analog_values[8],
rb_photo_iv: analog_values[9],
case_temp: analog_values[10],
ocxo_therm: analog_values[11],
cell_therm: analog_values[12],
lamp_therm: analog_values[13],
ext_cal_volts: analog_values[14],
analog_gnd_volts: analog_values[15],
if_vco_varactor_volts: analog_values[16],
op_vco_varactor_volts: analog_values[17],
mul_amp_gain_volts: analog_values[18],
rf_lock_volts: analog_values[19],
}),
}))
}
}
@@ -445,26 +523,31 @@ impl ChimemonSource for Prs10Monitor {
);
let mut status_timer = interval(self.config.status_interval);
let mut pps_timer = interval(self.config.stats_interval);
let mut stats_timer = interval(self.config.stats_interval);
loop {
let msg = select! {
_ = status_timer.tick() => {
self.status_poll().await
},
_ = pps_timer.tick() => {
_ = stats_timer.tick() => {
self.stats_poll().await
}
};
if let Some(msg) = msg {
chan.send(msg).expect("Unable to send to channel");
match msg {
Ok(msg) => {
if let Err(e) = chan.send(msg) {
error!("Unable to send to channel {e}")
}
}
Err(e) => error!("Error in poll task: {e}"),
}
}
}
}
mod tests {
use crate::prs10::{Prs10Info, Prs10PowerLampFlags, Prs10PpsFlags, Prs10Status};
use crate::sources::prs10::{Prs10Info, Prs10PowerLampFlags, Prs10PpsFlags, Prs10Status};
#[test]
fn test_info_parse() -> Result<(), Box<dyn std::error::Error>> {
const INFO_VECTOR: &[u8] = b"PRS10_3.15_SN_12345";

View File

@@ -195,6 +195,7 @@ impl UCCMGPSSatsReport {
}
bitflags! {
#[derive(Debug)]
pub struct UCCMFlags: u32 {
const OSC_LOCK = (1<<29);
const LEAP_FLAG = (1<<25);

1
src/targets/mod.rs Normal file
View File

@@ -0,0 +1 @@
pub mod chrony_refclock;