Compare commits

..

16 Commits

19 changed files with 3388 additions and 1513 deletions

1598
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,31 +1,41 @@
[package] [package]
name = "chimemon" name = "chimemon"
version = "0.2.0" version = "0.2.0"
edition = "2021" edition = "2024"
[features]
default = []
release-logs = ["tracing/max_level_info"]
[dependencies] [dependencies]
serde = "1.0" serde = "1.0"
serde_derive = "1.0" serde_derive = "1.0"
influxdb2 = { version = "0.3.3", features = [ tokio = { version = "1", features = ["rt-multi-thread", "io-util"] }
"rustls",
], default-features = false }
tokio = { version = "1", features = ["rt", "io-util"] }
clap = { version = "4.0", features = ["derive"] } clap = { version = "4.0", features = ["derive"] }
log = "0.4"
figment = { version = "0.10", features = ["toml"] } figment = { version = "0.10", features = ["toml"] }
gethostname = "0.3"
env_logger = "0.9.1"
futures = "0.3.24" futures = "0.3.24"
async-trait = "0.1.58" async-trait = "0.1.58"
tokio-stream = { version = "0.1.11", features = ["sync"] } tokio-stream = { version = "0.1.11", features = ["sync"] }
bitflags = "1.3.2"
byteorder = "1.4.3" byteorder = "1.4.3"
tokio-serial = "5.4.4" tokio-serial = "5.4.4"
bytes = "1.2.1" bytes = "1.2.1"
chrono = "0.4.23"
libc = "0.2.137" libc = "0.2.137"
async-stream = "0.3.6" async-stream = "0.3.6"
itertools = "0.14.0" itertools = "0.14.0"
gpsd_proto = { version = "1.0.0" }
tokio-util = { version = "0.7.17", features = ["codec"] }
serde_json = "1.0.146"
backoff = { version = "0.4.0", features = ["tokio"] }
serde_repr = "0.1.20"
tracing = "0.1.44"
tracing-subscriber = { version = "0.3.22", features = ["fmt", "ansi", "time", "env-filter"] }
serialport = "4.8.1"
gethostname = "1.1.0"
bitflags = "2.10.0"
influxdb2 = { version = "0.5.2" }
chrono = "0.4.43"
serde_with = "3.16.1"
ctrlc = "3.5.1"
[dependencies.chrony-candm] [dependencies.chrony-candm]
git = "https://github.com/aws/chrony-candm" git = "https://github.com/aws/chrony-candm"

View File

@@ -1,6 +1,6 @@
FROM rust:slim as builder FROM rust:slim AS builder
RUN apt-get update && apt-get install -y libssl-dev pkg-config RUN apt-get update && apt-get install -y libudev-dev pkg-config
# build deps only first for build cache # build deps only first for build cache
WORKDIR /usr/src WORKDIR /usr/src
RUN USER=root cargo new chimemon RUN USER=root cargo new chimemon
@@ -11,6 +11,7 @@ COPY . .
RUN cargo build --release RUN cargo build --release
FROM debian:bullseye-slim FROM debian:bullseye-slim
RUN apt-get update && apt-get install -y libudev
WORKDIR /app WORKDIR /app
COPY --from=builder /usr/src/chimemon/target/release/chimemon chimemon COPY --from=builder /usr/src/chimemon/target/release/chimemon chimemon
CMD ["/app/chimemon"] CMD ["/app/chimemon"]

View File

@@ -23,6 +23,13 @@
port = "/dev/ttyUSB0" port = "/dev/ttyUSB0"
status_interval = 10 status_interval = 10
measurement = "uccm_gpsdo" measurement = "uccm_gpsdo"
[sources.prs10]
enabled = true
port = "/dev/ttyUSB0"
status_interval = 10
stats_interval = 10
measurement = "prs10_gpsdo"
[targets] [targets]
@@ -38,4 +45,4 @@ token = ""
[influxdb.tags] [influxdb.tags]
# host = "qubit" # default is the local hostname # host = "qubit" # default is the local hostname
# arbitrary = "tags" # are allowed # arbitrary = "tags" # are allowed

View File

@@ -1,276 +0,0 @@
use async_trait::async_trait;
use chimemon::{ChimemonSource, ChimemonSourceChannel, Config};
use chrony_candm::reply::{self, ReplyBody, SourceMode};
use chrony_candm::request::{self, RequestBody};
use chrony_candm::{blocking_query, ClientOptions};
use influxdb2::models::DataPoint;
use log::{info, warn};
use std::net::{SocketAddr, ToSocketAddrs};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::join;
pub struct ChronyClient {
pub server: SocketAddr,
client_options: ClientOptions,
config: Config,
}
fn datapoint_from_tracking(
t: &reply::Tracking,
config: &Config,
) -> Result<DataPoint, Box<dyn std::error::Error>> {
let now = SystemTime::now().duration_since(UNIX_EPOCH)?;
let measurement = config.sources.chrony.measurement_prefix.to_owned()
+ &config.sources.chrony.tracking_measurement;
let mut builder =
DataPoint::builder(&measurement).timestamp(now.as_nanos().try_into().unwrap());
for (key, value) in &config.influxdb.tags {
builder = builder.tag(key, value);
}
let point = builder
.field("ref_id", t.ref_id as i64)
.field("ref_ip_addr", t.ip_addr.to_string())
.field("stratum", t.stratum as i64)
.field("leap_status", t.leap_status as i64)
.field("current_correction", f64::from(t.current_correction))
.field("last_offset", f64::from(t.last_offset))
.field("rms_offset", f64::from(t.rms_offset))
.field("freq_ppm", f64::from(t.freq_ppm))
.field("resid_freq_ppm", f64::from(t.resid_freq_ppm))
.field("skew_ppm", f64::from(t.skew_ppm))
.field("root_delay", f64::from(t.root_delay))
.field("root_dispersion", f64::from(t.root_dispersion))
.field("last_update_interval", f64::from(t.last_update_interval))
.build()?;
Ok(point)
}
pub fn datapoint_from_sourcedata(
d: &reply::SourceData,
config: &Config,
) -> Result<DataPoint, Box<dyn std::error::Error>> {
let now = SystemTime::now().duration_since(UNIX_EPOCH)?;
let measurement = config.sources.chrony.measurement_prefix.to_owned()
+ &config.sources.chrony.sources_measurement;
let mut builder =
DataPoint::builder(&measurement).timestamp(now.as_nanos().try_into().unwrap());
for (key, value) in &config.influxdb.tags {
builder = builder.tag(key, value)
}
builder = builder
.tag("ref_id", d.ip_addr.to_string())
.tag(
"mode",
match d.mode {
SourceMode::Client => String::from("server"),
SourceMode::Peer => String::from("peer"),
SourceMode::Ref => String::from("refclock"),
},
)
.tag(
"state",
match d.state {
reply::SourceState::Selected => String::from("best"),
reply::SourceState::NonSelectable => String::from("unusable"),
reply::SourceState::Falseticker => String::from("falseticker"),
reply::SourceState::Jittery => String::from("jittery"),
reply::SourceState::Unselected => String::from("combined"),
reply::SourceState::Selectable => String::from("unused"),
},
)
.field("poll", d.poll as i64)
.field("stratum", d.stratum as i64)
.field("flags", d.flags.bits() as i64)
.field("reachability", d.reachability.count_ones() as i64)
.field("since_sample", d.since_sample as i64)
.field("orig_latest_meas", f64::from(d.orig_latest_meas))
.field("latest_meas", f64::from(d.latest_meas))
.field("latest_meas_err", f64::from(d.latest_meas_err));
let point = builder.build()?;
Ok(point)
}
impl ChronyClient {
pub fn new(config: Config) -> Self {
let server = config
.sources
.chrony
.host
.to_socket_addrs()
.unwrap()
.next()
.expect("Unable to parse host:port:");
let client_options = ClientOptions {
n_tries: 3,
timeout: Duration::from_secs(config.sources.chrony.timeout),
};
ChronyClient {
server,
client_options,
config,
}
}
async fn query(&self, request: RequestBody) -> Result<reply::Reply, std::io::Error> {
let server = self.server;
let client_options = self.client_options;
tokio::task::spawn_blocking(move || blocking_query(request, client_options, &server))
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Error joining thread: {}", e),
)
})?
}
pub async fn get_tracking(&self) -> Result<reply::Tracking, std::io::Error> {
let reply = self.query(RequestBody::Tracking).await?;
match reply.body {
ReplyBody::Tracking(tracking) => Ok(tracking),
_ => Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"Unexpected response type",
)),
}
}
pub async fn get_sources(&self) -> Result<Vec<reply::SourceData>, std::io::Error> {
let reply = self.query(RequestBody::NSources).await?;
let nsources = match reply.body {
ReplyBody::NSources(ns) => Ok(i32::try_from(ns.n_sources).unwrap()),
_ => Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"Unexpected response type",
)),
}?;
let mut res = Vec::with_capacity(
nsources
.try_into()
.expect("Ridiculously unconvertible number of sources"),
);
for x in 0..nsources {
res.push(self.get_source(x).await?);
}
Ok(res)
}
async fn get_source(&self, index: i32) -> Result<reply::SourceData, std::io::Error> {
let reply = self
.query(RequestBody::SourceData(request::SourceData { index }))
.await?;
let sourcedata = match reply.body {
ReplyBody::SourceData(sourcedata) => Ok(sourcedata),
_ => Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"Invalid response",
)),
}?;
// if sourcedata.mode == SourceMode::Ref {
// // Get the name if it's a refclock
// let reply = timeout(
// self.timeout,
// self.client.query(
// RequestBody::NtpSourceName(request::NtpSourceName { ip_addr: sourcedata.ip_addr }),
// self.server,
// ),
// )
// .await??;
// let sourcename = match reply.body {
// ReplyBody::NtpSourceName(sourcename) => Ok(sourcename),
// _ => Err(std::io::Error::new(
// std::io::ErrorKind::InvalidData,
// "Invalid response",
// )),
// }?;
// sourcedata.ip_addr = sourcename;
// }
Ok(sourcedata)
}
async fn tracking_poll(
&self,
chan: &ChimemonSourceChannel,
) -> Result<(), Box<dyn std::error::Error>> {
let tracking = self.get_tracking().await?;
let tracking_data = datapoint_from_tracking(&tracking, &self.config)?;
info!("Sending tracking data");
chan.send(tracking_data.into())
.expect("Unable to send tracking data to targets");
Ok(())
}
async fn sources_poll(
&self,
chan: &ChimemonSourceChannel,
) -> Result<(), Box<dyn std::error::Error>> {
let sources = self.get_sources().await?;
let mut dps = Vec::with_capacity(sources.len());
for ds in sources {
let source_data = datapoint_from_sourcedata(&ds, &self.config)?;
dps.push(source_data);
}
info!("Sending source data");
chan.send(dps.into())
.expect("Unable to send source data to targets");
Ok(())
}
}
#[async_trait]
impl ChimemonSource for ChronyClient {
async fn run(self, chan: ChimemonSourceChannel) {
info!("Chrony task started");
let mut t_interval = tokio::time::interval(Duration::from_secs(
self.config.sources.chrony.tracking_interval,
));
let mut s_interval = tokio::time::interval(Duration::from_secs(
self.config.sources.chrony.sources_interval,
));
let t_future = async {
let lchan = chan.clone();
loop {
t_interval.tick().await;
match self.tracking_poll(&lchan).await {
Ok(_) => (),
Err(e) => {
warn!("Error in chrony task: {}", e.to_string());
}
}
}
};
let s_future = async {
let lchan = chan.clone();
loop {
s_interval.tick().await;
match self.sources_poll(&lchan).await {
Ok(_) => (),
Err(e) => {
warn!("Error in chrony task: {}", e.to_string());
}
}
}
};
join!(t_future, s_future);
}
}

View File

@@ -1,74 +0,0 @@
use async_trait::async_trait;
use chimemon::{ChimemonMessage, ChimemonTarget, ChimemonTargetChannel, ChronySockConfig};
use libc::{c_double, c_int, timeval};
use log::debug;
use std::mem;
use std::os::unix::net::UnixDatagram;
use std::path::PathBuf;
const CHRONY_MAGIC: c_int = 0x534f434b;
pub struct ChronySockServer {
sock_path: PathBuf,
}
#[repr(C)]
#[derive(Debug)]
pub struct ChronyTimeReport {
tv: timeval,
offset: c_double,
pulse: c_int,
leap: c_int,
_pad: c_int,
magic: c_int,
}
impl ChronySockServer {
pub fn new(config: ChronySockConfig) -> Self {
ChronySockServer {
sock_path: config.sock.into(),
}
}
}
#[async_trait]
impl ChimemonTarget for ChronySockServer {
async fn run(mut self, mut chan: ChimemonTargetChannel) {
loop {
let msg = chan.recv().await.unwrap();
match msg {
ChimemonMessage::TimeReport(tr) => {
if tr.valid {
{
let frame = ChronyTimeReport {
tv: timeval {
tv_sec: TryInto::<libc::time_t>::try_into(
tr.system_time.timestamp(),
)
.unwrap(),
tv_usec: tr.system_time.timestamp_subsec_micros()
as libc::suseconds_t,
},
offset: tr.offset.num_nanoseconds().unwrap() as f64 / 1e9,
leap: if tr.leap_flag { 1 } else { 0 },
pulse: 0,
_pad: 0,
magic: CHRONY_MAGIC,
};
let bs = unsafe {
std::slice::from_raw_parts(
(&frame as *const ChronyTimeReport) as *const u8,
mem::size_of::<ChronyTimeReport>(),
)
};
debug!("Sending to chrony sock {:#?}", frame);
let sock = UnixDatagram::unbound().unwrap();
sock.send_to(bs, &self.sock_path).unwrap();
}
}
}
_ => continue,
}
}
}
}

214
src/config.rs Normal file
View File

