Report TODs to influx

This commit is contained in:
Keenan Tims 2023-09-14 22:18:11 -07:00
parent 7c782e5800
commit 41284649c9
Signed by: ktims
GPG Key ID: 11230674D69038D4
6 changed files with 787 additions and 570 deletions

1235
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -24,6 +24,7 @@ tokio-serial = "5.4.4"
bytes = "1.2.1"
chrono = "0.4.23"
libc = "0.2.137"
reqwest = { version = "0.11.13", features = ["rustls-tls"], default-features = false }
[dependencies.chrony-candm]
git = "https://github.com/aws/chrony-candm"

View File

@ -18,6 +18,13 @@
name = "hwmon0"
sensor = "temp1_input"
[sources.uccm]
enabled = true
port = "/dev/ttyUSB0"
status_interval = 10
measurement = "uccm_gpsdo"
[targets]
[targets.chrony]
enabled = true

View File

@ -116,6 +116,7 @@ pub struct UCCMConfig {
pub baud: u32,
pub status_interval: std::time::Duration,
pub timeout: std::time::Duration,
pub measurement: String,
}
impl Default for UCCMConfig {
@ -126,6 +127,7 @@ impl Default for UCCMConfig {
baud: 57600,
status_interval: std::time::Duration::from_secs(10),
timeout: std::time::Duration::from_secs(1),
measurement: "uccm_gpsdo".into(),
}
}
}

View File