@@ -0,0 +1,214 @@
use figment::{Provider, providers::Serialized, util::map, value::Map};
use gethostname::gethostname;
use serde_derive::{Deserialize, Serialize};
use serde_with::{DurationSeconds, serde_as};
#[derive(Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct InfluxConfig {
pub enabled: bool,
pub timeout: std::time::Duration,
pub url: String,
pub org: String,
pub bucket: String,
pub token: String,
pub tags: Map<String, String>,
}
impl Default for InfluxConfig {
fn default() -> Self {
let host = gethostname().into_string().unwrap();
InfluxConfig {
enabled: false,
timeout: std::time::Duration::from_secs(10),
url: "http://localhost:8086".into(),
org: "default".into(),
bucket: "default".into(),
token: "".into(),
tags: map! { "host".into() => host },
}
}
}
#[serde_as]
#[derive(Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct ChronyConfig {
pub enabled: bool,
#[serde_as(as = "DurationSeconds<u64>")]
pub timeout: std::time::Duration,
#[serde_as(as = "DurationSeconds<u64>")]
pub tracking_interval: std::time::Duration,
#[serde_as(as = "DurationSeconds<u64>")]
pub sources_interval: std::time::Duration,
pub measurement_prefix: String,
pub tracking_measurement: String,
pub sources_measurement: String,
pub host: String,
}
impl Default for ChronyConfig {
fn default() -> Self {
ChronyConfig {
enabled: false,
timeout: std::time::Duration::from_secs(5),
tracking_interval: std::time::Duration::from_secs(60),
sources_interval: std::time::Duration::from_secs(300),
measurement_prefix: "chrony.".into(),
tracking_measurement: "tracking".into(),
sources_measurement: "sources".into(),
host: "127.0.0.1:323".into(),
}
}
}
#[derive(Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct ChronySockConfig {
pub enabled: bool,
pub sock: String,
}
impl Default for ChronySockConfig {
fn default() -> Self {
ChronySockConfig {
enabled: false,
sock: "".into(),
}
}
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct HwmonSensorConfig {
pub device: String,
pub sensor: String,
}
#[serde_as]
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(default)]
pub struct HwmonConfig {
pub enabled: bool,
#[serde_as(as = "DurationSeconds<u64>")]
pub interval: std::time::Duration,
pub measurement: String,
pub sensors: Map<String, HwmonSensorConfig>,
}
impl Default for HwmonConfig {
fn default() -> Self {
HwmonConfig {
enabled: false,
interval: std::time::Duration::from_secs(60),
measurement: "hwmon".into(),
sensors: map! {},
}
}
}
#[serde_as]
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(default)]
pub struct GpsdConfig {
pub enabled: bool,
#[serde_as(as = "DurationSeconds<u64>")]
pub interval: std::time::Duration,
pub host: String,
}
impl Default for GpsdConfig {
fn default() -> Self {
GpsdConfig {
enabled: false,
interval: std::time::Duration::from_secs(60),
host: "localhost:2947".into(),
}
}
}
#[serde_as]
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(default)]
pub struct Prs10Config {
pub enabled: bool,
pub port: String,
pub baud: u32,
#[serde_as(as = "DurationSeconds<u64>")]
pub timeout: std::time::Duration,
#[serde_as(as = "DurationSeconds<u64>")]
pub status_interval: std::time::Duration,
#[serde_as(as = "DurationSeconds<u64>")]
pub stats_interval: std::time::Duration,
}
impl Default for Prs10Config {
fn default() -> Self {
Prs10Config {
enabled: false,
port: "/dev/ttyS0".into(),
baud: 9600,
timeout: std::time::Duration::from_secs(1),
status_interval: std::time::Duration::from_secs(10),
stats_interval: std::time::Duration::from_secs(30),
}
}
}
#[serde_as]
#[derive(Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct UCCMConfig {
pub enabled: bool,
pub port: String,
pub baud: u32,
#[serde_as(as = "DurationSeconds<u64>")]
pub status_interval: std::time::Duration,
#[serde_as(as = "DurationSeconds<u64>")]
pub timeout: std::time::Duration,
pub measurement: String,
}
impl Default for UCCMConfig {
fn default() -> Self {
UCCMConfig {
enabled: false,
port: "/dev/ttyS0".into(),
baud: 57600,
status_interval: std::time::Duration::from_secs(10),
timeout: std::time::Duration::from_secs(1),
measurement: "uccm_gpsdo".into(),
}
}
}
#[derive(Serialize, Deserialize, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum SourceConfig {
Chrony(ChronyConfig),
Hwmon(HwmonConfig),
Uccm(UCCMConfig),
Gpsd(GpsdConfig),
Prs10(Prs10Config),
}
#[derive(Serialize, Deserialize, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum TargetConfig {
ChronySock(ChronySockConfig),
Influxdb(InfluxConfig),
}
#[derive(Serialize, Deserialize, Clone, Default)]
pub struct Config {
pub sources: Map<String, SourceConfig>,
pub targets: Map<String, TargetConfig>,
}
impl Provider for Config {
fn metadata(&self) -> figment::Metadata {
figment::Metadata::named("Default config")
}
fn data(&self) -> Result<Map<figment::Profile, figment::value::Dict>, figment::Error> {
Serialized::defaults(Config::default()).data()
}
}

View File

@@ -1,83 +0,0 @@
use async_trait::async_trait;
use chimemon::{ChimemonSource, ChimemonSourceChannel, Config};
use futures::{stream, StreamExt};
use influxdb2::models::DataPoint;
use log::{debug, info};
use std::{
path::PathBuf,
time::{Duration, SystemTime, UNIX_EPOCH},
};
pub struct HwmonSource {
config: Config,
sensors: Vec<HwmonSensor>,
}
struct HwmonSensor {
path: PathBuf,
device: String,
sensor: String,
name: String,
}
const HWMON_ROOT: &str = "/sys/class/hwmon";
impl HwmonSource {
pub fn new(config: Config) -> Self {
let sensors = config
.sources
.hwmon
.sensors
.iter()
.map(|(k, v)| HwmonSensor {
name: k.into(),
device: v.name.clone(),
sensor: v.sensor.clone(),
path: PathBuf::from(HWMON_ROOT).join(&v.name).join(&v.sensor),
})
.collect();
HwmonSource { config, sensors }
}
async fn get_raw_value(sensor: &HwmonSensor) -> Result<String, std::io::Error> {
tokio::fs::read_to_string(&sensor.path).await
}
}
#[async_trait]
impl ChimemonSource for HwmonSource {
async fn run(self, chan: ChimemonSourceChannel) {
info!("hwmon task started");
let mut interval =
tokio::time::interval(Duration::from_secs(self.config.sources.hwmon.interval));
loop {
interval.tick().await;
let s = stream::iter(&self.sensors).then(|s| async {
let sensor_val = HwmonSource::get_raw_value(s)
.await
.expect("Unable to read sensor");
debug!(
"hwmon {} raw value {}",
s.path.to_string_lossy(),
sensor_val
);
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
let mut builder = DataPoint::builder(&self.config.sources.hwmon.measurement)
.timestamp(now.as_nanos().try_into().unwrap());
for (key, value) in &self.config.influxdb.tags {
builder = builder.tag(key, value)
}
builder = builder
.tag("name", &s.name)
.tag("sensor", &s.sensor)
.tag("device", &s.device)
.field("value", sensor_val.trim().parse::<i64>().unwrap());
builder.build().unwrap()
});
info!("Writing hwmon data");
chan.send(s.collect::<Vec<DataPoint>>().await.into())
.unwrap();
}
}
}

View File

@@ -1,105 +1,23 @@
pub mod config;
pub mod sources;
pub mod targets;
#[macro_export]
macro_rules! fatal {
($($arg:tt)*) => {{
tracing::error!($($arg)*);
std::process::exit(-1);
}};
}
use async_trait::async_trait; use async_trait::async_trait;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use figment::{
providers::{Format, Serialized, Toml},
util::map,
value::Map,
Figment,
};
use gethostname::gethostname;
use influxdb2::models::DataPoint;
use serde_derive::{Deserialize, Serialize};
use std::path::Path;
use tokio::sync::broadcast::*;
#[derive(Serialize, Deserialize, Clone)] use tokio::sync::broadcast;
pub struct InfluxConfig { use tokio_util::sync::CancellationToken;
pub url: String,
pub org: String,
pub bucket: String,
pub token: String,
pub tags: Map<String, String>,
}
impl Default for InfluxConfig { use std::{fmt::Debug, sync::Arc};
fn default() -> Self {
let host = gethostname().into_string().unwrap();
InfluxConfig {
url: "http://localhost:8086".into(),
org: "default".into(),
bucket: "default".into(),
token: "".into(),
tags: map! { "host".into() => host },
}
}
}
#[derive(Serialize, Deserialize, Clone)]
pub struct ChronyConfig {
pub enabled: bool,
pub timeout: u64,
pub tracking_interval: u64,
pub sources_interval: u64,
pub measurement_prefix: String,
pub tracking_measurement: String,
pub sources_measurement: String,
pub host: String,
}
impl Default for ChronyConfig {
fn default() -> Self {
ChronyConfig {
enabled: false,
timeout: 5,
tracking_interval: 60,
sources_interval: 300,
measurement_prefix: "chrony.".into(),
tracking_measurement: "tracking".into(),
sources_measurement: "sources".into(),
host: "127.0.0.1:323".into(),
}
}
}
#[derive(Serialize, Deserialize, Clone)]
pub struct ChronySockConfig {
pub enabled: bool,
pub sock: String,
}
impl Default for ChronySockConfig {
fn default() -> Self {
ChronySockConfig {
enabled: false,
sock: "".into(),
}
}
}
#[derive(Serialize, Deserialize, Clone)]
pub struct HwmonSensorConfig {
pub name: String,
pub sensor: String,
}
#[derive(Serialize, Deserialize, Clone)]
pub struct HwmonConfig {
pub enabled: bool,
pub interval: u64,
pub measurement: String,
pub sensors: Map<String, HwmonSensorConfig>,
}
impl Default for HwmonConfig {
fn default() -> Self {
HwmonConfig {
enabled: false,
interval: 60,
measurement: "hwmon".into(),
sensors: map! {},
}
}
}
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct TimeReport { pub struct TimeReport {
pub system_time: DateTime<Utc>, pub system_time: DateTime<Utc>,
@@ -109,68 +27,76 @@ pub struct TimeReport {
pub valid: bool, pub valid: bool,
} }
#[derive(Serialize, Deserialize, Clone)] #[derive(Clone, Debug)]
pub struct UCCMConfig { pub enum SourceStatus {
pub enabled: bool, Healthy,
pub port: String, LossOfSignal(Option<String>),
pub baud: u32, LossOfSync(Option<String>),
pub status_interval: std::time::Duration, Other(Option<String>),
pub timeout: std::time::Duration, Unknown,
pub measurement: String,
} }
impl Default for UCCMConfig { #[derive(Copy, Clone, Debug)]
fn default() -> Self { pub enum MetricValue {
UCCMConfig { Int(i64),
enabled: false, Float(f64),
port: "/dev/ttyS0".into(), Bool(bool),
baud: 57600, }
status_interval: std::time::Duration::from_secs(10),
timeout: std::time::Duration::from_secs(1), type MetricTag = (&'static str, String);
measurement: "uccm_gpsdo".into(), type MetricTags = Vec<MetricTag>;
#[derive(Clone, Debug)]
pub struct SourceMetric {
name: &'static str,
value: MetricValue,
}
impl SourceMetric {
pub fn new_int(name: &'static str, value: i64) -> Self {
Self {
name: name,
value: MetricValue::Int(value),
}
}
pub fn new_float(name: &'static str, value: f64) -> Self {
Self {
name: name,
value: MetricValue::Float(value),
}
}
pub fn new_bool(name: &'static str, value: bool) -> Self {
Self {
name: name,
value: MetricValue::Bool(value),
} }
} }
} }
#[derive(Serialize, Deserialize, Clone, Default)] #[derive(Debug)]
pub struct SourcesConfig { pub struct SourceMetricSet {
pub chrony: ChronyConfig, metrics: Vec<SourceMetric>,
pub hwmon: HwmonConfig, tags: Arc<MetricTags>,
pub uccm: UCCMConfig,
} }
#[derive(Serialize, Deserialize, Clone, Default)] pub trait SourceReportDetails: Debug + Send + Sync {
pub struct TargetsConfig { fn to_metrics(&self) -> Vec<SourceMetricSet>;
pub chrony: ChronySockConfig, fn is_healthy(&self) -> bool;
}
#[derive(Serialize, Deserialize, Clone, Default)]
pub struct Config {
pub influxdb: InfluxConfig,
pub sources: SourcesConfig,
pub targets: TargetsConfig,
}
pub fn load_config(filename: &Path) -> Figment {
Figment::from(Serialized::defaults(Config::default())).merge(Toml::file(filename))
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub enum ChimemonMessage { pub struct SourceReport {
DataPoint(DataPoint), pub name: String,
DataPoints(Vec<DataPoint>), pub status: SourceStatus,
TimeReport(TimeReport), pub details: Arc<dyn SourceReportDetails + Send + Sync>,
} }
impl From<DataPoint> for ChimemonMessage { #[derive(Debug, Clone)]
fn from(dp: DataPoint) -> Self { pub enum ChimemonMessage {
ChimemonMessage::DataPoint(dp) TimeReport(TimeReport),
} SourceReport(SourceReport),
}
impl From<Vec<DataPoint>> for ChimemonMessage {
fn from(dps: Vec<DataPoint>) -> Self {
ChimemonMessage::DataPoints(dps)
}
} }
impl From<TimeReport> for ChimemonMessage { impl From<TimeReport> for ChimemonMessage {
@@ -179,15 +105,27 @@ impl From<TimeReport> for ChimemonMessage {
} }
} }
pub type ChimemonSourceChannel = Sender<ChimemonMessage>; impl From<SourceReport> for ChimemonMessage {
pub type ChimemonTargetChannel = Receiver<ChimemonMessage>; fn from(sr: SourceReport) -> Self {
ChimemonMessage::SourceReport(sr)
}
}
pub type ChimemonSourceChannel = broadcast::Sender<ChimemonMessage>;
pub type ChimemonTargetChannel = broadcast::Receiver<ChimemonMessage>;
#[async_trait] #[async_trait]
pub trait ChimemonSource { pub trait ChimemonSource {
async fn run(self, chan: ChimemonSourceChannel); type Config;
const TASK_NAME: &'static str;
fn new(name: &str, config: Self::Config) -> Self;
async fn run(self, chan: ChimemonSourceChannel, cancel: CancellationToken);
} }
#[async_trait] #[async_trait]
pub trait ChimemonTarget { pub trait ChimemonTarget {
async fn run(self, chan: ChimemonTargetChannel); type Config;
const TASK_NAME: &'static str;
fn new(name: &str, config: Self::Config) -> Self;
async fn run(self, mut chan: ChimemonTargetChannel, cancel: CancellationToken);
} }

View File

@@ -1,20 +1,31 @@
mod chrony; use std::sync::Arc;
mod chrony_refclock;
mod hwmon;
mod uccm;
use async_trait::async_trait;
use clap::{Parser, ValueEnum}; use clap::{Parser, ValueEnum};
use env_logger::{self, Env}; use figment::{
Figment,
providers::{Format, Toml},
};
use futures::future::join_all; use futures::future::join_all;
use log::{debug, info}; use tokio::{select, sync::broadcast, task::JoinHandle};
use std::path::Path; use tokio_util::sync::CancellationToken;
use tokio::sync::broadcast; use tracing::{Instrument, debug, error, info, info_span, warn};
use tracing_subscriber::{self, EnvFilter, fmt::format::FmtSpan};
use crate::{chrony::*, chrony_refclock::ChronySockServer, hwmon::HwmonSource, uccm::UCCMMonitor}; use chimemon::{
use chimemon::*; config::{SourceConfig, TargetConfig},
targets::influx::InfluxTarget,
*,
};
use config::Config;
use sources::{
chrony::ChronyClient, gpsd::GpsdSource, hwmon::HwmonSource, prs10::Prs10Monitor,
uccm::UCCMMonitor,
};
use targets::chrony_refclock::ChronySockServer;
const PROGRAM_NAME: &str = "chimemon"; const PROGRAM_NAME: &str = "chimemon";
const VERSION: &str = "0.0.1"; const VERSION: &str = env!("CARGO_PKG_VERSION");
#[derive(ValueEnum, Clone)] #[derive(ValueEnum, Clone)]
enum Level { enum Level {
@@ -24,107 +35,211 @@ enum Level {
Error, Error,
} }
impl From<Level> for tracing::Level {
fn from(level: Level) -> Self {
match level {
Level::Debug => tracing::Level::DEBUG,
Level::Info => tracing::Level::INFO,
Level::Warn => tracing::Level::WARN,
Level::Error => tracing::Level::ERROR,
}
}
}
impl Level {
fn level(self) -> tracing::Level {
self.into()
}
}
#[derive(Parser)] #[derive(Parser)]
struct Args { struct Args {
/// TOML configuration to load /// TOML configuration to load
#[arg(short, long, default_value_t = String::from("config.toml"))] #[arg(short, long, default_value_t = String::from("config.toml"))]
config_file: String, config_file: String,
#[arg(value_enum, default_value_t = Level::Warn)] #[arg(value_enum, default_value_t = Level::Info)]
log_level: Level, log_level: Level,
#[arg(short, long, default_value_t = false)]
echo_task: bool,
} }
#[tokio::main(flavor = "current_thread")] fn run_source(
name: &str,
source: SourceConfig,
chan: ChimemonSourceChannel,
cancel: CancellationToken,
) -> Option<JoinHandle<()>> {
match source {
SourceConfig::Chrony(cfg) if cfg.enabled => {
spawn_source::<ChronyClient>(name, cfg, chan, cancel)
}
SourceConfig::Gpsd(cfg) if cfg.enabled => {
spawn_source::<GpsdSource>(name, cfg, chan, cancel)
}
SourceConfig::Hwmon(cfg) if cfg.enabled => {
spawn_source::<HwmonSource>(name, cfg, chan, cancel)
}
SourceConfig::Prs10(cfg) if cfg.enabled => {
spawn_source::<Prs10Monitor>(name, cfg, chan, cancel)
}
SourceConfig::Uccm(cfg) if cfg.enabled => {
spawn_source::<UCCMMonitor>(name, cfg, chan, cancel)
}
_ => {
debug!("Disabled source {name} skipped");
None
}
}
}
fn run_target(
name: &str,
target: TargetConfig,
chan: ChimemonTargetChannel,
cancel: CancellationToken,
) -> Option<JoinHandle<()>> {
match target {
TargetConfig::ChronySock(cfg) if cfg.enabled => {
spawn_target::<ChronySockServer>(name, cfg, chan, cancel)
}
TargetConfig::Influxdb(cfg) if cfg.enabled => {
spawn_target::<InfluxTarget>(name, cfg, chan, cancel)
}
_ => {
debug!("Disabled target {name} skipped");
None
}
}
}
fn spawn_source<T: ChimemonSource + Send + Sync + 'static>(
name: &str,
config: T::Config,
chan: ChimemonSourceChannel,
cancel: CancellationToken,
) -> Option<JoinHandle<()>> {
let span = info_span!("source", task = name);
let s = T::new(name, config);
Some(tokio::spawn(s.run(chan, cancel).instrument(span)))
}
fn spawn_target<T: ChimemonTarget + Send + Sync + 'static>(
name: &str,
config: T::Config,
chan: ChimemonTargetChannel,
cancel: CancellationToken,
) -> Option<JoinHandle<()>> {
let span = info_span!("target", task = name);
let t = T::new(name, config);
Some(tokio::spawn(async move {
t.run(chan, cancel).instrument(span).await
}))
}
struct EchoTarget {}
struct EchoTargetConfig {}
#[async_trait]
impl ChimemonTarget for EchoTarget {
type Config = EchoTargetConfig;
const TASK_NAME: &'static str = "echo-task";
fn new(_name: &str, _config: Self::Config) -> Self {
EchoTarget {}
}
async fn run(self, mut chan: ChimemonTargetChannel, cancel: CancellationToken) {
info!("Dummy receiver task started");
loop {
let msg = select! {
_ = cancel.cancelled() => {
return
},
msg = chan.recv() => msg
};
match msg {
Ok(ChimemonMessage::SourceReport(report)) => {
let metrics = report.details.to_metrics();
info!("instance: {} metrics: {metrics:?}", report.name);
}
Ok(msg) => {
info!("message: {msg:?}");
}
Err(e) => {
warn!(error = ?e, "Error receiving message");
}
}
}
}
}
#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Args::parse(); let args = Args::parse();
let loglevel = args tracing_subscriber::fmt()
.log_level .with_env_filter(
.to_possible_value() EnvFilter::try_from_default_env()
.unwrap() .unwrap_or(EnvFilter::default().add_directive(args.log_level.level().into())),
.get_name() )
.to_owned(); // .event_format(format::Format::default().pretty())
env_logger::Builder::from_env(Env::default().default_filter_or(loglevel)).init(); .with_span_events(FmtSpan::CLOSE)
.init();
info!("{} v{} starting...", PROGRAM_NAME, VERSION); info!("{PROGRAM_NAME} v{VERSION} starting...");
let fig = load_config(Path::new(&args.config_file)); let fig = Figment::new()
debug!("{:?}", fig); .merge(Config::default())
.merge(Toml::file(&args.config_file));
debug!("{fig:?}");
let config: Config = fig.extract()?; let config: Config = fig.extract()?;
let mut tasks = Vec::new(); let mut tasks = Vec::new();
let (tx, _) = broadcast::channel(16); let (sourcechan, _) = broadcast::channel(16);
let sourcechan: ChimemonSourceChannel = tx;
info!( let shutdown_token = CancellationToken::new();
"Connecting to influxdb {} org: {} using token",
&config.influxdb.url, &config.influxdb.org
);
let influx = influxdb2::Client::new(
&config.influxdb.url,
&config.influxdb.org,
&config.influxdb.token,
);
let chrony = if config.sources.chrony.enabled { for (name, target) in config.targets {
Some(ChronyClient::new(config.to_owned())) if let Some(task) = run_target(
} else { &name,
None target,
}; sourcechan.subscribe(),
if let Some(c) = chrony { shutdown_token.clone(),
tasks.push(tokio::spawn(c.run(sourcechan.clone()))); ) {
}; tasks.push(task)
}
}
let hwmon = if config.sources.hwmon.enabled { for (name, source) in config.sources {
Some(HwmonSource::new(config.to_owned())) if let Some(task) = run_source(&name, source, sourcechan.clone(), shutdown_token.clone()) {
} else { tasks.push(task)
None }
}; }
if let Some(hwmon) = hwmon {
tasks.push(tokio::spawn(hwmon.run(sourcechan.clone())));
};
let uccm = if config.sources.uccm.enabled { if tasks.len() == 0 {
info!("Spawning UCCMMonitor"); error!("No tasks configured, exiting.");
Some(UCCMMonitor::new(config.to_owned())) return Ok(()); // not an error, but exit before starting a dummy task
} else { }
info!("UCCMMonitor not configured"); if sourcechan.strong_count() == 0 {
None warn!("No sources configured, no events will be generated");
}; }
if let Some(uccm) = uccm { if sourcechan.receiver_count() == 0 {
tasks.push(tokio::spawn(uccm.run(sourcechan.clone()))); warn!("No targets configured, events will be discarded");
}; }
if args.echo_task || sourcechan.receiver_count() == 0 {
let c = EchoTargetConfig {};
tasks.push(
spawn_target::<EchoTarget>("echo", c, sourcechan.subscribe(), shutdown_token.clone())
.unwrap(),
)
}
let chrony_refclock = if config.targets.chrony.enabled { debug!("Task setup complete, tasks: {}", tasks.len());
Some(ChronySockServer::new(config.targets.chrony.to_owned())) ctrlc::set_handler(move || {
} else { if shutdown_token.is_cancelled() {
None info!("Forced shutdown");
}; std::process::exit(1);
if let Some(chrony_refclock) = chrony_refclock { }
tasks.push(tokio::spawn(chrony_refclock.run(sourcechan.subscribe()))); info!("Shutting down");
}; shutdown_token.cancel()
})
let mut influxrx = sourcechan.subscribe(); .unwrap();
tasks.push(tokio::spawn(async move {
let stream = async_stream::stream! {
while let Ok(msg) = influxrx.recv().await {
match msg { ChimemonMessage::DataPoint(dp) => {
yield dp
}, ChimemonMessage::DataPoints(dps) => {
for p in dps {
yield p
}
}, ChimemonMessage::TimeReport(_tr) => {}
} }
};
influx.write(&config.influxdb.bucket, stream).await.unwrap();
}));
// let mut debugrx = sourcechan.subscribe();
// tasks.push(tokio::spawn(async move {
// loop {
// let v = debugrx.recv().await;
// warn!("streamed: {:?}", v.unwrap());
// }
// }));
join_all(tasks).await; join_all(tasks).await;

321
src/sources/chrony.rs Normal file
View File