@ -3,6 +3,7 @@ mod chrony_refclock;
mod hwmon;
mod uccm;
use chrono::NaiveDateTime;
use clap::{Parser, ValueEnum};
use env_logger::{self, Env};
use futures::{future::join_all, prelude::*};
@ -46,7 +47,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
info!("{} v{} starting...", PROGRAM_NAME, VERSION);
let fig = load_config(Path::new(&args.config_file));
warn!("{:?}", fig);
debug!("{:?}", fig);
let config: Config = fig.extract()?;
let mut tasks = Vec::new();
@ -88,8 +89,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
};
let uccm = if config.sources.uccm.enabled {
info!("Spawning UCCMMonitor");
Some(UCCMMonitor::new(config.to_owned()))
} else {
info!("UCCMMonitor not configured");
None
};
match uccm {
@ -114,25 +117,29 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut influxrx = sourcechan.subscribe();
tasks.push(tokio::spawn(async move {
loop {
let msg = influxrx.recv().await.unwrap();
match msg {
ChimemonMessage::DataPoint(dp) => {
debug!("Writing datapoint to influx: {:?}", dp);
influx
.write(&config.influxdb.bucket, stream::iter([dp]))
.await
.expect("Error writing to influxdb");
}
ChimemonMessage::DataPoints(dps) => {
debug!("Writing datapoints to influx: {:?}", dps);
influx
.write(&config.influxdb.bucket, stream::iter(dps))
.await
.expect("Error writing to influxdb");
}
ChimemonMessage::TimeReport(tr) => {
debug!("GPS TOD: {:?}", tr);
}
match influxrx.recv().await {
Ok(msg) => match msg {
ChimemonMessage::DataPoint(dp) => {
debug!("Writing datapoint to influx: {:?}", dp);
influx
.write(&config.influxdb.bucket, stream::iter([dp]))
.await
.unwrap_or_else(|e| error!("Error writing to influxdb {:?}", e));
}
ChimemonMessage::DataPoints(dps) => {
debug!("Writing datapoints to influx: {:?}", dps);
influx
.write(&config.influxdb.bucket, stream::iter(dps))
.await
.unwrap_or_else(|e| error!("Error writing to influxdb {:?}", e));
}
ChimemonMessage::TimeReport(tr) => {
debug!("GPS TOD: {:?}", tr);
}
},
Err(e) => error!("Unable to receive from channel: {:?}", e),
}
}
}));

View File

@ -6,10 +6,13 @@ use chimemon::{
ChimemonMessage, ChimemonSource, ChimemonSourceChannel, Config, TimeReport, UCCMConfig,
};
use chrono::{Duration, NaiveDateTime, Utc};
use influxdb2::models::data_point::DataPointBuilder;
use influxdb2::models::DataPoint;
use log::{debug, info, warn};
use std::io::Cursor;
use std::str;
use std::sync::Arc;
use std::time::UNIX_EPOCH;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf};
use tokio::join;
use tokio::sync::Mutex;
@ -31,7 +34,7 @@ pub struct UCCMMonitor {
rx: ReadHalf<SerialStream>,
tx: WriteHalf<SerialStream>,
pub info: Option<UCCMInfo>,
config: UCCMConfig,
config: Config,
}
#[derive(Debug)]
@ -41,6 +44,28 @@ pub struct UCCMTODReport {
pub flags: UCCMFlags,
}
impl UCCMTODReport {
pub fn as_builder(&self, measurement: &String) -> DataPointBuilder {
let mut builder = DataPoint::builder(measurement).timestamp(self.time.timestamp_nanos());
builder = builder.field("leaps", self.leaps as i64);
builder = builder.field("osc_lock", self.flags.contains(UCCMFlags::OSC_LOCK));
builder = builder.field("leap_flag", self.flags.contains(UCCMFlags::LEAP_FLAG));
builder = builder.field("init_unlock", self.flags.contains(UCCMFlags::INIT_UNLOCK));
builder = builder.field("init_no_sats", self.flags.contains(UCCMFlags::INIT_NO_SATS));
builder = builder.field(
"have_gps_time",
self.flags.contains(UCCMFlags::HAVE_GPS_TIME),
);
builder = builder.field("power_fail", self.flags.contains(UCCMFlags::POWER_FAIL));
builder = builder.field("no_gps_sync", self.flags.contains(UCCMFlags::NO_GPS_SYNC));
builder = builder.field("no_gps_sync2", self.flags.contains(UCCMFlags::NO_GPS_SYNC2));
builder = builder.field("ant_fault", self.flags.contains(UCCMFlags::NO_ANT));
builder = builder.field("gps_los", self.flags.contains(UCCMFlags::GPS_LOS));
builder
}
}
bitflags! {
pub struct UCCMFlags: u32 {
const OSC_LOCK = (1<<29);
@ -128,7 +153,7 @@ impl TryFrom<&[u8]> for UCCMTODReport {
debug!("TOD buffer: `{:#?}`", String::from_utf8(strbuf.to_vec()));
let resp: Vec<u8> = strbuf
.split(|c| *c == ' ' as u8)
.map(|x| u8::from_str_radix(str::from_utf8(x).unwrap(), 16).unwrap())
.map(|x| u8::from_str_radix(str::from_utf8(x).unwrap_or(""), 16).unwrap_or(0))
.collect();
let mut rdr = Cursor::new(resp);
@ -181,19 +206,20 @@ impl UCCMMonitor {
rx,
tx,
info: None,
config: config.sources.uccm,
config: config,
}
}
pub async fn send_cmd(&mut self, cmd: &[u8]) -> Result<String, std::io::Error> {
debug!("cmd: `{:?}`", String::from_utf8_lossy(cmd));
self.tx.write_all(cmd).await;
self.tx.write(&[b'\n']).await;
self.tx.write_all(cmd).await.unwrap();
self.tx.write(&[b'\n']).await.unwrap();
let mut reader = BufReader::new(&mut self.rx);
let mut resp = String::new();
while !resp.contains("UCCM>") {
debug!("'{}' doesn't contain UCCM>", resp);
let mut buf = Vec::new();
reader.read_until(b'>', &mut buf).await;
reader.read_until(b'>', &mut buf).await.unwrap();
resp.push_str(&String::from_utf8_lossy(&buf));
}
@ -238,8 +264,10 @@ async fn rx_loop(
mut rx: ReadHalf<SerialStream>,
chan: ChimemonSourceChannel,
state: Arc<Mutex<UCCMMonitorParseState>>,
config: Config,
) {
let mut rdbuf = BytesMut::with_capacity(1024);
let mut last_sent_report = Utc::now().naive_utc() - config.sources.uccm.status_interval;
loop {
match tokio::io::AsyncReadExt::read_buf(&mut rx, &mut rdbuf).await {
Ok(n) => {
@ -263,7 +291,10 @@ async fn rx_loop(
Ok(tod) => {
let sysnow = Utc::now().naive_utc();
let offset = tod.time - Duration::seconds(tod.leaps as i64) - sysnow;
debug!("System time: {:#?} GPS time: {:#?} Leaps: {:#?}", sysnow, tod.time, tod.leaps);
debug!(
"System time: {:#?} GPS time: {:#?} Leaps: {:#?}",
sysnow, tod.time, tod.leaps
);
debug!("TOD offset: {}ms", offset.num_milliseconds());
info!("{:#?}", tod);
let valid = tod.leaps > 0
@ -282,6 +313,17 @@ async fn rx_loop(
valid,
}))
.expect("Unable to send to channel");
if sysnow - last_sent_report
>= Duration::from_std(config.sources.uccm.status_interval).unwrap()
{
let mut builder = tod.as_builder(&config.sources.uccm.measurement);
for (key, value) in &config.influxdb.tags {
builder = builder.tag(key, value)
}
chan.send(ChimemonMessage::DataPoint(builder.build().unwrap()))
.expect("Unable to send to channel");
last_sent_report = sysnow;
}
}
Err(e) => {
warn!("Unable to parse TOD frame: {}", e);
@ -312,7 +354,12 @@ impl ChimemonSource for UCCMMonitor {
let state = Arc::new(Mutex::<UCCMMonitorParseState>::new(
UCCMMonitorParseState::Idle,
));
let rx_handle = tokio::spawn(rx_loop(self.rx, chan.clone(), state.clone()));
let rx_handle = tokio::spawn(rx_loop(
self.rx,
chan.clone(),
state.clone(),
self.config.clone(),
));
// let tx_handle = tokio::spawn(async move {
// let mut interval = interval(self.config.status_interval);
// loop {
@ -323,6 +370,6 @@ impl ChimemonSource for UCCMMonitor {
// }
// });
join!(rx_handle);
join!(rx_handle).0.unwrap();
}
}