@@ -0,0 +1,321 @@
use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::Arc;
use async_trait::async_trait;
use chrony_candm::reply::{self, ReplyBody, SourceMode};
use chrony_candm::request::{self, RequestBody};
use chrony_candm::{ClientOptions, blocking_query};
use tokio::select;
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
use crate::SourceMetricSet;
use crate::{
ChimemonSource, ChimemonSourceChannel, MetricTags, SourceMetric, SourceReport,
SourceReportDetails, SourceStatus, config::ChronyConfig,
};
pub struct ChronyClient {
pub server: SocketAddr,
pub name: String,
client_options: ClientOptions,
config: ChronyConfig,
}
#[derive(Debug)]
pub struct ChronyTrackingReport {
tags: Arc<MetricTags>,
pub ref_id: i64,
pub ref_ip_addr: String,
pub stratum: i64,
pub leap_status: i64,
pub current_correction: f64,
pub last_offset: f64,
pub rms_offset: f64,
pub freq_ppm: f64,
pub resid_freq_ppm: f64,
pub skew_ppm: f64,
pub root_delay: f64,
pub root_dispersion: f64,
pub last_update_interval: f64,
}
impl SourceReportDetails for ChronyTrackingReport {
fn is_healthy(&self) -> bool {
true
}
fn to_metrics(&self) -> Vec<SourceMetricSet> {
vec![SourceMetricSet {
tags: self.tags.clone(),
metrics: vec![
SourceMetric::new_int("ref_id", self.ref_id),
SourceMetric::new_int("stratum", self.stratum),
SourceMetric::new_int("leap_status", self.leap_status),
SourceMetric::new_float("current_correction", self.current_correction),
SourceMetric::new_float("last_offset", self.last_offset),
SourceMetric::new_float("rms_offset", self.rms_offset),
SourceMetric::new_float("freq_ppm", self.freq_ppm),
SourceMetric::new_float("resid_freq_ppm", self.resid_freq_ppm),
SourceMetric::new_float("skew_ppm", self.skew_ppm),
SourceMetric::new_float("root_delay", self.root_delay),
SourceMetric::new_float("root_dispersion", self.root_dispersion),
SourceMetric::new_float("last_update_interval", self.last_update_interval),
],
}]
}
}
#[derive(Debug)]
pub struct ChronySourcesReport {
pub sources: Vec<reply::SourceData>,
}
impl SourceReportDetails for ChronySourcesReport {
fn is_healthy(&self) -> bool {
//TODO: think about whether there is an idea of unhealthy sources
true
}
fn to_metrics(&self) -> Vec<SourceMetricSet> {
let mut metrics = Vec::with_capacity(self.sources.len());
for source in &self.sources {
let tags = Arc::new(vec![
("ref_id", source.ip_addr.to_string()),
(
"mode",
match source.mode {
SourceMode::Client => String::from("server"),
SourceMode::Peer => String::from("peer"),
SourceMode::Ref => String::from("refclock"),
},
),
(
"state",
match source.state {
reply::SourceState::Selected => String::from("best"),
reply::SourceState::NonSelectable => String::from("unusable"),
reply::SourceState::Falseticker => String::from("falseticker"),
reply::SourceState::Jittery => String::from("jittery"),
reply::SourceState::Unselected => String::from("combined"),
reply::SourceState::Selectable => String::from("unused"),
},
),
]);
metrics.push(SourceMetricSet {
tags: tags,
metrics: vec![
SourceMetric::new_int("poll", source.poll as i64),
SourceMetric::new_int("stratum", source.stratum as i64),
SourceMetric::new_int("flags", source.flags.bits() as i64),
SourceMetric::new_int("reachability", source.reachability.count_ones() as i64),
SourceMetric::new_int("since_sample", source.since_sample as i64),
SourceMetric::new_float("orig_latest_meas", source.orig_latest_meas.into()),
SourceMetric::new_float("latest_meas", source.latest_meas.into()),
SourceMetric::new_float("latest_meas_err", source.latest_meas_err.into()),
],
});
}
metrics
}
}
fn report_from_tracking(
t: &reply::Tracking,
config: &ChronyConfig,
) -> Result<ChronyTrackingReport, Box<dyn std::error::Error>> {
let report = ChronyTrackingReport {
tags: Arc::new(vec![]), //TODO: allow configuring tags in the source
ref_id: t.ref_id as i64,
ref_ip_addr: t.ip_addr.to_string(),
stratum: t.stratum as i64,
leap_status: t.leap_status as i64,
current_correction: t.current_correction.into(),
last_offset: t.last_offset.into(),
rms_offset: t.rms_offset.into(),
freq_ppm: t.freq_ppm.into(),
resid_freq_ppm: t.resid_freq_ppm.into(),
skew_ppm: t.skew_ppm.into(),
root_delay: t.root_delay.into(),
root_dispersion: t.root_dispersion.into(),
last_update_interval: t.last_update_interval.into(),
};
Ok(report)
}
impl ChronyClient {
async fn query(&self, request: RequestBody) -> Result<reply::Reply, std::io::Error> {
let server = self.server;
let client_options = self.client_options;
tokio::task::spawn_blocking(move || blocking_query(request, client_options, &server))
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Error joining thread: {}", e),
)
})?
}
pub async fn get_tracking(&self) -> Result<reply::Tracking, std::io::Error> {
let reply = self.query(RequestBody::Tracking).await?;
match reply.body {
ReplyBody::Tracking(tracking) => Ok(tracking),
_ => Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"Unexpected response type",
)),
}
}
pub async fn get_sources(&self) -> Result<Vec<reply::SourceData>, std::io::Error> {
let reply = self.query(RequestBody::NSources).await?;
let nsources = match reply.body {
ReplyBody::NSources(ns) => Ok(i32::try_from(ns.n_sources).unwrap()),
_ => Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"Unexpected response type",
)),
}?;
let mut res = Vec::with_capacity(
nsources
.try_into()
.expect("Ridiculously unconvertible number of sources"),
);
for x in 0..nsources {
res.push(self.get_source(x).await?);
}
Ok(res)
}
async fn get_source(&self, index: i32) -> Result<reply::SourceData, std::io::Error> {
let reply = self
.query(RequestBody::SourceData(request::SourceData { index }))
.await?;
let sourcedata = match reply.body {
ReplyBody::SourceData(sourcedata) => Ok(sourcedata),
_ => Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"Invalid response",
)),
}?;
// if sourcedata.mode == SourceMode::Ref {
// // Get the name if it's a refclock
// let reply = timeout(
// self.timeout,
// self.client.query(
// RequestBody::NtpSourceName(request::NtpSourceName { ip_addr: sourcedata.ip_addr }),
// self.server,
// ),
// )
// .await??;
// let sourcename = match reply.body {
// ReplyBody::NtpSourceName(sourcename) => Ok(sourcename),
// _ => Err(std::io::Error::new(
// std::io::ErrorKind::InvalidData,
// "Invalid response",
// )),
// }?;
// sourcedata.ip_addr = sourcename;
// }
Ok(sourcedata)
}
async fn tracking_poll(
&self,
chan: &ChimemonSourceChannel,
) -> Result<(), Box<dyn std::error::Error>> {
let tracking = self.get_tracking().await?;
let tracking_data = report_from_tracking(&tracking, &self.config)?;
let report = SourceReport {
name: self.name.clone(),
status: SourceStatus::Unknown,
details: Arc::new(tracking_data),
};
info!("Sending tracking data");
chan.send(report.into())?;
Ok(())
}
async fn sources_poll(
&self,
chan: &ChimemonSourceChannel,
) -> Result<(), Box<dyn std::error::Error>> {
let sources = self.get_sources().await?;
let details = ChronySourcesReport { sources };
let report = SourceReport {
name: self.name.clone(),
status: SourceStatus::Unknown,
details: Arc::new(details),
};
info!("Sending source data");
chan.send(report.into())?;
Ok(())
}
}
#[async_trait]
impl ChimemonSource for ChronyClient {
type Config = ChronyConfig;
const TASK_NAME: &'static str = "chrony-task";
fn new(name: &str, config: Self::Config) -> Self {
let server = config
.host
.to_socket_addrs()
.unwrap()
.next()
.expect("Unable to parse host:port:");
let client_options = ClientOptions {
n_tries: 3,
timeout: config.timeout,
};
ChronyClient {
name: name.to_owned(),
server,
client_options,
config,
}
}
async fn run(self, chan: ChimemonSourceChannel, cancel: CancellationToken) {
info!("Chrony task started");
let mut t_interval = tokio::time::interval(self.config.tracking_interval);
let mut s_interval = tokio::time::interval(self.config.sources_interval);
loop {
select! {
_ = cancel.cancelled() => {
return
},
_ = t_interval.tick() => {
match self.tracking_poll(&chan).await {
Ok(_) => (),
Err(e) => {
warn!(error = ?e, "Error in chrony tracking task");
}
}
},
_ = s_interval.tick() => {
match self.sources_poll(&chan).await {
Ok(_) => (),
Err(e) => {
warn!(error = ?e, "Error in chrony sources task");
}
}
}
}
}
}
}

473
src/sources/gpsd.rs Normal file
View File

@@ -0,0 +1,473 @@
use std::collections::HashMap;
use std::f64;
use std::fmt::Debug;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use backoff::ExponentialBackoff;
use futures::{SinkExt, Stream, StreamExt};
use gpsd_proto::{Device, Gst, Mode, Pps, Sky, Tpv, UnifiedResponse, Version};
use serde::Serialize;
use serde_json;
use tokio::net::{TcpStream, ToSocketAddrs, lookup_host};
use tokio::time::{interval, timeout};
use tokio_util::codec::{Framed, LinesCodec};
use tokio_util::sync::CancellationToken;
use tracing::{debug, debug_span, error, info, instrument, warn};
use crate::SourceMetricSet;
use crate::{
ChimemonSource, ChimemonSourceChannel, SourceMetric, SourceReport, SourceReportDetails,
SourceStatus, config::GpsdConfig,
};
pub struct GpsdSource {
pub name: String,
pub config: GpsdConfig,
conn: GpsdTransport,
devices: HashMap<String, Device>,
last_gst: Option<Gst>,
last_pps: Option<Pps>,
last_tpv: Option<Tpv>,
last_sky: Option<Sky>,
}
#[derive(Eq, PartialEq, Clone, Copy, Debug)]
pub enum GpsdFixType {
Unknown,
NoFix,
Fix2D,
Fix3D,
Surveyed,
}
impl From<u32> for GpsdFixType {
fn from(value: u32) -> Self {
match value {
0 => Self::Unknown,
1 => Self::NoFix,
2 => Self::Fix2D,
3 => Self::Fix3D,
_ => panic!("Invalid fix mode {value}"),
}
}
}
impl From<Mode> for GpsdFixType {
fn from(value: Mode) -> Self {
match value {
Mode::NoFix => Self::NoFix,
Mode::Fix2d => Self::Fix2D,
Mode::Fix3d => Self::Fix3D,
}
}
}
#[derive(Clone, Debug)]
pub struct GpsdSourceReport {
fix_type: GpsdFixType,
sats_visible: u8,
sats_tracked: u8,
tdop: f64,
}
impl SourceReportDetails for GpsdSourceReport {
fn is_healthy(&self) -> bool {
self.fix_type != GpsdFixType::Unknown && self.fix_type != GpsdFixType::NoFix
}
fn to_metrics(&self) -> Vec<SourceMetricSet> {
let tags = Arc::new(vec![]);
vec![SourceMetricSet {
tags,
metrics: vec![
SourceMetric::new_int("sats_visible", self.sats_visible as i64),
SourceMetric::new_int("sats_tracked", self.sats_tracked as i64),
SourceMetric::new_float("tdop", self.tdop),
],
}]
}
}
impl GpsdSource {
async fn inner_new(name: &str, config: GpsdConfig) -> Result<Self, std::io::Error> {
let conn = GpsdTransport::new(&config.host).await?;
Ok(Self {
name: name.to_owned(),
config,
conn,
devices: HashMap::new(),
last_gst: None,
last_pps: None,
last_tpv: None,
last_sky: None,
})
}
async fn send_status(&self, chan: &mut ChimemonSourceChannel) {
let sky = self.last_sky.as_ref();
let tpv = self.last_tpv.as_ref();
let (sats_tracked, sats_visible) = sky.map_or((0, 0), |sky| {
let sats = sky.satellites.as_deref().unwrap_or_default();
(
sats.iter().filter(|s| s.used).count() as u8,
sats.len() as u8,
)
});
let tdop = sky
.and_then(|sky| sky.tdop)
.map_or(f64::INFINITY, |tdop| tdop as f64);
if let Err(e) = chan.send(
SourceReport {
name: self.name.clone(),
status: SourceStatus::Unknown,
details: Arc::new(GpsdSourceReport {
fix_type: tpv.map_or(GpsdFixType::Unknown, |tpv| tpv.mode.into()),
sats_tracked,
sats_visible,
tdop,
}),
}
.into(),
) {
error!(error = ?e, "Unable to send to channel")
}
}
fn handle_msg(&mut self, msg: String) -> Result<(), Box<dyn std::error::Error>> {
let _span = debug_span!("handle_msg").entered();
let parsed = serde_json::from_str::<UnifiedResponse>(&msg)?;
debug!("Received {parsed:?}");
match parsed {
UnifiedResponse::Device(d) => {
if let Some(path) = &d.path {
self.devices.insert(path.to_owned(), d);
} else {
warn!("No path on DEVICE response, ignoring.");
}
}
UnifiedResponse::Gst(g) => self.last_gst = Some(g),
UnifiedResponse::Pps(p) => self.last_pps = Some(p),
UnifiedResponse::Sky(s) => {
self.last_sky = Some({
let mut s = s;
if s.satellites.is_none() {
s.satellites = self.last_sky.as_mut().and_then(|old| old.satellites.take());
}
s
})
}
UnifiedResponse::Tpv(t) => self.last_tpv = Some(t),
_ => warn!("Unhandled response `{parsed:?}`"),
}
Ok(())
}
}
#[async_trait]
impl ChimemonSource for GpsdSource {
type Config = GpsdConfig;
const TASK_NAME: &'static str = "gpsd-task";
fn new(name: &str, config: Self::Config) -> Self {
// TODO: refactor so this mess isn't necessary
// Should do async setup at the start of run(), not here
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
rt.block_on(Self::inner_new(name, config)).unwrap()
}
async fn run(mut self, mut chan: ChimemonSourceChannel, cancel: CancellationToken) {
info!("gpsd task started");
self.conn.conn().await.unwrap();
let mut ticker = interval(self.config.interval);
let mut params = WatchParams::default();
params.json = Some(true);
self.conn
.cmd_response(&GpsdCommand::Watch(Some(params)))
.await
.unwrap();
loop {
let framed = self.conn.framed.as_mut().expect("must be connected");
tokio::select! {
_ = cancel.cancelled() => {
return
},
_ = ticker.tick() => {
self.send_status(&mut chan).await
},
maybe_msg = framed.next() => {
if let Some(Ok(msg)) = maybe_msg {
self.handle_msg(msg).unwrap()
}
}
}
}
}
}
#[derive(Debug)]
struct GpsdTransport {
host: SocketAddr,
framed: Option<Framed<TcpStream, LinesCodec>>,
conn_backoff: ExponentialBackoff,
}
#[derive(Clone, Copy, Debug, Serialize)]
#[repr(u8)]
pub enum RawMode {
Off = 0,
RawHex = 1,
RawBin = 2,
}
#[derive(Clone, Copy, Debug, Serialize)]
#[repr(u8)]
pub enum NativeMode {
Nmea = 0,
Alt = 1,
}
#[derive(Clone, Copy, Debug, Serialize)]
#[repr(u8)]
pub enum ParityMode {
None = b'N',
Odd = b'O',
Even = b'E',
}
#[derive(Clone, Debug, Serialize)]
struct WatchParams {
#[serde(skip_serializing_if = "Option::is_none")]
enable: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
json: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
nmea: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
raw: Option<RawMode>,
#[serde(skip_serializing_if = "Option::is_none")]
scaled: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
split24: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pps: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
device: Option<String>,
}
impl Default for WatchParams {
fn default() -> Self {
WatchParams {
enable: Some(true),
json: Some(false),
nmea: None,
raw: None,
scaled: None,
split24: None,
pps: None,
device: None,
}
}
}
#[derive(Clone, Debug, Serialize)]
struct DeviceParams {
#[serde(skip_serializing_if = "Option::is_none")]
bps: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
cycle: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
flags: Option<u8>,
#[serde(skip_serializing_if = "Option::is_none")]
hexdata: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
native: Option<NativeMode>,
#[serde(skip_serializing_if = "Option::is_none")]
parity: Option<ParityMode>,
#[serde(skip_serializing_if = "Option::is_none")]
path: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
readonly: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
sernum: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
stopbits: Option<u8>,
}
#[derive(Clone, Debug)]
enum GpsdCommand {
Version, // no params
Devices, // no params
Watch(Option<WatchParams>),
Poll, // I don't understand the protocol for this one
Device(Option<DeviceParams>),
}
impl GpsdCommand {
fn command_string(&self) -> &str {
match self {
Self::Version => "?VERSION",
Self::Devices => "?DEVICES",
Self::Watch(_) => "?WATCH",
Self::Poll => "?POLL",
Self::Device(_) => "?DEVICE",
}
}
fn expected_responses(&self) -> usize {
match self {
Self::Version => 1,
Self::Devices => 1,
Self::Watch(_) => 2,
Self::Poll => 1,
Self::Device(_) => 1,
}
}
}
impl ToString for GpsdCommand {
fn to_string(&self) -> String {
let s = self.command_string().to_owned();
match self {
Self::Version | Self::Devices | Self::Poll | Self::Watch(None) | Self::Device(None) => {
s + ";"
}
Self::Watch(Some(w)) => s + "=" + &serde_json::to_string(w).unwrap() + ";",
Self::Device(Some(d)) => s + "=" + &serde_json::to_string(d).unwrap() + ";",
}
}
}
impl GpsdTransport {
async fn new<T: ToSocketAddrs + Debug>(host: &T) -> Result<Self, std::io::Error> {
// TODO: implement proper handling of multiple responses
let host_addr = lookup_host(host).await?.next().ok_or(std::io::Error::new(
std::io::ErrorKind::NotFound,
format!("No response looking up `{:?}`", host),
))?;
Ok(Self {
host: host_addr,
framed: None,
conn_backoff: ExponentialBackoff::default(),
})
}
async fn connect_inner(
&self,
) -> Result<Framed<TcpStream, LinesCodec>, Box<dyn std::error::Error>> {
info!("Connecting to gpsd @ {}", self.host);
let mut framed = Framed::new(TcpStream::connect(self.host).await?, LinesCodec::new());
debug!("Waiting for initial VERSION");
match timeout(Duration::from_secs(5), framed.next()).await {
Ok(Some(Ok(r))) => {
if let Ok(version) = serde_json::from_str::<Version>(&r) {
info!(
"Connected to gpsd @ {}, release {}",
self.host, version.release
);
} else {
warn!("Got unexpected non-VERSION response after connection (`{r}`)");
}
Ok(framed)
}
_ => Err("Unexpected failure to receive initial handshake response".into()),
}
}
async fn conn(
&mut self,
) -> Result<&mut Framed<TcpStream, LinesCodec>, Box<dyn std::error::Error>> {
if self.framed.is_none() {
let framed = backoff::future::retry_notify(
self.conn_backoff.clone(),
|| async {
self.connect_inner()
.await
.map_err(backoff::Error::transient)
},
|e, d| warn!("Failed to connect to {} after {:?}: `{}`", self.host, d, e),
)
.await?;
self.framed = Some(framed);
}
Ok(self.framed.as_mut().unwrap())
}
#[instrument(level = "debug", skip_all, fields(cmd = cmd.to_string()))]
async fn cmd_response(
&mut self,
cmd: &GpsdCommand,
) -> Result<Vec<UnifiedResponse>, Box<dyn std::error::Error>> {
let mut responses = Vec::new();
let conn = self.conn().await?;
conn.send(cmd.to_string()).await?;
for _ in 0..cmd.expected_responses() {
match conn.next().await {
None => return Err("Connection lost".into()),
Some(Err(e)) => return Err(format!("Unable to parse response {e}").into()),
Some(Ok(r)) => {
debug!("Raw response: `{r}`");
responses.push(serde_json::from_str::<UnifiedResponse>(&r)?)
}
}
}
Ok(responses)
}
async fn stream(
&mut self,
) -> Result<
impl Stream<Item = Result<UnifiedResponse, Box<dyn std::error::Error>>>,
Box<dyn std::error::Error>,
> {
Ok(self
.conn()
.await?
.map(|line| Ok(serde_json::from_str::<UnifiedResponse>(&line?)?)))
}
}
mod tests {
use gpsd_proto::{ResponseData, UnifiedResponse};
use tokio_stream::StreamExt;
use crate::sources::gpsd::{GpsdCommand, GpsdTransport, WatchParams};
use std::sync::Once;
static INIT: Once = Once::new();
fn init_logger() {
INIT.call_once(|| tracing_subscriber::fmt::init());
}
#[tokio::test]
async fn test_gpsd() {
init_logger();
let mut gpsd = GpsdTransport::new(&"192.168.65.93:2947").await.unwrap();
gpsd.conn().await.unwrap();
let mut params = WatchParams::default();
params.enable = Some(true);
params.json = Some(true);
let res = {
gpsd.cmd_response(&GpsdCommand::Watch(Some(params)))
.await
.unwrap()
};
println!("{res:?}");
while let Some(Ok(s)) = gpsd.framed.as_mut().unwrap().next().await {
let res2 = serde_json::from_str::<UnifiedResponse>(&s);
println!("{res2:?}");
}
}
}

169
src/sources/hwmon.rs Normal file
View File

@@ -0,0 +1,169 @@
use std::{fs::File, io::Read, path::PathBuf, sync::Arc};
use async_trait::async_trait;
use tokio::select;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use crate::{
ChimemonSource, ChimemonSourceChannel, MetricTags, SourceMetric, SourceMetricSet, SourceReport,
SourceReportDetails, SourceStatus, config::HwmonConfig,
};
pub struct HwmonSource {
name: String,
config: HwmonConfig,
sensors: Vec<Arc<HwmonSensor>>,
}
#[derive(Debug, Clone)]
struct HwmonSensor {
name: String,
value_path: PathBuf,
device: String,
sensor: String,
label: Option<String>,
tags: Arc<MetricTags>,
}
impl HwmonSensor {
fn new(name: &str, device: &str, sensor: &str) -> Self {
let value_path = PathBuf::from(HWMON_ROOT)
.join(device)
.join(sensor.to_owned() + "_input");
let label_path_raw = PathBuf::from(HWMON_ROOT)
.join(device)
.join(sensor.to_owned() + "_label");
let label = if label_path_raw.is_file() {
let mut f =
File::open(&label_path_raw).expect(&format!("Unable to open `{label_path_raw:?}`"));
let mut label = String::new();
f.read_to_string(&mut label)
.expect(&format!("Unable to read from `{label_path_raw:?}"));
Some(label.trim().to_owned())
} else {
None
};
let mut tags_vec = vec![
("name", name.to_owned()),
("device", device.to_owned()),
("sensor", sensor.to_owned()),
];
if let Some(label) = &label {
tags_vec.push(("label", label.clone()))
}
Self {
value_path,
label,
device: device.to_owned(),
sensor: sensor.to_owned(),
name: name.to_owned(),
tags: Arc::new(tags_vec),
}
}
}
#[derive(Debug)]
struct HwmonReport {
values: Vec<(Arc<HwmonSensor>, f64)>,
}
impl SourceReportDetails for HwmonReport {
fn is_healthy(&self) -> bool {
//self.alarms.iter().any(|(_sensor, alarm)| *alarm)
true
}
fn to_metrics(&self) -> Vec<SourceMetricSet> {
let mut metrics = Vec::new();
for (sensor, value) in &self.values {
metrics.push(SourceMetricSet {
tags: sensor.tags.clone(),
metrics: vec![SourceMetric::new_float("hwmon_value", *value)],
})
}
// for (sensor, alarm) in &self.alarms {
// metrics.push(SourceMetric::new_bool(
// "hwmon_alarm",
// *alarm,
// sensor.tags.clone(),
// ))
// }
metrics
}
}
const HWMON_ROOT: &str = "/sys/class/hwmon";
impl HwmonSource {
async fn get_raw_value(sensor: &HwmonSensor) -> Result<String, std::io::Error> {
tokio::fs::read_to_string(&sensor.value_path).await
}
}
#[async_trait]
impl ChimemonSource for HwmonSource {
type Config = HwmonConfig;
const TASK_NAME: &'static str = "hwmon-task";
fn new(name: &str, config: Self::Config) -> Self {
let sensors = config
.sensors
.iter()
.map(|(k, v)| Arc::new(HwmonSensor::new(k, &v.device, &v.sensor)))
.collect();
debug!("config: {config:?}");
HwmonSource {
name: name.to_owned(),
config,
sensors,
}
}
async fn run(self, chan: ChimemonSourceChannel, cancel: CancellationToken) {
info!("hwmon task started");
let mut interval = tokio::time::interval(self.config.interval);
loop {
select! {
_ = cancel.cancelled() => { return; },
_ = interval.tick() => {
let mut values = Vec::new();
for s in &self.sensors {
debug!("Sensor {s:?}");
match HwmonSource::get_raw_value(s).await {
Ok(sensor_val) => {
debug!(
"hwmon {} raw value {}",
s.value_path.to_string_lossy(),
sensor_val
);
if let Ok(parsed) = sensor_val.trim().parse::<f64>() {
values.push((s.clone(), parsed));
} else {
error!(
"Unable to parse sensor value {sensor_val} at {}",
s.value_path.to_string_lossy()
);
}
},
Err(e) => {
error!("Unable to get hwmon sensor value ({})", e.to_string());
continue
}
}
}
let report = SourceReport {
name: self.name.clone(),
status: SourceStatus::Healthy,
details: Arc::new(HwmonReport { values }),
};
info!("Writing hwmon data");
match chan.send(report.into()) {
Ok(_) => {}
Err(e) => {
warn!("Unable to send to message channel ({e})")
}
}
}
}
}
}
}

5
src/sources/mod.rs Normal file
View File

@@ -0,0 +1,5 @@
pub mod chrony;
pub mod gpsd;
pub mod hwmon;
pub mod prs10;
pub mod uccm;

601
src/sources/prs10.rs Normal file
View File

@@ -0,0 +1,601 @@
use std::any::type_name;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use bitflags::bitflags;
use itertools::Itertools;
use tokio::io::{
AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter, ReadHalf, WriteHalf,
};
use tokio::select;
use tokio::sync::OnceCell;
use tokio::time::{interval, timeout};
use tokio_serial::{SerialPort, SerialStream};
use tokio_util::sync::CancellationToken;
use tracing::{debug, debug_span, error, info, instrument, warn};
use crate::SourceMetricSet;
use crate::{
ChimemonMessage, ChimemonSource, ChimemonSourceChannel, MetricTags, SourceMetric, SourceReport,
SourceReportDetails, SourceStatus, config::Prs10Config, fatal,
};
#[derive(Debug)]
pub struct Prs10Info {
pub model: String,
pub version: String,
pub serial: String,
}
impl TryFrom<&[u8]> for Prs10Info {
type Error = Box<dyn std::error::Error>;
fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
let parts = value.splitn(3, |c| *c == b'_');
let (model, version, serial) = parts
.collect_tuple()
.ok_or("Not enough parts in ID response")?;
Ok(Self {
model: str::from_utf8(model)?.to_string(),
version: str::from_utf8(version)?.to_string(),
serial: str::from_utf8(serial)?.to_string(),
})
}
}
bitflags! {
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct Prs10PowerLampFlags: u8 {
const ELEC_VOLTAGE_LOW = (1<<0);
const ELEC_VOLTAGE_HIGH = (1<<1);
const HEAT_VOLTAGE_LOW = (1<<2);
const HEAT_VOLTAGE_HIGH = (1<<3);
const LAMP_LIGHT_LOW = (1<<4);
const LAMP_LIGHT_HIGH = (1<<5);
const GATE_VOLTAGE_LOW = (1<<6);
const GATE_VOLTAGE_HIGH = (1<<7);
}
}
impl Prs10PowerLampFlags {
pub fn get_metrics(&self, tags: Arc<MetricTags>) -> Vec<SourceMetric> {
// Define the mapping statically
const FLAG_LABELS: [(&Prs10PowerLampFlags, &str); 8] = [
(&Prs10PowerLampFlags::ELEC_VOLTAGE_LOW, "elec_voltage_low"),
(&Prs10PowerLampFlags::ELEC_VOLTAGE_HIGH, "elec_voltage_high"),
(&Prs10PowerLampFlags::HEAT_VOLTAGE_LOW, "heat_voltage_low"),
(&Prs10PowerLampFlags::HEAT_VOLTAGE_HIGH, "heat_voltage_high"),
(&Prs10PowerLampFlags::LAMP_LIGHT_LOW, "lamp_light_low"),
(&Prs10PowerLampFlags::LAMP_LIGHT_HIGH, "lamp_light_high"),
(&Prs10PowerLampFlags::GATE_VOLTAGE_LOW, "gate_voltage_low"),
(&Prs10PowerLampFlags::GATE_VOLTAGE_HIGH, "gate_voltage_high"),
];
// Generate metrics based on flag availability
FLAG_LABELS
.iter()
.map(|(flag, label)| {
// We track whether each flag is set (true) or not (false)
SourceMetric::new_bool(*label, self.contains(**flag))
})
.collect()
}
}
bitflags! {
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct Prs10RfFlags: u8 {
const PLL_UNLOCK = (1<<0);
const XTAL_VAR_LOW = (1<<1);
const XTAL_VAR_HIGH = (1<<2);
const VCO_CTRL_LOW = (1<<3);
const VCO_CTRL_HIGH = (1<<4);
const AGC_CTRL_LOW = (1<<5);
const AGC_CTRL_HIGH = (1<<6);
const PLL_BAD_PARAM = (1<<7);
}
}
bitflags! {
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct Prs10TempFlags: u8 {
const LAMP_TEMP_LOW = (1<<0);
const LAMP_TEMP_HIGH = (1<<1);
const XTAL_TEMP_LOW = (1<<2);
const XTAL_TEMP_HIGH = (1<<3);
const CELL_TEMP_LOW = (1<<4);
const CELL_TEMP_HIGH = (1<<5);
const CASE_TEMP_LOW = (1<<6);
const CASE_TEMP_HIGH = (1<<7);
}
}
bitflags! {
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct Prs10FllFlags: u8 {
const FLL_OFF = (1<<0);
const FLL_DISABLED = (1<<1);
const EFC_HIGH = (1<<2);
const EFC_LOW = (1<<3);
const CAL_VOLTAGE_HIGH = (1<<4);
const CAL_VOLTAGE_LOW = (1<<5);
const _ = !0;
}
}
bitflags! {
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct Prs10PpsFlags: u8 {
const PLL_DISABLED = (1<<0);
const PPS_WARMUP = (1<<1);
const PLL_ACTIVE = (1<<2);
const PPS_BAD = (1<<3);
const PPS_INTERVAL_LONG = (1<<4);
const PLL_RESTART = (1<<5);
const PLL_SATURATED = (1<<6);
const PPS_MISSING = (1<<7);
}
}
bitflags! {
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct Prs10SystemFlags: u8 {
const LAMP_RESTART = (1<<0);
const WDT_RESET = (1<<1);
const BAD_INT_VECTOR = (1<<2);
const EEPROM_WRITE_FAIL = (1<<3);
const EEPROM_CORRUPT = (1<<4);
const BAD_COMMAND = (1<<5);
const BAD_COMMAND_PARAM = (1<<6);
const SYSTEM_RESET = (1<<7);
}
}
#[derive(Debug, Eq, PartialEq, Clone)]
pub struct Prs10Status {
pub volt_lamp_flags: Prs10PowerLampFlags,
pub rf_flags: Prs10RfFlags,
pub temp_flags: Prs10TempFlags,
pub fll_flags: Prs10FllFlags,
pub pps_flags: Prs10PpsFlags,
pub system_flags: Prs10SystemFlags,
}
impl Default for Prs10Status {
fn default() -> Self {
Self {
volt_lamp_flags: Prs10PowerLampFlags::empty(),
rf_flags: Prs10RfFlags::empty(),
temp_flags: Prs10TempFlags::empty(),
fll_flags: Prs10FllFlags::empty(),
pps_flags: Prs10PpsFlags::empty(),
system_flags: Prs10SystemFlags::empty(),
}
}
}
impl SourceReportDetails for Prs10Status {
fn is_healthy(&self) -> bool {
const HEALTHY_PPS: Prs10PpsFlags = Prs10PpsFlags::from_bits(4).unwrap();
self.volt_lamp_flags.is_empty()
&& self.rf_flags.is_empty()
&& self.temp_flags.is_empty()
&& self.fll_flags.is_empty()
&& self.pps_flags == HEALTHY_PPS
}
fn to_metrics(&self) -> Vec<SourceMetricSet> {
let tags = Arc::new(vec![]);
vec![SourceMetricSet {
tags,
metrics: vec![
SourceMetric::new_int("volt_lamp_flags", self.volt_lamp_flags.bits() as i64),
SourceMetric::new_int("rf_flags", self.rf_flags.bits() as i64),
SourceMetric::new_int("temp_flags", self.temp_flags.bits() as i64),
SourceMetric::new_int("fll_flags", self.fll_flags.bits() as i64),
SourceMetric::new_int("pps_flags", self.pps_flags.bits() as i64),
// system flags are kind of useless because we can't guarantee they get upstreamed and will only appear once since they are 'event flags'
],
}]
}
}
impl TryFrom<&[u8]> for Prs10Status {
type Error = Box<dyn std::error::Error>;
fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
let (volt_lamp_flags, rf_flags, temp_flags, fll_flags, pps_flags, system_flags) = value
.splitn(6, |c| *c == b',')
.map(|s| str::from_utf8(s).unwrap().parse::<u8>())
.collect_tuple()
.ok_or("Not enough parts in ST reply")?;
let volt_lamp_flags = volt_lamp_flags?;
let rf_flags = rf_flags?;
let temp_flags = temp_flags?;
let fll_flags = fll_flags?;
let pps_flags = pps_flags?;
let system_flags = system_flags?;
Ok(Self {
volt_lamp_flags: Prs10PowerLampFlags::from_bits(volt_lamp_flags).ok_or_else(|| {
format!("Invalid bits set ({volt_lamp_flags}) for power/lamp flags")
})?,
rf_flags: Prs10RfFlags::from_bits(rf_flags)
.ok_or_else(|| format!("Invalid bits set ({rf_flags}) for RF flags"))?,
temp_flags: Prs10TempFlags::from_bits(temp_flags)
.ok_or_else(|| format!("Invalid bits set ({temp_flags}) for temp flags"))?,
fll_flags: Prs10FllFlags::from_bits(fll_flags)
.ok_or_else(|| format!("Invalid bits set ({fll_flags}) for FLL flags"))?,
pps_flags: Prs10PpsFlags::from_bits(pps_flags)
.ok_or_else(|| format!("Invalid bits set ({pps_flags}) for PPS flags"))?,
system_flags: Prs10SystemFlags::from_bits(system_flags)
.ok_or_else(|| format!("Invalid bits set ({system_flags}) for system flags"))?,
})
}
}
#[derive(Debug, Clone)]
pub struct Prs10Stats {
pub ocxo_efc: u32,
pub error_signal_volts: f64,
pub detect_signal_volts: f64,
pub freq_offset_ppt: i16,
pub mag_efc: u16,
pub heat_volts: f64,
pub elec_volts: f64,
pub lamp_fet_drain_volts: f64,
pub lamp_fet_gate_volts: f64,
pub ocxo_heat_volts: f64,
pub cell_heat_volts: f64,
pub lamp_heat_volts: f64,
pub rb_photo: f64,
pub rb_photo_iv: f64,
pub case_temp: f64,
pub ocxo_therm: f64,
pub cell_therm: f64,
pub lamp_therm: f64,
pub ext_cal_volts: f64,
pub analog_gnd_volts: f64,
pub if_vco_varactor_volts: f64,
pub op_vco_varactor_volts: f64,
pub mul_amp_gain_volts: f64,
pub rf_lock_volts: f64,
}
impl SourceReportDetails for Prs10Stats {
fn is_healthy(&self) -> bool {
true
}
fn to_metrics(&self) -> Vec<SourceMetricSet> {
let tags = Arc::new(vec![]);
vec![SourceMetricSet {
tags,
metrics: vec![
// Integer Metrics
SourceMetric::new_int("ocxo_efc", self.ocxo_efc as i64),
// Float Metrics
SourceMetric::new_float("error_signal_volts", self.error_signal_volts),
SourceMetric::new_float("detect_signal_volts", self.detect_signal_volts),
SourceMetric::new_float("heat_volts", self.heat_volts),
SourceMetric::new_float("elec_volts", self.elec_volts),
SourceMetric::new_float("lamp_fet_drain_volts", self.lamp_fet_drain_volts),
SourceMetric::new_float("lamp_fet_gate_volts", self.lamp_fet_gate_volts),
SourceMetric::new_float("ocxo_heat_volts", self.ocxo_heat_volts),
SourceMetric::new_float("cell_heat_volts", self.cell_heat_volts),
SourceMetric::new_float("lamp_heat_volts", self.lamp_heat_volts),
SourceMetric::new_float("rb_photo", self.rb_photo),
SourceMetric::new_float("rb_photo_iv", self.rb_photo_iv),
SourceMetric::new_float("case_temp", self.case_temp),
SourceMetric::new_float("ocxo_therm", self.ocxo_therm),
SourceMetric::new_float("cell_therm", self.cell_therm),
SourceMetric::new_float("lamp_therm", self.lamp_therm),
SourceMetric::new_float("ext_cal_volts", self.ext_cal_volts),
SourceMetric::new_float("analog_gnd_volts", self.analog_gnd_volts),
SourceMetric::new_float("if_vco_varactor_volts", self.if_vco_varactor_volts),
SourceMetric::new_float("op_vco_varactor_volts", self.op_vco_varactor_volts),
SourceMetric::new_float("mul_amp_gain_volts", self.mul_amp_gain_volts),
SourceMetric::new_float("rf_lock_volts", self.rf_lock_volts),
// U16 Metrics (optional, but can be treated as integers)
SourceMetric::new_int("freq_offset_ppt", self.freq_offset_ppt as i64),
SourceMetric::new_int("mag_efc", self.mag_efc as i64),
],
}]
}
}
#[derive(Debug)]
pub struct Prs10Monitor {
name: String,
config: Prs10Config,
rx: ReadHalf<SerialStream>,
tx: BufWriter<WriteHalf<SerialStream>>,
info: OnceCell<Prs10Info>,
}
impl Prs10Monitor {
pub fn info(&self) -> &Prs10Info {
self.info.get().expect("info() used before run()")
}
#[instrument(level = "debug", skip_all, fields(cmd = String::from_utf8_lossy(cmd).to_string()))]
pub async fn cmd_response(&mut self, cmd: &[u8]) -> Result<Vec<u8>, std::io::Error> {
self.tx.write_all(cmd).await?;
self.tx.write_u8(b'\r').await?;
self.tx.flush().await?;
let mut reader = BufReader::new(&mut self.rx);
let mut buf = Vec::new();
let read = timeout(self.config.timeout, reader.read_until(b'\r', &mut buf)).await??;
buf.truncate(buf.len() - 1); // strip "\r"
debug!(
"raw response: ({read}) `{}`",
str::from_utf8(&buf).unwrap_or("<garbage>")
);
Ok(buf)
}
async fn set_info(&mut self) -> Result<(), Box<dyn std::error::Error>> {
let id = self.get_id().await?;
self.info.set(id)?;
debug!("Set info to {:?}", self.info);
Ok(())
}
pub async fn get_status(&mut self) -> Result<Prs10Status, Box<dyn std::error::Error>> {
debug!("Getting status");
let resp = self.cmd_response(b"ST?").await?;
let status = resp.as_slice().try_into();
debug!("Got: {status:?}");
status
}
pub async fn get_id(&mut self) -> Result<Prs10Info, Box<dyn std::error::Error>> {
debug!("Getting identity");
let resp = self.cmd_response(b"ID?").await?;
let id = resp.as_slice().try_into();
debug!("Got: {id:?}");
id
}
pub async fn get_analog(&mut self, id: u16) -> Result<f64, Box<dyn std::error::Error>> {
let mut cmd = b"AD".to_vec();
cmd.extend_from_slice(id.to_string().as_bytes());
cmd.push(b'?');
let value = self.get_parsed(&cmd).await?;
debug!("Got: {value}");
Ok(value)
}
pub async fn get_parsed<T: FromStr>(
&mut self,
cmd: &[u8],
) -> Result<T, Box<dyn std::error::Error>>
where
T::Err: std::error::Error + 'static,
{
debug!(
"Getting parsed <{}> value for command {}",
type_name::<T>(),
str::from_utf8(cmd).unwrap_or("<garbage>"),
);
let resp = self.cmd_response(cmd).await?;
let val = str::from_utf8(&resp)?.parse::<T>()?;
Ok(val)
}
pub async fn get_ocxo_efc(&mut self) -> Result<u32, Box<dyn std::error::Error>> {
debug!("Getting u16,u16 -> u32 for OCXO EFC value");
let resp = self.cmd_response(b"FC?").await?;
let values = resp
.splitn(2, |c| *c == b',')
.map(|s| str::from_utf8(s).unwrap().parse::<u16>())
.collect_tuple()
.ok_or("Not enough values in response to FC?")?;
if let (Ok(high), Ok(low)) = values {
Ok((high as u32) << 8 | low as u32)
} else {
Err("Unparseable numbers in response to FC?".into())
}
}
pub async fn get_detected_signals(&mut self) -> Result<(f64, f64), Box<dyn std::error::Error>> {
debug!("Getting i16,i16 -> f64,f64 detected signals pair");
let resp = self.cmd_response(b"DS?").await?;
let (error, signal) = resp
.splitn(2, |c| *c == b',')
.map(|s| str::from_utf8(s).unwrap().parse::<i16>())
.collect_tuple()
.ok_or("Not enough values in response to DS?".to_string())?;
Ok((error? as f64 * 0.15e-6, signal? as f64 * 0.001))
}
#[instrument(skip_all)]
async fn status_poll(&mut self) -> Result<ChimemonMessage, Box<dyn std::error::Error>> {
let status = self.get_status().await?;
Ok(SourceReport {
name: self.name.clone(),
status: if status.is_healthy() {
SourceStatus::Healthy
} else {
SourceStatus::Unknown
},
details: Arc::new(status),
}
.into())
}
#[instrument(skip_all)]
async fn stats_poll(&mut self) -> Result<ChimemonMessage, Box<dyn std::error::Error>> {
const ANALOG_SCALING: [f64; 20] = [
0.0, 10.0, 10.0, 10.0, 10.0, 1.0, 1.0, 1.0, 1.0, 4.0, 100.0, 1.0, 1.0, 1.0, 1.0, 1.0,
4.0, 4.0, 4.0, 1.0,
];
let stats_span = debug_span!("get_stats_serial");
let stats_guard = stats_span.enter();
let ocxo_efc = self.get_ocxo_efc().await?;
let (error_signal_volts, detect_signal_volts) = self.get_detected_signals().await?;
let freq_offset_ppt = self.get_parsed(b"SF?").await?;
let mag_efc = self.get_parsed(b"MR?").await?;
let mut analog_values = [0.0; 20];
for i in 1u16..=19 {
analog_values[i as usize] = self.get_analog(i).await? * ANALOG_SCALING[i as usize]
}
drop(stats_guard);
Ok(SourceReport {
name: self.name.clone(),
status: SourceStatus::Unknown,
details: Arc::new(Prs10Stats {
ocxo_efc,
error_signal_volts,
detect_signal_volts,
freq_offset_ppt,
mag_efc,
heat_volts: analog_values[1],
elec_volts: analog_values[2],
lamp_fet_drain_volts: analog_values[3],
lamp_fet_gate_volts: analog_values[4],
ocxo_heat_volts: analog_values[5],
cell_heat_volts: analog_values[6],
lamp_heat_volts: analog_values[7],
rb_photo: analog_values[8],
rb_photo_iv: analog_values[9],
case_temp: analog_values[10],
ocxo_therm: analog_values[11],
cell_therm: analog_values[12],
lamp_therm: analog_values[13],
ext_cal_volts: analog_values[14],
analog_gnd_volts: analog_values[15],
if_vco_varactor_volts: analog_values[16],
op_vco_varactor_volts: analog_values[17],
mul_amp_gain_volts: analog_values[18],
rf_lock_volts: analog_values[19],
}),
}
.into())
}
async fn reset_rx_state(&mut self) -> Result<(), Box<dyn std::error::Error>> {
// flush any pending input and potential responses from the receiver side
self.tx.write_u8(b'\r').await?;
self.tx.flush().await?;
let mut discard = vec![];
loop {
match timeout(Duration::from_millis(100), self.rx.read_buf(&mut discard)).await {
Ok(_) => discard.clear(),
Err(_) => break,
}
}
Ok(())
}
}
#[async_trait]
impl ChimemonSource for Prs10Monitor {
type Config = Prs10Config;
const TASK_NAME: &'static str = "prs10-task";
fn new(name: &str, config: Self::Config) -> Self {
let builder = tokio_serial::new(&config.port, config.baud)
.timeout(config.timeout)
.data_bits(tokio_serial::DataBits::Eight)
.parity(tokio_serial::Parity::None)
.stop_bits(tokio_serial::StopBits::One)
.flow_control(tokio_serial::FlowControl::None);
let mut port = SerialStream::open(&builder).unwrap_or_else(|e| {
fatal!(
"Failed to open serial port `{}` ({})",
config.port,
e.to_string()
)
});
port.set_exclusive(true).unwrap_or_else(|e| {
fatal!(
"Can't lock serial port `{}` ({})",
config.port,
e.to_string()
)
});
info!(
"Opened serial port {}@{}",
port.name().unwrap(),
port.baud_rate().unwrap()
);
let (rx, tx) = tokio::io::split(port);
let tx = BufWriter::new(tx);
Self {
name: name.to_owned(),
config,
rx,
tx,
info: OnceCell::new(),
}
}
async fn run(mut self, chan: ChimemonSourceChannel, cancel: CancellationToken) {
info!("PRS10 task started");
if let Err(e) = self.reset_rx_state().await {
error!(error = ?e, "Error clearing PRS10 RX state");
return;
}
if let Err(e) = self.set_info().await {
error!(error = ?e, "Error starting PRS10");
return;
}
info!(
"Connected to PRS10 model: {} version: {} serial: {}",
self.info().model,
self.info().version,
self.info().serial
);
let mut status_timer = interval(self.config.status_interval);
let mut stats_timer = interval(self.config.stats_interval);
loop {
let msg = select! {
_ = cancel.cancelled() => {
return;
},
_ = status_timer.tick() => {
self.status_poll().await
},
_ = stats_timer.tick() => {
self.stats_poll().await
}
};
match msg {
Ok(msg) => {
if let Err(e) = chan.send(msg) {
error!("Unable to send to channel {e}")
}
}
Err(e) => error!("Error in poll task: {e}"),
}
}
}
}
mod tests {
use crate::sources::prs10::{Prs10Info, Prs10PowerLampFlags, Prs10PpsFlags, Prs10Status};
#[test]
fn test_info_parse() -> Result<(), Box<dyn std::error::Error>> {
const INFO_VECTOR: &[u8] = b"PRS10_3.15_SN_12345";
let info: Prs10Info = INFO_VECTOR.try_into()?;
assert_eq!(info.model, "PRS10");
assert_eq!(info.version, "3.15");
assert_eq!(info.serial, "SN_12345");
Ok(())
}
#[test]
fn test_status_parse() -> Result<(), Box<dyn std::error::Error>> {
//TODO: Add vectors for some more complicated state
const STATUS_VECTOR1: &[u8] = b"0,0,0,0,4,0";
let status: Prs10Status = STATUS_VECTOR1.try_into()?;
let mut expect = Prs10Status::default();
expect.pps_flags.set(Prs10PpsFlags::PLL_ACTIVE, true);
assert_eq!(status, expect);
Ok(())
}
}

View File

@@ -1,22 +1,26 @@
use std::io::{BufRead, Cursor};
use std::str;
use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use bitflags::bitflags; use bitflags::bitflags;
use byteorder::{BigEndian, ReadBytesExt}; use byteorder::{BigEndian, ReadBytesExt};
use bytes::{Buf, BytesMut}; use bytes::{Buf, BytesMut};
use chimemon::{ChimemonMessage, ChimemonSource, ChimemonSourceChannel, Config, TimeReport};
use chrono::{DateTime, Duration, NaiveDateTime, Utc}; use chrono::{DateTime, Duration, NaiveDateTime, Utc};
use figment::value::Map;
use influxdb2::models::data_point::DataPointBuilder;
use influxdb2::models::DataPoint;
use itertools::Itertools; use itertools::Itertools;
use log::{debug, info, warn};
use std::io::{BufRead, Cursor};
use std::str;
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf};
use tokio::join; use tokio::select;
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tokio::time::sleep; use tokio::time::sleep;
use tokio_serial::{SerialPort, SerialStream}; use tokio_serial::{SerialPort, SerialStream};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use crate::{
ChimemonSource, ChimemonSourceChannel, SourceMetric, SourceReport, SourceReportDetails,
SourceStatus, TimeReport, config::UCCMConfig,
};
use crate::{SourceMetricSet, fatal};
pub const GPS_EPOCH: i64 = 315964800; // Doesn't seem possible to have a const DateTime object pub const GPS_EPOCH: i64 = 315964800; // Doesn't seem possible to have a const DateTime object
pub type UccmEndian = BigEndian; pub type UccmEndian = BigEndian;
@@ -29,11 +33,11 @@ pub enum UCCMMonitorParseState {
} }
pub struct UCCMMonitor { pub struct UCCMMonitor {
// pub port: SerialStream, pub name: String,
config: UCCMConfig,
rx: ReadHalf<SerialStream>, rx: ReadHalf<SerialStream>,
tx: WriteHalf<SerialStream>, tx: WriteHalf<SerialStream>,
pub info: Option<UCCMInfo>, pub info: Option<UCCMInfo>,
config: Config,
} }
#[derive(Debug)] #[derive(Debug)]
@@ -43,30 +47,46 @@ pub struct UCCMTODReport {
pub flags: UCCMFlags, pub flags: UCCMFlags,
} }
impl UCCMTODReport { impl SourceReportDetails for UCCMTODReport {
pub fn as_builder(&self, measurement: &String, tags: &Map<String, String>) -> DataPointBuilder { fn is_healthy(&self) -> bool {
let mut builder = self.flags.contains(UCCMFlags::OSC_LOCK)
DataPoint::builder(measurement).timestamp(self.time.timestamp_nanos_opt().unwrap()); && self.flags.contains(UCCMFlags::HAVE_GPS_TIME)
builder = builder.field("leaps", self.leaps as i64); && !self.flags.contains(UCCMFlags::INIT_UNLOCK)
builder = builder.field("osc_lock", self.flags.contains(UCCMFlags::OSC_LOCK)); && !self.flags.contains(UCCMFlags::INIT_NO_SATS)
builder = builder.field("leap_flag", self.flags.contains(UCCMFlags::LEAP_FLAG)); && !self.flags.contains(UCCMFlags::POWER_FAIL)
builder = builder.field("init_unlock", self.flags.contains(UCCMFlags::INIT_UNLOCK)); && !self.flags.contains(UCCMFlags::NO_GPS_SYNC)
builder = builder.field("init_no_sats", self.flags.contains(UCCMFlags::INIT_NO_SATS)); && !self.flags.contains(UCCMFlags::NO_GPS_SYNC2)
builder = builder.field( && !self.flags.contains(UCCMFlags::NO_ANT)
"have_gps_time", && !self.flags.contains(UCCMFlags::GPS_LOS)
self.flags.contains(UCCMFlags::HAVE_GPS_TIME), }
);
builder = builder.field("power_fail", self.flags.contains(UCCMFlags::POWER_FAIL));
builder = builder.field("no_gps_sync", self.flags.contains(UCCMFlags::NO_GPS_SYNC));
builder = builder.field("no_gps_sync2", self.flags.contains(UCCMFlags::NO_GPS_SYNC2));
builder = builder.field("ant_fault", self.flags.contains(UCCMFlags::NO_ANT));
builder = builder.field("gps_los", self.flags.contains(UCCMFlags::GPS_LOS));
builder = tags fn to_metrics(&self) -> Vec<SourceMetricSet> {
.iter() let tags = Arc::new(vec![]);
.fold(builder, |builder, (k, v)| builder.tag(k, v)); vec![SourceMetricSet {
tags,
builder metrics: vec![
SourceMetric::new_int("leaps", self.leaps as i64),
SourceMetric::new_bool("osc_lock", self.flags.contains(UCCMFlags::OSC_LOCK)),
SourceMetric::new_bool("leap_flag", self.flags.contains(UCCMFlags::LEAP_FLAG)),
SourceMetric::new_bool("init_unlock", self.flags.contains(UCCMFlags::INIT_UNLOCK)),
SourceMetric::new_bool(
"init_no_sats",
self.flags.contains(UCCMFlags::INIT_NO_SATS),
),
SourceMetric::new_bool(
"have_gps_time",
self.flags.contains(UCCMFlags::HAVE_GPS_TIME),
),
SourceMetric::new_bool("power_fail", self.flags.contains(UCCMFlags::POWER_FAIL)),
SourceMetric::new_bool("no_gps_sync", self.flags.contains(UCCMFlags::NO_GPS_SYNC)),
SourceMetric::new_bool(
"no_gps_sync2",
self.flags.contains(UCCMFlags::NO_GPS_SYNC2),
),
SourceMetric::new_bool("ant_fault", self.flags.contains(UCCMFlags::NO_ANT)),
SourceMetric::new_bool("gps_los", self.flags.contains(UCCMFlags::GPS_LOS)),
],
}]
} }
} }
@@ -75,14 +95,16 @@ pub struct UCCMLoopDiagReport {
pub ocxo: f32, pub ocxo: f32,
} }
impl UCCMLoopDiagReport { impl SourceReportDetails for UCCMLoopDiagReport {
pub fn as_builder(&self, measurement: &String, tags: &Map<String, String>) -> DataPointBuilder { fn is_healthy(&self) -> bool {
let mut builder = DataPoint::builder(measurement); true
builder = builder.field("ocxo_offset", self.ocxo as f64); }
builder = tags fn to_metrics(&self) -> Vec<SourceMetricSet> {
.iter() let tags = Arc::new(vec![]);
.fold(builder, |builder, (k, v)| builder.tag(k, v)); vec![SourceMetricSet {
builder tags,
metrics: vec![SourceMetric::new_float("ocxo_offset", self.ocxo as f64)],
}]
} }
} }
@@ -92,16 +114,9 @@ pub struct UCCMGpsSvTracking {
pub cno: u8, pub cno: u8,
} }
impl UCCMGpsSvTracking { impl From<&UCCMGpsSvTracking> for SourceMetric {
fn as_builder(&self, measurement: &String, tags: &Map<String, String>) -> DataPointBuilder { fn from(value: &UCCMGpsSvTracking) -> Self {
let mut builder = DataPoint::builder(measurement) SourceMetric::new_int("sv_cno", value.cno as i64)
.field("sv_cno", self.cno as i64)
.tag("sv_id", self.sv.to_string());
builder = tags
.iter()
.fold(builder, |builder, (k, v)| builder.tag(k, v));
builder
} }
} }
@@ -110,17 +125,20 @@ pub struct UCCMGPSSatsReport {
pub tracked_svs: Vec<UCCMGpsSvTracking>, pub tracked_svs: Vec<UCCMGpsSvTracking>,
} }
impl UCCMGPSSatsReport { impl SourceReportDetails for UCCMGPSSatsReport {
pub fn build(&self, measurement: &String, tags: &Map<String, String>) -> Vec<DataPoint> { fn is_healthy(&self) -> bool {
self.tracked_svs self.tracked_svs.len() >= 4
.iter() }
.map(|sv| sv.as_builder(measurement, tags)) fn to_metrics(&self) -> Vec<SourceMetricSet> {
.map(|b| b.build().unwrap()) vec![SourceMetricSet {
.collect() tags: Arc::new(vec![]),
metrics: self.tracked_svs.iter().map(|sv| sv.into()).collect(),
}]
} }
} }
bitflags! { bitflags! {
#[derive(Debug)]
pub struct UCCMFlags: u32 { pub struct UCCMFlags: u32 {
const OSC_LOCK = (1<<29); const OSC_LOCK = (1<<29);
const LEAP_FLAG = (1<<25); const LEAP_FLAG = (1<<25);
@@ -194,6 +212,30 @@ pub struct UCCMStatusReport {
pub freq_error: f32, pub freq_error: f32,
} }
impl SourceReportDetails for UCCMStatusReport {
fn is_healthy(&self) -> bool {
self.gps_pps_valid
}
fn to_metrics(&self) -> Vec<SourceMetricSet> {
let tags = Arc::new(vec![]);
vec![SourceMetricSet {
tags,
metrics: vec![
SourceMetric::new_int("tfom", self.tfom as i64),
SourceMetric::new_int("ffom", self.ffom as i64),
SourceMetric::new_float("gps_phase", self.gps_phase as f64),
// TODO: sv info
// TOOD: timestamp
SourceMetric::new_float("ant_voltage", self.ant_voltage as f64),
SourceMetric::new_float("ant_current", self.ant_current as f64),
SourceMetric::new_float("temp", self.temp as f64),
SourceMetric::new_int("efc_dac", self.efc_dac as i64),
SourceMetric::new_float("freq_error", self.freq_error as f64),
],
}]
}
}
pub struct UCCMInfo { pub struct UCCMInfo {
pub vendor: String, pub vendor: String,
pub model: String, pub model: String,
@@ -257,7 +299,8 @@ impl TryFrom<&str> for UCCMLoopDiagReport {
"No lines!", "No lines!",
))??; ))??;
let ocxo_val = ocxo_line let ocxo_val = ocxo_line
.split(':').nth(1) .split(':')
.nth(1)
.ok_or(std::io::Error::new( .ok_or(std::io::Error::new(
std::io::ErrorKind::InvalidData, std::io::ErrorKind::InvalidData,
"no colon!", "no colon!",
@@ -280,7 +323,10 @@ impl TryFrom<&str> for UCCMGPSSatsReport {
"Invalid response (expected `NSATS CNOS`)", "Invalid response (expected `NSATS CNOS`)",
))?; ))?;
let nsats = nsats.parse::<u8>().map_err(|e| { let nsats = nsats.parse::<u8>().map_err(|e| {
std::io::Error::new(std::io::ErrorKind::InvalidData, format!("Invalid number of sats ({e})")) std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("Invalid number of sats ({e})"),
)
})?; })?;
let tracked_svs = cnos let tracked_svs = cnos
.split(',') .split(',')
@@ -299,30 +345,6 @@ impl TryFrom<&str> for UCCMGPSSatsReport {
} }
impl UCCMMonitor { impl UCCMMonitor {
pub fn new(config: Config) -> Self {
let builder = tokio_serial::new(&config.sources.uccm.port, config.sources.uccm.baud)
.timeout(config.sources.uccm.timeout)
.data_bits(tokio_serial::DataBits::Eight)
.parity(tokio_serial::Parity::None)
.stop_bits(tokio_serial::StopBits::One)
.flow_control(tokio_serial::FlowControl::None);
let mut port = SerialStream::open(&builder).expect("Must be able to open serial port");
port.set_exclusive(true).expect("Can't lock serial port");
info!(
"Opened serial port {}@{}",
port.name().unwrap(),
port.baud_rate().unwrap()
);
let (rx, tx) = tokio::io::split(port);
UCCMMonitor {
// port,
rx,
tx,
info: None,
config,
}
}
pub async fn send_cmd(&mut self, cmd: &[u8]) -> Result<String, std::io::Error> { pub async fn send_cmd(&mut self, cmd: &[u8]) -> Result<String, std::io::Error> {
debug!("cmd: `{:?}`", String::from_utf8_lossy(cmd)); debug!("cmd: `{:?}`", String::from_utf8_lossy(cmd));
self.tx.write_all(cmd).await.unwrap(); self.tx.write_all(cmd).await.unwrap();
@@ -378,10 +400,10 @@ impl UCCMMonitor {
state: Arc<Mutex<UCCMMonitorParseState>>, state: Arc<Mutex<UCCMMonitorParseState>>,
) { ) {
let mut rdbuf = BytesMut::with_capacity(1024); let mut rdbuf = BytesMut::with_capacity(1024);
let mut last_loop_diag: Option<UCCMLoopDiagReport> = None; let mut last_loop_diag: Option<Arc<UCCMLoopDiagReport>> = None;
let mut last_gps_sats: Option<UCCMGPSSatsReport> = None; let mut last_gps_sats: Option<Arc<UCCMGPSSatsReport>> = None;
let mut last_sent_report = Utc::now() - self.config.sources.uccm.status_interval; let mut last_sent_report = Utc::now() - self.config.status_interval;
loop { loop {
match tokio::io::AsyncReadExt::read_buf(&mut self.rx, &mut rdbuf).await { match tokio::io::AsyncReadExt::read_buf(&mut self.rx, &mut rdbuf).await {
@@ -416,45 +438,45 @@ impl UCCMMonitor {
&& tod && tod
.flags .flags
.contains(UCCMFlags::OSC_LOCK | UCCMFlags::HAVE_GPS_TIME); .contains(UCCMFlags::OSC_LOCK | UCCMFlags::HAVE_GPS_TIME);
chan.send(ChimemonMessage::TimeReport(TimeReport { chan.send(
system_time: sysnow, TimeReport {
offset, system_time: sysnow,
leaps: tod.leaps as isize, offset,
leap_flag: tod.flags.contains(UCCMFlags::LEAP_FLAG), leaps: tod.leaps as isize,
valid, leap_flag: tod.flags.contains(UCCMFlags::LEAP_FLAG),
})) valid,
}
.into(),
)
.expect("Unable to send to channel"); .expect("Unable to send to channel");
if sysnow - last_sent_report if sysnow - last_sent_report
>= Duration::from_std(self.config.sources.uccm.status_interval) >= Duration::from_std(self.config.status_interval).unwrap()
.unwrap()
{ {
let mut points = vec![tod
.as_builder(
&self.config.sources.uccm.measurement,
&self.config.influxdb.tags,
)
.build()
.unwrap()];
if let Some(loop_diag) = &last_loop_diag { if let Some(loop_diag) = &last_loop_diag {
points.push( if let Err(e) = chan.send(
loop_diag SourceReport {
.as_builder( name: "uccm".to_owned(),
&self.config.sources.uccm.measurement, status: SourceStatus::Unknown,
&self.config.influxdb.tags, details: loop_diag.clone(),
) }
.build() .into(),
.unwrap(), ) {
) error!(error = ?e, "Unable to send message to channel");
}
} }
if let Some(gps_sats) = &last_gps_sats { if let Some(gps_sats) = &last_gps_sats {
points.extend(gps_sats.build( if let Err(e) = chan.send(
&self.config.sources.uccm.measurement, SourceReport {
&self.config.influxdb.tags, name: "uccm".to_owned(),
)); status: SourceStatus::Unknown,
details: gps_sats.clone(),
}
.into(),
) {
error!(error = ?e, "Unable to send message to channel");
}
} }
chan.send(ChimemonMessage::DataPoints(points))
.expect("Unable to send to channel");
last_sent_report = sysnow; last_sent_report = sysnow;
} }
} }
@@ -468,7 +490,7 @@ impl UCCMMonitor {
let loop_report = UCCMLoopDiagReport::try_from(loop_diag_resp.as_str()); let loop_report = UCCMLoopDiagReport::try_from(loop_diag_resp.as_str());
let gps_report = UCCMGPSSatsReport::try_from(gps_sats_resp.as_str()); let gps_report = UCCMGPSSatsReport::try_from(gps_sats_resp.as_str());
if let Ok(loop_report) = loop_report { if let Ok(loop_report) = loop_report {
last_loop_diag = Some(loop_report) last_loop_diag = Some(Arc::new(loop_report))
} else { } else {
warn!( warn!(
"Unable to parse loop diag report `{}`: {}", "Unable to parse loop diag report `{}`: {}",
@@ -477,7 +499,7 @@ impl UCCMMonitor {
); );
} }
if let Ok(gps_report) = gps_report { if let Ok(gps_report) = gps_report {
last_gps_sats = Some(gps_report) last_gps_sats = Some(Arc::new(gps_report))
} else { } else {
warn!( warn!(
"Unable to parse GPS sats report `{}`: {}", "Unable to parse GPS sats report `{}`: {}",
@@ -496,7 +518,38 @@ impl UCCMMonitor {
#[async_trait] #[async_trait]
impl ChimemonSource for UCCMMonitor { impl ChimemonSource for UCCMMonitor {
async fn run(mut self, chan: ChimemonSourceChannel) { type Config = UCCMConfig;
const TASK_NAME: &'static str = "uccm-task";
fn new(name: &str, config: Self::Config) -> Self {
let builder = tokio_serial::new(&config.port, config.baud)
.timeout(config.timeout)
.data_bits(tokio_serial::DataBits::Eight)
.parity(tokio_serial::Parity::None)
.stop_bits(tokio_serial::StopBits::One)
.flow_control(tokio_serial::FlowControl::None);
let mut port = match SerialStream::open(&builder) {
Ok(port) => port,
Err(e) => fatal!(error = ?e, "Error opening port {}", &config.port),
};
if let Err(e) = port.set_exclusive(true) {
fatal!(error= ?e, "Can't lock serial port");
};
info!(
"Opened serial port {}@{}",
port.name().unwrap(),
port.baud_rate().unwrap()
);
let (rx, tx) = tokio::io::split(port);
UCCMMonitor {
name: name.to_owned(),
config,
rx,
tx,
info: None,
}
}
async fn run(mut self, chan: ChimemonSourceChannel, cancel: CancellationToken) {
info!("UCCM task starting"); info!("UCCM task starting");
if self.get_info().await.is_err() { if self.get_info().await.is_err() {
warn!("Error starting UCCM"); warn!("Error starting UCCM");
@@ -522,7 +575,10 @@ impl ChimemonSource for UCCMMonitor {
// wfut.await; // wfut.await;
// } // }
// }); // });
//
join!(rx_handle).0.unwrap(); select! {
_ = cancel.cancelled() => { return },
_ = rx_handle => { return }
};
} }
} }

View File

@@ -0,0 +1,88 @@
use std::mem;
use std::os::unix::net::UnixDatagram;
use std::path::PathBuf;
use async_trait::async_trait;
use libc::{c_double, c_int, timeval};
use tokio::select;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
use crate::{ChimemonMessage, ChimemonTarget, ChimemonTargetChannel, config::ChronySockConfig};
const CHRONY_MAGIC: c_int = 0x534f434b;
pub struct ChronySockServer {
name: String,
sock_path: PathBuf,
}
#[repr(C)]
#[derive(Debug)]
pub struct ChronyTimeReport {
tv: timeval,
offset: c_double,
pulse: c_int,
leap: c_int,
_pad: c_int,
magic: c_int,
}
impl ChronySockServer {}
#[async_trait]
impl ChimemonTarget for ChronySockServer {
type Config = ChronySockConfig;
const TASK_NAME: &'static str = "chrony-refclock-task";
fn new(name: &str, config: ChronySockConfig) -> Self {
ChronySockServer {
name: name.to_owned(),
sock_path: config.sock.into(),
}
}
async fn run(mut self, mut chan: ChimemonTargetChannel, cancel: CancellationToken) {
info!("Chrony refclock task started");
loop {
select! {
_ = cancel.cancelled() => { return }
msg = chan.recv() => {
match msg {
Ok(ChimemonMessage::TimeReport(tr)) => {
debug!(tr = ?tr, "Got timereport");
if tr.valid {
{
let frame = ChronyTimeReport {
tv: timeval {
tv_sec: TryInto::<libc::time_t>::try_into(
tr.system_time.timestamp(),
)
.unwrap(),
tv_usec: tr.system_time.timestamp_subsec_micros()
as libc::suseconds_t,
},
offset: tr.offset.num_nanoseconds().unwrap() as f64 / 1e9,
leap: if tr.leap_flag { 1 } else { 0 },
pulse: 0,
_pad: 0,
magic: CHRONY_MAGIC,
};
let bs = unsafe {
std::slice::from_raw_parts(
(&frame as *const ChronyTimeReport) as *const u8,
mem::size_of::<ChronyTimeReport>(),
)
};
debug!("Sending to chrony sock {:#?}", frame);
let sock = UnixDatagram::unbound().unwrap();
sock.send_to(bs, &self.sock_path).unwrap();
}
}
},
Err(e) => warn!("Error receiving from channel: {}", e.to_string()),
_ => continue,
}
}
}
}
}
}

110
src/targets/influx.rs Normal file
View File

@@ -0,0 +1,110 @@
use async_trait::async_trait;
use futures::stream;
use influxdb2::{
Client,
models::{DataPoint, FieldValue},
};
use tokio::{select, sync::broadcast, time::timeout};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument};
use crate::{
ChimemonMessage, ChimemonTarget, ChimemonTargetChannel, MetricValue, SourceReport,
config::InfluxConfig, fatal,
};
pub struct InfluxTarget {
name: String,
config: InfluxConfig,
influx: Client,
}
impl From<MetricValue> for FieldValue {
fn from(value: MetricValue) -> Self {
match value {
MetricValue::Bool(b) => FieldValue::Bool(b),
MetricValue::Float(f) => FieldValue::F64(f),
MetricValue::Int(i) => FieldValue::I64(i),
}
}
}
#[async_trait]
impl ChimemonTarget for InfluxTarget {
type Config = InfluxConfig;
const TASK_NAME: &'static str = "influx-task";
fn new(name: &str, config: Self::Config) -> Self {
let influx = Client::new(&config.url, &config.org, &config.token);
Self {
name: name.to_owned(),
config: config,
influx,
}
}
async fn run(self, mut chan: ChimemonTargetChannel, cancel: CancellationToken) {
info!("Influx task started");
loop {
let msg = select! {
_ = cancel.cancelled() => { return },
msg = chan.recv() => msg
};
debug!(msg = ?msg, "Got msg");
let msg = match msg {
Ok(msg) => msg,
Err(broadcast::error::RecvError::Closed) => {
fatal!("Permanent channel closed, terminating")
}
Err(broadcast::error::RecvError::Lagged(_)) => {
error!("Channel lagged");
continue;
}
};
if let Err(e) = self.handle_msg(&msg).await {
error!(error = ?e, msg=?&msg, "Error handling message");
}
}
}
}
impl InfluxTarget {
#[instrument(skip_all)]
async fn handle_source_report(
&self,
sr: &SourceReport,
) -> Result<(), Box<dyn std::error::Error>> {
debug!("Handling source report {}", sr.name);
let mut dps = Vec::new();
for metric_set in &sr.details.to_metrics() {
let mut builder = DataPoint::builder(&sr.name);
builder = self
.config
.tags
.iter()
.fold(builder, |builder, (k, v)| builder.tag(k, v));
builder = metric_set
.tags
.iter()
.fold(builder, |builder, (k, v)| builder.tag(*k, v));
builder = metric_set.metrics.iter().fold(builder, |builder, metric| {
builder.field(metric.name, metric.value)
});
dps.push(builder.build()?);
}
debug!("Sending {} datapoints to influx", dps.len());
timeout(
self.config.timeout,
self.influx.write(&self.config.bucket, stream::iter(dps)),
)
.await??;
debug!("All datapoints sent");
Ok(())
}
async fn handle_msg(&self, msg: &ChimemonMessage) -> Result<(), Box<dyn std::error::Error>> {
debug!(msg = ?msg, "Handling msg");
match msg {
ChimemonMessage::TimeReport(_tr) => Ok(()),
ChimemonMessage::SourceReport(sr) => self.handle_source_report(sr).await,
}
}
}

2
src/targets/mod.rs Normal file
View File

@@ -0,0 +1,2 @@
pub mod chrony_refclock;
pub mod influx;