diff --git a/Cargo.lock b/Cargo.lock index cb81a2d..5c6bbc9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -148,23 +148,26 @@ dependencies = [ "bytemuck", ] -[[package]] -name = "atty" -version = "0.2.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" -dependencies = [ - "hermit-abi 0.1.19", - "libc", - "winapi", -] - [[package]] name = "autocfg" version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" +[[package]] +name = "backoff" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1" +dependencies = [ + "futures-core", + "getrandom 0.2.16", + "instant", + "pin-project-lite", + "rand", + "tokio", +] + [[package]] name = "backtrace" version = "0.3.74" @@ -255,25 +258,31 @@ version = "0.2.0" dependencies = [ "async-stream", "async-trait", + "backoff", "bitflags 1.3.2", "byteorder", "bytes", "chrono", "chrony-candm", "clap", - "env_logger", "figment", "futures", "gethostname", + "gpsd_proto", "influxdb2", "itertools 0.14.0", "libc", "log", "serde", "serde_derive", + "serde_json", + "serde_repr", "tokio", "tokio-serial", "tokio-stream", + "tokio-util", + "tracing", + "tracing-subscriber 0.3.22", ] [[package]] @@ -461,19 +470,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "env_logger" -version = "0.9.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a12e6657c4c97ebab115a42dcee77225f7f482cdd841cf7088c657a42e9e00e7" -dependencies = [ - "atty", - "humantime", - "log", - "regex", - "termcolor", -] - [[package]] name = "equivalent" version = "1.0.2" @@ -664,6 +660,18 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "558b88954871f5e5b2af0e62e2e176c8bde7a6c2c4ed41b13d138d96da2e2cbd" +[[package]] +name = "gpsd_proto" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "247df1fffe6bc378377d6cf894dac08aef2194a6c1c9e2f173c3c10979fa5ca5" +dependencies = [ + "log", + "serde", + "serde_derive", + "serde_json", +] + [[package]] name = "h2" version = "0.3.26" @@ -695,15 +703,6 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" -[[package]] -name = "hermit-abi" -version = "0.1.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" -dependencies = [ - "libc", -] - [[package]] name = "hermit-abi" version = "0.3.9" @@ -750,12 +749,6 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" -[[package]] -name = "humantime" -version = "2.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b112acc8b3adf4b107a8ec20977da0273a8c386765a3ec0229bd500a1443f9f" - [[package]] name = "hyper" version = "0.14.32" @@ -995,7 +988,7 @@ dependencies = [ "snafu", "tempfile", "tracing", - "tracing-subscriber", + "tracing-subscriber 0.2.25", "url", ] @@ -1231,6 +1224,15 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nu-ansi-term" +version = "0.50.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" +dependencies = [ + "windows-sys 0.59.0", +] + [[package]] name = "num-traits" version = "0.2.19" @@ -1246,7 +1248,7 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" dependencies = [ - "hermit-abi 0.3.9", + "hermit-abi", "libc", ] @@ -1641,18 +1643,28 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.219" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" +dependencies = [ + "serde_core", + "serde_derive", +] + +[[package]] +name = "serde_core" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.219" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", @@ -1661,14 +1673,15 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.140" +version = "1.0.146" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "20068b6e96dc6c9bd23e01df8827e6c7e1f2fddd43c21810382803c136b99373" +checksum = "217ca874ae0207aac254aa02c957ded05585a90892cc8d87f9e5fa49669dadd8" dependencies = [ "itoa", "memchr", "ryu", "serde", + "serde_core", ] [[package]] @@ -1682,6 +1695,17 @@ dependencies = [ "thiserror 1.0.69", ] +[[package]] +name = "serde_repr" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "175ee3e80ae9982737ca543e96133087cbd9a485eecc3bc4de9c1a37b47ea59c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.101", +] + [[package]] name = "serde_spanned" version = "0.6.8" @@ -1873,15 +1897,6 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "termcolor" -version = "1.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755" -dependencies = [ - "winapi-util", -] - [[package]] name = "thiserror" version = "1.0.69" @@ -2007,9 +2022,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.15" +version = "0.7.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df" +checksum = "2efa149fe76073d6e8fd97ef4f4eca7b67f599660115591483572e406e165594" dependencies = [ "bytes", "futures-core", @@ -2078,9 +2093,9 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tracing" -version = "0.1.41" +version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" +checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" dependencies = [ "pin-project-lite", "tracing-attributes", @@ -2089,9 +2104,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.28" +version = "0.1.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d" +checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" dependencies = [ "proc-macro2", "quote", @@ -2100,14 +2115,25 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.33" +version = "0.1.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c" +checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" dependencies = [ "once_cell", "valuable", ] +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + [[package]] name = "tracing-serde" version = "0.1.3" @@ -2140,6 +2166,20 @@ dependencies = [ "tracing-serde", ] +[[package]] +name = "tracing-subscriber" +version = "0.3.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f30143827ddab0d256fd843b7a66d164e9f271cfa0dde49142c5ca0ca291f1e" +dependencies = [ + "nu-ansi-term", + "sharded-slab", + "smallvec", + "thread_local", + "tracing-core", + "tracing-log", +] + [[package]] name = "try-lock" version = "0.2.5" @@ -2357,15 +2397,6 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" -[[package]] -name = "winapi-util" -version = "0.1.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" -dependencies = [ - "windows-sys 0.59.0", -] - [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index 45132ad..352ad7a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,11 @@ [package] name = "chimemon" version = "0.2.0" -edition = "2021" +edition = "2024" + +[features] +default = [] +release-logs = ["tracing/max_level_info"] [dependencies] serde = "1.0" @@ -14,7 +18,6 @@ clap = { version = "4.0", features = ["derive"] } log = "0.4" figment = { version = "0.10", features = ["toml"] } gethostname = "0.3" -env_logger = "0.9.1" futures = "0.3.24" async-trait = "0.1.58" tokio-stream = { version = "0.1.11", features = ["sync"] } @@ -26,6 +29,13 @@ chrono = "0.4.23" libc = "0.2.137" async-stream = "0.3.6" itertools = "0.14.0" +gpsd_proto = { version = "1.0.0" } +tokio-util = { version = "0.7.17", features = ["codec"] } +serde_json = "1.0.146" +backoff = { version = "0.4.0", features = ["tokio"] } +serde_repr = "0.1.20" +tracing = "0.1.44" +tracing-subscriber = { version = "0.3.22", features = ["fmt"] } [dependencies.chrony-candm] git = "https://github.com/aws/chrony-candm" diff --git a/src/chrony.rs b/src/chrony.rs index 8f0b5a2..491f254 100644 --- a/src/chrony.rs +++ b/src/chrony.rs @@ -1,13 +1,17 @@ +use crate::{ + ChimemonSource, ChimemonSourceChannel, Config, SourceMetric, SourceReport, SourceReportDetails, + SourceStatus, +}; use async_trait::async_trait; -use chimemon::{ChimemonSource, ChimemonSourceChannel, Config}; -use chrony_candm::reply::{self, ReplyBody, SourceMode}; +use chrony_candm::reply::{self, ReplyBody, SourceMode, SourceState}; use chrony_candm::request::{self, RequestBody}; -use chrony_candm::{blocking_query, ClientOptions}; +use chrony_candm::{ClientOptions, blocking_query}; use influxdb2::models::DataPoint; -use log::{info, warn}; use std::net::{SocketAddr, ToSocketAddrs}; +use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tokio::join; +use tracing::{info, warn}; pub struct ChronyClient { pub server: SocketAddr, @@ -15,82 +19,136 @@ pub struct ChronyClient { config: Config, } -fn datapoint_from_tracking( - t: &reply::Tracking, - config: &Config, -) -> Result> { - let now = SystemTime::now().duration_since(UNIX_EPOCH)?; - let measurement = config.sources.chrony.measurement_prefix.to_owned() - + &config.sources.chrony.tracking_measurement; - let mut builder = - DataPoint::builder(&measurement).timestamp(now.as_nanos().try_into().unwrap()); - for (key, value) in &config.influxdb.tags { - builder = builder.tag(key, value); - } - - let point = builder - .field("ref_id", t.ref_id as i64) - .field("ref_ip_addr", t.ip_addr.to_string()) - .field("stratum", t.stratum as i64) - .field("leap_status", t.leap_status as i64) - .field("current_correction", f64::from(t.current_correction)) - .field("last_offset", f64::from(t.last_offset)) - .field("rms_offset", f64::from(t.rms_offset)) - .field("freq_ppm", f64::from(t.freq_ppm)) - .field("resid_freq_ppm", f64::from(t.resid_freq_ppm)) - .field("skew_ppm", f64::from(t.skew_ppm)) - .field("root_delay", f64::from(t.root_delay)) - .field("root_dispersion", f64::from(t.root_dispersion)) - .field("last_update_interval", f64::from(t.last_update_interval)) - .build()?; - - Ok(point) +#[derive(Debug)] +pub struct ChronyTrackingReport { + tags: Arc>, + pub ref_id: i64, + pub ref_ip_addr: String, + pub stratum: i64, + pub leap_status: i64, + pub current_correction: f64, + pub last_offset: f64, + pub rms_offset: f64, + pub freq_ppm: f64, + pub resid_freq_ppm: f64, + pub skew_ppm: f64, + pub root_delay: f64, + pub root_dispersion: f64, + pub last_update_interval: f64, } -pub fn datapoint_from_sourcedata( - d: &reply::SourceData, - config: &Config, -) -> Result> { - let now = SystemTime::now().duration_since(UNIX_EPOCH)?; - let measurement = config.sources.chrony.measurement_prefix.to_owned() - + &config.sources.chrony.sources_measurement; - let mut builder = - DataPoint::builder(&measurement).timestamp(now.as_nanos().try_into().unwrap()); - for (key, value) in &config.influxdb.tags { - builder = builder.tag(key, value) +impl SourceReportDetails for ChronyTrackingReport { + fn is_healthy(&self) -> bool { + true } - builder = builder - .tag("ref_id", d.ip_addr.to_string()) - .tag( - "mode", - match d.mode { - SourceMode::Client => String::from("server"), - SourceMode::Peer => String::from("peer"), - SourceMode::Ref => String::from("refclock"), - }, - ) - .tag( - "state", - match d.state { - reply::SourceState::Selected => String::from("best"), - reply::SourceState::NonSelectable => String::from("unusable"), - reply::SourceState::Falseticker => String::from("falseticker"), - reply::SourceState::Jittery => String::from("jittery"), - reply::SourceState::Unselected => String::from("combined"), - reply::SourceState::Selectable => String::from("unused"), - }, - ) - .field("poll", d.poll as i64) - .field("stratum", d.stratum as i64) - .field("flags", d.flags.bits() as i64) - .field("reachability", d.reachability.count_ones() as i64) - .field("since_sample", d.since_sample as i64) - .field("orig_latest_meas", f64::from(d.orig_latest_meas)) - .field("latest_meas", f64::from(d.latest_meas)) - .field("latest_meas_err", f64::from(d.latest_meas_err)); - let point = builder.build()?; + fn to_metrics(&self) -> Vec { + let tags = &self.tags; + vec![ + SourceMetric::new_int("ref_id", self.ref_id, tags.clone()), + SourceMetric::new_int("stratum", self.stratum, tags.clone()), + SourceMetric::new_int("leap_status", self.leap_status, tags.clone()), + SourceMetric::new_float("current_correction", self.current_correction, tags.clone()), + SourceMetric::new_float("last_offset", self.last_offset, tags.clone()), + SourceMetric::new_float("rms_offset", self.rms_offset, tags.clone()), + SourceMetric::new_float("freq_ppm", self.freq_ppm, tags.clone()), + SourceMetric::new_float("resid_freq_ppm", self.resid_freq_ppm, tags.clone()), + SourceMetric::new_float("skew_ppm", self.skew_ppm, tags.clone()), + SourceMetric::new_float("root_delay", self.root_delay, tags.clone()), + SourceMetric::new_float("root_dispersion", self.root_dispersion, tags.clone()), + SourceMetric::new_float( + "last_update_interval", + self.last_update_interval, + tags.clone(), + ), + ] + } +} - Ok(point) +#[derive(Debug)] +pub struct ChronySourcesReport { + pub sources: Vec, +} + +impl SourceReportDetails for ChronySourcesReport { + fn is_healthy(&self) -> bool { + //TODO: think about whether there is an idea of unhealthy sources + true + } + fn to_metrics(&self) -> Vec { + let mut metrics = Vec::with_capacity(8 * self.sources.len()); + + for source in &self.sources { + let tags = Arc::new(vec![ + ("ref_id".to_owned(), source.ip_addr.to_string()), + ( + "mode".to_owned(), + match source.mode { + SourceMode::Client => String::from("server"), + SourceMode::Peer => String::from("peer"), + SourceMode::Ref => String::from("refclock"), + }, + ), + ( + "state".to_owned(), + match source.state { + reply::SourceState::Selected => String::from("best"), + reply::SourceState::NonSelectable => String::from("unusable"), + reply::SourceState::Falseticker => String::from("falseticker"), + reply::SourceState::Jittery => String::from("jittery"), + reply::SourceState::Unselected => String::from("combined"), + reply::SourceState::Selectable => String::from("unused"), + }, + ), + ]); + metrics.extend([ + SourceMetric::new_int("poll", source.poll as i64, tags.clone()), + SourceMetric::new_int("stratum", source.stratum as i64, tags.clone()), + SourceMetric::new_int("flags", source.flags.bits() as i64, tags.clone()), + SourceMetric::new_int( + "reachability", + source.reachability.count_ones() as i64, + tags.clone(), + ), + SourceMetric::new_int("since_sample", source.since_sample as i64, tags.clone()), + SourceMetric::new_float( + "orig_latest_meas", + source.orig_latest_meas.into(), + tags.clone(), + ), + SourceMetric::new_float("latest_meas", source.latest_meas.into(), tags.clone()), + SourceMetric::new_float( + "latest_meas_err", + source.latest_meas_err.into(), + tags.clone(), + ), + ]); + } + + metrics + } +} + +fn report_from_tracking( + t: &reply::Tracking, + config: &Config, +) -> Result> { + let report = ChronyTrackingReport { + tags: Arc::new(vec![]), //TODO: allow configuring tags in the source + ref_id: t.ref_id as i64, + ref_ip_addr: t.ip_addr.to_string(), + stratum: t.stratum as i64, + leap_status: t.leap_status as i64, + current_correction: t.current_correction.into(), + last_offset: t.last_offset.into(), + rms_offset: t.rms_offset.into(), + freq_ppm: t.freq_ppm.into(), + resid_freq_ppm: t.resid_freq_ppm.into(), + skew_ppm: t.skew_ppm.into(), + root_delay: t.root_delay.into(), + root_dispersion: t.root_dispersion.into(), + last_update_interval: t.last_update_interval.into(), + }; + Ok(report) } impl ChronyClient { @@ -206,13 +264,16 @@ impl ChronyClient { ) -> Result<(), Box> { let tracking = self.get_tracking().await?; - let tracking_data = datapoint_from_tracking(&tracking, &self.config)?; + let tracking_data = report_from_tracking(&tracking, &self.config)?; + let report = SourceReport { + name: "chrony-tracking".to_owned(), + status: SourceStatus::Unknown, + details: Arc::new(tracking_data), + }; info!("Sending tracking data"); - chan.send(tracking_data.into()) - .expect("Unable to send tracking data to targets"); - + chan.send(report.into())?; Ok(()) } @@ -221,14 +282,14 @@ impl ChronyClient { chan: &ChimemonSourceChannel, ) -> Result<(), Box> { let sources = self.get_sources().await?; - let mut dps = Vec::with_capacity(sources.len()); - for ds in sources { - let source_data = datapoint_from_sourcedata(&ds, &self.config)?; - dps.push(source_data); - } + let details = ChronySourcesReport { sources }; + let report = SourceReport { + name: "chrony-sources".to_owned(), + status: SourceStatus::Unknown, + details: Arc::new(details), + }; info!("Sending source data"); - chan.send(dps.into()) - .expect("Unable to send source data to targets"); + chan.send(report.into())?; Ok(()) } } diff --git a/src/chrony_refclock.rs b/src/chrony_refclock.rs index a7d0302..58002d2 100644 --- a/src/chrony_refclock.rs +++ b/src/chrony_refclock.rs @@ -1,10 +1,10 @@ +use crate::{ChimemonMessage, ChimemonTarget, ChimemonTargetChannel, ChronySockConfig}; use async_trait::async_trait; -use chimemon::{ChimemonMessage, ChimemonTarget, ChimemonTargetChannel, ChronySockConfig}; use libc::{c_double, c_int, timeval}; -use log::debug; use std::mem; use std::os::unix::net::UnixDatagram; use std::path::PathBuf; +use tracing::debug; const CHRONY_MAGIC: c_int = 0x534f434b; diff --git a/src/gpsd.rs b/src/gpsd.rs new file mode 100644 index 0000000..d1423d9 --- /dev/null +++ b/src/gpsd.rs @@ -0,0 +1,451 @@ +use crate::{ + ChimemonMessage, ChimemonSource, ChimemonSourceChannel, Config, SourceMetric, SourceReport, + SourceReportDetails, SourceStatus, +}; + +use std::collections::HashMap; +use std::f64; +use std::fmt::Debug; +use std::net::SocketAddr; +use std::sync::Arc; +use std::time::Duration; + +use async_trait::async_trait; +use backoff::ExponentialBackoff; +use futures::{SinkExt, Stream}; +use futures::{StreamExt, task::Context}; +use gpsd_proto::{ + Device, Gst, Mode, Pps, ResponseHandshake, Sky, Tpv, UnifiedResponse, Version, Watch, +}; +use serde::{Deserialize, Deserializer, Serialize}; +use serde_json; +use tokio::net::{TcpStream, ToSocketAddrs, lookup_host}; +use tokio::time::{Interval, interval, timeout}; +use tokio_util::codec::{Framed, LinesCodec}; +use tracing::{debug, debug_span, info, warn}; + +pub struct GpsdSource { + pub config: Config, + conn: GpsdTransport, + devices: HashMap, + last_gst: Option, + last_pps: Option, + last_tpv: Option, + last_sky: Option, +} + +#[derive(Eq, PartialEq, Clone, Copy, Debug)] +pub enum GpsdFixType { + Unknown, + NoFix, + Fix2D, + Fix3D, + Surveyed, +} + +impl From for GpsdFixType { + fn from(value: u32) -> Self { + match value { + 0 => Self::Unknown, + 1 => Self::NoFix, + 2 => Self::Fix2D, + 3 => Self::Fix3D, + _ => panic!("Invalid fix mode {value}"), + } + } +} + +impl From for GpsdFixType { + fn from(value: Mode) -> Self { + match value { + Mode::NoFix => Self::NoFix, + Mode::Fix2d => Self::Fix2D, + Mode::Fix3d => Self::Fix3D, + } + } +} + +#[derive(Clone, Debug)] +pub struct GpsdSourceReport { + fix_type: GpsdFixType, + sats_visible: u8, + sats_tracked: u8, + tdop: f64, +} + +impl SourceReportDetails for GpsdSourceReport { + fn is_healthy(&self) -> bool { + self.fix_type != GpsdFixType::Unknown && self.fix_type != GpsdFixType::NoFix + } + fn to_metrics(&self) -> Vec { + let no_tags = Arc::new(vec![]); + vec![ + SourceMetric::new_int("sats_visible", self.sats_visible as i64, no_tags.clone()), + SourceMetric::new_int("sats_tracked", self.sats_tracked as i64, no_tags.clone()), + SourceMetric::new_float("tdop", self.tdop, no_tags.clone()), + ] + } +} + +impl GpsdSource { + pub async fn new(config: Config) -> Result { + let conn = GpsdTransport::new(&config.sources.gpsd.host).await?; + Ok(Self { + config, + conn, + devices: HashMap::new(), + last_gst: None, + last_pps: None, + last_tpv: None, + last_sky: None, + }) + } +} + +impl GpsdSource { + async fn send_status(&self, chan: &mut ChimemonSourceChannel) { + let sky = self.last_sky.as_ref(); + let tpv = self.last_tpv.as_ref(); + + let (sats_tracked, sats_visible) = sky.map_or((0, 0), |sky| { + let sats = sky.satellites.as_deref().unwrap_or_default(); + ( + sats.iter().filter(|s| s.used).count() as u8, + sats.len() as u8, + ) + }); + + let tdop = sky + .and_then(|sky| sky.tdop) + .map_or(f64::INFINITY, |tdop| tdop as f64); + + chan.send(ChimemonMessage::SourceReport(SourceReport { + name: "gpsd".into(), + status: SourceStatus::Unknown, + details: Arc::new(GpsdSourceReport { + fix_type: tpv.map_or(GpsdFixType::Unknown, |tpv| tpv.mode.into()), + sats_tracked, + sats_visible, + tdop, + }), + })) + .unwrap(); + } + + fn handle_msg(&mut self, msg: String) -> Result<(), Box> { + let _span = debug_span!("handle_msg").entered(); + let parsed = serde_json::from_str::(&msg)?; + debug!("Received {parsed:?}"); + match parsed { + UnifiedResponse::Device(d) => { + if let Some(path) = &d.path { + self.devices.insert(path.to_owned(), d); + } else { + warn!("No path on DEVICE response, ignoring."); + } + } + UnifiedResponse::Gst(g) => self.last_gst = Some(g), + UnifiedResponse::Pps(p) => self.last_pps = Some(p), + UnifiedResponse::Sky(s) => { + self.last_sky = Some({ + let mut s = s; + if s.satellites.is_none() { + s.satellites = self.last_sky.as_mut().and_then(|old| old.satellites.take()); + } + s + }) + } + UnifiedResponse::Tpv(t) => self.last_tpv = Some(t), + _ => warn!("Unhandled response `{parsed:?}`"), + } + + Ok(()) + } +} + +#[async_trait] +impl ChimemonSource for GpsdSource { + async fn run(mut self, mut chan: ChimemonSourceChannel) { + info!("gpsd task started"); + self.conn.ensure_connection().await.unwrap(); + let mut ticker = interval(Duration::from_secs(self.config.sources.gpsd.interval)); + + let mut params = WatchParams::default(); + params.json = Some(true); + self.conn + .cmd_response(&GpsdCommand::Watch(Some(params))) + .await + .unwrap(); + loop { + let framed = self.conn.framed.as_mut().expect("must be connected"); + tokio::select! { + _ = ticker.tick() => { + self.send_status(&mut chan).await + }, + maybe_msg = framed.next() => { + if let Some(Ok(msg)) = maybe_msg { + self.handle_msg(msg).unwrap() + } + } + } + } + } +} + +struct GpsdTransport { + host: SocketAddr, + framed: Option>, + conn_backoff: ExponentialBackoff, +} + +#[derive(Clone, Copy, Debug, Serialize)] +#[repr(u8)] +pub enum RawMode { + Off = 0, + RawHex = 1, + RawBin = 2, +} + +#[derive(Clone, Copy, Debug, Serialize)] +#[repr(u8)] +pub enum NativeMode { + Nmea = 0, + Alt = 1, +} + +#[derive(Clone, Copy, Debug, Serialize)] +#[repr(u8)] +pub enum ParityMode { + None = b'N', + Odd = b'O', + Even = b'E', +} + +#[derive(Clone, Debug, Serialize)] +struct WatchParams { + #[serde(skip_serializing_if = "Option::is_none")] + enable: Option, + #[serde(skip_serializing_if = "Option::is_none")] + json: Option, + #[serde(skip_serializing_if = "Option::is_none")] + nmea: Option, + #[serde(skip_serializing_if = "Option::is_none")] + raw: Option, + #[serde(skip_serializing_if = "Option::is_none")] + scaled: Option, + #[serde(skip_serializing_if = "Option::is_none")] + split24: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pps: Option, + #[serde(skip_serializing_if = "Option::is_none")] + device: Option, +} + +impl Default for WatchParams { + fn default() -> Self { + WatchParams { + enable: Some(true), + json: Some(false), + nmea: None, + raw: None, + scaled: None, + split24: None, + pps: None, + device: None, + } + } +} + +#[derive(Clone, Debug, Serialize)] +struct DeviceParams { + #[serde(skip_serializing_if = "Option::is_none")] + bps: Option, + #[serde(skip_serializing_if = "Option::is_none")] + cycle: Option, + #[serde(skip_serializing_if = "Option::is_none")] + flags: Option, + #[serde(skip_serializing_if = "Option::is_none")] + hexdata: Option, + #[serde(skip_serializing_if = "Option::is_none")] + native: Option, + #[serde(skip_serializing_if = "Option::is_none")] + parity: Option, + #[serde(skip_serializing_if = "Option::is_none")] + path: Option, + #[serde(skip_serializing_if = "Option::is_none")] + readonly: Option, + #[serde(skip_serializing_if = "Option::is_none")] + sernum: Option, + #[serde(skip_serializing_if = "Option::is_none")] + stopbits: Option, +} + +#[derive(Clone, Debug)] +enum GpsdCommand { + Version, // no params + Devices, // no params + Watch(Option), + Poll, // I don't understand the protocol for this one + Device(Option), +} + +impl GpsdCommand { + fn command_string(&self) -> &str { + match self { + Self::Version => "?VERSION", + Self::Devices => "?DEVICES", + Self::Watch(_) => "?WATCH", + Self::Poll => "?POLL", + Self::Device(_) => "?DEVICE", + } + } + + fn expected_responses(&self) -> usize { + match self { + Self::Version => 1, + Self::Devices => 1, + Self::Watch(_) => 2, + Self::Poll => 1, + Self::Device(_) => 1, + } + } +} + +impl ToString for GpsdCommand { + fn to_string(&self) -> String { + let s = self.command_string().to_owned(); + match self { + Self::Version | Self::Devices | Self::Poll | Self::Watch(None) | Self::Device(None) => { + s + ";" + } + Self::Watch(Some(w)) => s + "=" + &serde_json::to_string(w).unwrap() + ";", + Self::Device(Some(d)) => s + "=" + &serde_json::to_string(d).unwrap() + ";", + } + } +} + +impl GpsdTransport { + async fn new(host: &T) -> Result { + // TODO: implement proper handling of multiple responses + let host_addr = lookup_host(host).await?.next().ok_or(std::io::Error::new( + std::io::ErrorKind::NotFound, + format!("No response looking up `{:?}`", host), + ))?; + Ok(Self { + host: host_addr, + framed: None, + conn_backoff: ExponentialBackoff::default(), + }) + } + async fn connect(&mut self) -> Result<(), Box> { + info!("Connecting to gpsd @ {}", self.host); + let mut framed = backoff::future::retry_notify( + self.conn_backoff.clone(), + async || { + Ok(Framed::new( + TcpStream::connect(self.host).await?, + LinesCodec::new(), + )) + }, + |e, d| warn!("Failed to connect to {} after {:?}: `{}`", self.host, d, e), + ) + .await?; + debug!("Waiting for initial VERSION"); + if let Ok(Some(Ok(r))) = timeout(Duration::from_secs(5), framed.next()).await { + if let Ok(version) = serde_json::from_str::(&r) { + info!( + "Connected to gpsd @ {}, release {}", + self.host, version.release + ) + } else { + warn!("Got unexpected non-VERSION response after connection (`{r}`)") + } + self.framed = Some(framed); + Ok(()) + } else { + Err("Unexpected failure to receive initial handshake response".into()) + } + } + async fn ensure_connection(&mut self) -> Result<(), Box> { + if let Some(conn) = &self.framed { + Ok(()) + } else { + self.connect().await + } + } + async fn cmd_response( + &mut self, + cmd: &GpsdCommand, + ) -> Result, Box> { + debug!("Command: `{cmd:?}`"); + self.ensure_connection().await?; + + let mut responses = Vec::new(); + if let Some(conn) = &mut self.framed { + debug!("Raw command: `{}`", cmd.to_string()); + conn.send(cmd.to_string()).await?; + for _ in 0..cmd.expected_responses() { + match conn.next().await { + None => return Err("Connection lost".into()), + Some(Err(e)) => return Err(format!("Unable to parse response {e}").into()), + Some(Ok(r)) => { + debug!("Raw response: `{r}`"); + responses.push(serde_json::from_str::(&r)?) + } + } + } + } else { + return Err("Missing connection despite ensure".into()); + } + + Ok(responses) + } + + async fn stream( + &mut self, + ) -> Result< + impl Stream>>, + Box, + > { + self.ensure_connection().await?; + if let Some(conn) = &mut self.framed { + Ok(conn.map(|line| Ok(serde_json::from_str::(&line?)?))) + } else { + Err("No connection after connecting.".into()) + } + } +} + +mod tests { + use gpsd_proto::{ResponseData, UnifiedResponse}; + use tokio_stream::StreamExt; + + use crate::gpsd::{GpsdCommand, GpsdTransport, WatchParams}; + use std::sync::Once; + + static INIT: Once = Once::new(); + + fn init_logger() { + INIT.call_once(|| tracing_subscriber::fmt::init()); + } + + #[tokio::test] + async fn test_gpsd() { + init_logger(); + let mut gpsd = GpsdTransport::new(&"192.168.65.93:2947").await.unwrap(); + gpsd.connect().await.unwrap(); + let mut params = WatchParams::default(); + params.enable = Some(true); + params.json = Some(true); + let res = { + gpsd.cmd_response(&GpsdCommand::Watch(Some(params))) + .await + .unwrap() + }; + println!("{res:?}"); + while let Some(Ok(s)) = gpsd.framed.as_mut().unwrap().next().await { + let res2 = serde_json::from_str::(&s); + println!("{res2:?}"); + } + } +} diff --git a/src/hwmon.rs b/src/hwmon.rs index 568f28e..41b839a 100644 --- a/src/hwmon.rs +++ b/src/hwmon.rs @@ -1,23 +1,100 @@ +use crate::{ + ChimemonSource, ChimemonSourceChannel, Config, SourceMetric, SourceReport, SourceReportDetails, + SourceStatus, +}; use async_trait::async_trait; -use chimemon::{ChimemonSource, ChimemonSourceChannel, Config}; -use futures::{stream, StreamExt}; +use futures::{StreamExt, stream}; use influxdb2::models::DataPoint; -use log::{debug, info}; use std::{ + fs::File, + io::Read, path::PathBuf, + sync::Arc, time::{Duration, SystemTime, UNIX_EPOCH}, }; +use tracing::{Instrument, debug, error, info, info_span, warn}; pub struct HwmonSource { config: Config, - sensors: Vec, + sensors: Vec>, } +#[derive(Debug, Clone)] struct HwmonSensor { - path: PathBuf, + name: String, + value_path: PathBuf, device: String, sensor: String, - name: String, + label: Option, + tags: Arc>, +} + +impl HwmonSensor { + fn new(name: &str, device: &str, sensor: &str) -> Self { + let value_path = PathBuf::from(HWMON_ROOT) + .join(device) + .join(sensor.to_owned() + "_input"); + let label_path_raw = PathBuf::from(HWMON_ROOT) + .join(device) + .join(sensor.to_owned() + "_label"); + let label = if label_path_raw.is_file() { + let mut f = + File::open(&label_path_raw).expect(&format!("Unable to open `{label_path_raw:?}`")); + let mut label = String::new(); + f.read_to_string(&mut label) + .expect(&format!("Unable to read from `{label_path_raw:?}")); + Some(label.trim().to_owned()) + } else { + None + }; + let mut tags_vec = vec![ + ("name".to_owned(), name.to_owned()), + ("device".to_owned(), device.to_owned()), + ("sensor".to_owned(), sensor.to_owned()), + ]; + if let Some(label) = &label { + tags_vec.push(("label".to_owned(), label.clone())) + } + Self { + value_path, + label, + device: device.to_owned(), + sensor: sensor.to_owned(), + name: name.to_owned(), + tags: Arc::new(tags_vec), + } + } +} + +#[derive(Debug)] +struct HwmonReport { + values: Vec<(Arc, f64)>, +} + +impl SourceReportDetails for HwmonReport { + fn is_healthy(&self) -> bool { + //self.alarms.iter().any(|(_sensor, alarm)| *alarm) + true + } + fn to_metrics(&self) -> Vec { + let mut metrics = Vec::new(); + for (sensor, value) in &self.values { + metrics.push(SourceMetric::new_float( + "hwmon_value", + *value, + sensor.tags.clone(), + )) + } + // for (sensor, alarm) in &self.alarms { + // metrics.push(SourceMetric::new_bool( + // "hwmon_alarm", + // *alarm, + // sensor.tags.clone(), + // )) + // } + + metrics + } } const HWMON_ROOT: &str = "/sys/class/hwmon"; @@ -29,19 +106,14 @@ impl HwmonSource { .hwmon .sensors .iter() - .map(|(k, v)| HwmonSensor { - name: k.into(), - device: v.name.clone(), - sensor: v.sensor.clone(), - path: PathBuf::from(HWMON_ROOT).join(&v.name).join(&v.sensor), - }) + .map(|(k, v)| Arc::new(HwmonSensor::new(k, &v.name, &v.sensor))) .collect(); HwmonSource { config, sensors } } async fn get_raw_value(sensor: &HwmonSensor) -> Result { - tokio::fs::read_to_string(&sensor.path).await + tokio::fs::read_to_string(&sensor.value_path).await } } @@ -53,31 +125,38 @@ impl ChimemonSource for HwmonSource { tokio::time::interval(Duration::from_secs(self.config.sources.hwmon.interval)); loop { interval.tick().await; - let s = stream::iter(&self.sensors).then(|s| async { - let sensor_val = HwmonSource::get_raw_value(s) - .await - .expect("Unable to read sensor"); - debug!( - "hwmon {} raw value {}", - s.path.to_string_lossy(), - sensor_val - ); - let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); - let mut builder = DataPoint::builder(&self.config.sources.hwmon.measurement) - .timestamp(now.as_nanos().try_into().unwrap()); - for (key, value) in &self.config.influxdb.tags { - builder = builder.tag(key, value) + let mut values = Vec::new(); + for s in &self.sensors { + if let Ok(sensor_val) = HwmonSource::get_raw_value(s).await { + debug!( + "hwmon {} raw value {}", + s.value_path.to_string_lossy(), + sensor_val + ); + if let Ok(parsed) = sensor_val.trim().parse::() { + values.push((s.clone(), parsed)); + } else { + error!( + "Unable to parse sensor value {sensor_val} at {}", + s.value_path.to_string_lossy() + ); + } + } else { + error!("Unable to get hwmon sensor value"); } - builder = builder - .tag("name", &s.name) - .tag("sensor", &s.sensor) - .tag("device", &s.device) - .field("value", sensor_val.trim().parse::().unwrap()); - builder.build().unwrap() - }); + } + let report = SourceReport { + name: "hwmon".to_owned(), + status: SourceStatus::Healthy, + details: Arc::new(HwmonReport { values }), + }; info!("Writing hwmon data"); - chan.send(s.collect::>().await.into()) - .unwrap(); + match chan.send(report.into()) { + Ok(_) => {} + Err(e) => { + warn!("Unable to send to message channel ({e})") + } + } } } } diff --git a/src/lib.rs b/src/lib.rs index d83cbc2..f309d48 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,19 +1,27 @@ +pub mod chrony; +pub mod chrony_refclock; +pub mod gpsd; +pub mod hwmon; +pub mod uccm; + use async_trait::async_trait; use chrono::{DateTime, Utc}; use figment::{ + Figment, providers::{Format, Serialized, Toml}, util::map, value::Map, - Figment, }; use gethostname::gethostname; use influxdb2::models::DataPoint; use serde_derive::{Deserialize, Serialize}; -use std::path::Path; use tokio::sync::broadcast::*; +use std::{fmt::Debug, net::SocketAddr, path::Path, sync::Arc}; + #[derive(Serialize, Deserialize, Clone)] pub struct InfluxConfig { + pub enabled: bool, pub url: String, pub org: String, pub bucket: String, @@ -25,6 +33,7 @@ impl Default for InfluxConfig { fn default() -> Self { let host = gethostname().into_string().unwrap(); InfluxConfig { + enabled: false, url: "http://localhost:8086".into(), org: "default".into(), bucket: "default".into(), @@ -100,6 +109,24 @@ impl Default for HwmonConfig { } } } + +#[derive(Serialize, Deserialize, Clone)] +pub struct GpsdConfig { + pub enabled: bool, + pub interval: u64, + pub host: String, +} + +impl Default for GpsdConfig { + fn default() -> Self { + GpsdConfig { + enabled: false, + interval: 60, + host: "localhost:2947".into(), + } + } +} + #[derive(Clone, Debug)] pub struct TimeReport { pub system_time: DateTime, @@ -109,6 +136,67 @@ pub struct TimeReport { pub valid: bool, } +#[derive(Clone, Debug)] +pub enum SourceStatus { + Healthy, + LossOfSignal(Option), + LossOfSync(Option), + Other(Option), + Unknown, +} + +#[derive(Clone, Debug)] +pub enum MetricValue { + Int(i64), + Float(f64), + Bool(bool), +} + +#[derive(Clone, Debug)] +pub struct SourceMetric { + name: String, + value: MetricValue, + tags: Arc>, +} + +impl SourceMetric { + pub fn new_int(name: &str, value: i64, tags: Arc>) -> Self { + Self { + name: name.to_owned(), + value: MetricValue::Int(value), + tags, + } + } + + pub fn new_float(name: &str, value: f64, tags: Arc>) -> Self { + Self { + name: name.to_owned(), + value: MetricValue::Float(value), + tags, + } + } + + pub fn new_bool(name: &str, value: bool, tags: Arc>) -> Self { + Self { + name: name.to_owned(), + value: MetricValue::Bool(value), + tags, + } + } +} + +pub trait SourceReportDetails: Debug + Send + Sync { + fn to_metrics(&self) -> Vec; + fn is_healthy(&self) -> bool; +} + +#[derive(Clone, Debug)] +pub struct SourceReport { + pub name: String, + pub status: SourceStatus, + pub details: Arc, +} + #[derive(Serialize, Deserialize, Clone)] pub struct UCCMConfig { pub enabled: bool, @@ -137,6 +225,7 @@ pub struct SourcesConfig { pub chrony: ChronyConfig, pub hwmon: HwmonConfig, pub uccm: UCCMConfig, + pub gpsd: GpsdConfig, } #[derive(Serialize, Deserialize, Clone, Default)] @@ -155,11 +244,12 @@ pub fn load_config(filename: &Path) -> Figment { Figment::from(Serialized::defaults(Config::default())).merge(Toml::file(filename)) } -#[derive(Clone, Debug)] +#[derive(Debug, Clone)] pub enum ChimemonMessage { DataPoint(DataPoint), DataPoints(Vec), TimeReport(TimeReport), + SourceReport(SourceReport), } impl From for ChimemonMessage { @@ -179,6 +269,12 @@ impl From for ChimemonMessage { } } +impl From for ChimemonMessage { + fn from(sr: SourceReport) -> Self { + ChimemonMessage::SourceReport(sr) + } +} + pub type ChimemonSourceChannel = Sender; pub type ChimemonTargetChannel = Receiver; diff --git a/src/main.rs b/src/main.rs index 6f1d7a6..3098ea6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,17 +1,12 @@ -mod chrony; -mod chrony_refclock; -mod hwmon; -mod uccm; - use clap::{Parser, ValueEnum}; -use env_logger::{self, Env}; use futures::future::join_all; -use log::{debug, info}; use std::path::Path; use tokio::sync::broadcast; +use tracing::{Instrument, debug, error, info, info_span, warn}; +use tracing_subscriber; use crate::{chrony::*, chrony_refclock::ChronySockServer, hwmon::HwmonSource, uccm::UCCMMonitor}; -use chimemon::*; +use chimemon::{gpsd::GpsdSource, *}; const PROGRAM_NAME: &str = "chimemon"; const VERSION: &str = "0.0.1"; @@ -35,33 +30,51 @@ struct Args { #[tokio::main(flavor = "current_thread")] async fn main() -> Result<(), Box> { - let args = Args::parse(); - let loglevel = args - .log_level - .to_possible_value() - .unwrap() - .get_name() - .to_owned(); - env_logger::Builder::from_env(Env::default().default_filter_or(loglevel)).init(); + tracing_subscriber::fmt::init(); - info!("{} v{} starting...", PROGRAM_NAME, VERSION); + let args = Args::parse(); + + info!("{PROGRAM_NAME} v{VERSION} starting..."); let fig = load_config(Path::new(&args.config_file)); - debug!("{:?}", fig); + debug!("{fig:?}"); let config: Config = fig.extract()?; let mut tasks = Vec::new(); let (tx, _) = broadcast::channel(16); let sourcechan: ChimemonSourceChannel = tx; - info!( - "Connecting to influxdb {} org: {} using token", - &config.influxdb.url, &config.influxdb.org - ); - let influx = influxdb2::Client::new( - &config.influxdb.url, - &config.influxdb.org, - &config.influxdb.token, - ); + if config.influxdb.enabled { + info!( + "Connecting to influxdb {} org: {} using token", + &config.influxdb.url, &config.influxdb.org + ); + let config = config.clone(); + let influx = influxdb2::Client::new( + &config.influxdb.url, + &config.influxdb.org, + &config.influxdb.token, + ); + + let mut influxrx = sourcechan.subscribe(); + + tasks.push(tokio::spawn( + async move { + let stream = async_stream::stream! { + while let Ok(msg) = influxrx.recv().await { + match msg { ChimemonMessage::DataPoint(dp) => { + yield dp + }, ChimemonMessage::DataPoints(dps) => { + for p in dps { + yield p + } + }, _ => {} + } } + }; + influx.write(&config.influxdb.bucket, stream).await.unwrap(); + } + .instrument(info_span!("influx-task")), + )); + } let chrony = if config.sources.chrony.enabled { Some(ChronyClient::new(config.to_owned())) @@ -69,7 +82,10 @@ async fn main() -> Result<(), Box> { None }; if let Some(c) = chrony { - tasks.push(tokio::spawn(c.run(sourcechan.clone()))); + tasks.push(tokio::spawn( + c.run(sourcechan.clone()) + .instrument(info_span!("chrony-task")), + )); }; let hwmon = if config.sources.hwmon.enabled { @@ -78,18 +94,23 @@ async fn main() -> Result<(), Box> { None }; if let Some(hwmon) = hwmon { - tasks.push(tokio::spawn(hwmon.run(sourcechan.clone()))); + tasks.push(tokio::spawn( + hwmon + .run(sourcechan.clone()) + .instrument(info_span!("hwmon-task")), + )); }; let uccm = if config.sources.uccm.enabled { - info!("Spawning UCCMMonitor"); Some(UCCMMonitor::new(config.to_owned())) } else { - info!("UCCMMonitor not configured"); None }; if let Some(uccm) = uccm { - tasks.push(tokio::spawn(uccm.run(sourcechan.clone()))); + tasks.push(tokio::spawn( + uccm.run(sourcechan.clone()) + .instrument(info_span!("uccm-task")), + )); }; let chrony_refclock = if config.targets.chrony.enabled { @@ -98,33 +119,47 @@ async fn main() -> Result<(), Box> { None }; if let Some(chrony_refclock) = chrony_refclock { - tasks.push(tokio::spawn(chrony_refclock.run(sourcechan.subscribe()))); + tasks.push(tokio::spawn( + chrony_refclock + .run(sourcechan.subscribe()) + .instrument(info_span!("chrony-refclock-task")), + )); }; - let mut influxrx = sourcechan.subscribe(); + let gpsd = if config.sources.gpsd.enabled { + Some(GpsdSource::new(config.to_owned()).await.unwrap()) + } else { + None + }; + if let Some(gpsd) = gpsd { + tasks.push(tokio::spawn( + gpsd.run(sourcechan.clone()) + .instrument(info_span!("gpsd-task")), + )) + } - tasks.push(tokio::spawn(async move { - let stream = async_stream::stream! { - while let Ok(msg) = influxrx.recv().await { - match msg { ChimemonMessage::DataPoint(dp) => { - yield dp - }, ChimemonMessage::DataPoints(dps) => { - for p in dps { - yield p + if tasks.len() == 0 { + error!("No tasks configured, exiting."); + return Ok(()); // not an error, but exit before starting a dummy task + } + + if sourcechan.receiver_count() == 0 { + warn!("No consumers configured, events will be discarded"); + let mut chan = sourcechan.subscribe(); + // spawn a dummy task to reap the channel and keep the process alive + tasks.push(tokio::spawn( + async move { + loop { + while let Ok(m) = chan.recv().await { + info!("received {m:?}"); } - }, ChimemonMessage::TimeReport(_tr) => {} - } } - }; - influx.write(&config.influxdb.bucket, stream).await.unwrap(); - })); + } + } + .instrument(info_span!("dummy-receiver-task")), + )) + } - // let mut debugrx = sourcechan.subscribe(); - // tasks.push(tokio::spawn(async move { - // loop { - // let v = debugrx.recv().await; - // warn!("streamed: {:?}", v.unwrap()); - // } - // })); + debug!("Task setup complete, tasks: {}", tasks.len()); join_all(tasks).await; diff --git a/src/uccm.rs b/src/uccm.rs index 5fd60f8..9ee4e76 100644 --- a/src/uccm.rs +++ b/src/uccm.rs @@ -1,14 +1,13 @@ +use crate::{ChimemonMessage, ChimemonSource, ChimemonSourceChannel, Config, TimeReport}; use async_trait::async_trait; use bitflags::bitflags; use byteorder::{BigEndian, ReadBytesExt}; use bytes::{Buf, BytesMut}; -use chimemon::{ChimemonMessage, ChimemonSource, ChimemonSourceChannel, Config, TimeReport}; use chrono::{DateTime, Duration, NaiveDateTime, Utc}; use figment::value::Map; -use influxdb2::models::data_point::DataPointBuilder; use influxdb2::models::DataPoint; +use influxdb2::models::data_point::DataPointBuilder; use itertools::Itertools; -use log::{debug, info, warn}; use std::io::{BufRead, Cursor}; use std::str; use std::sync::Arc; @@ -17,6 +16,7 @@ use tokio::join; use tokio::sync::Mutex; use tokio::time::sleep; use tokio_serial::{SerialPort, SerialStream}; +use tracing::{debug, info, warn}; pub const GPS_EPOCH: i64 = 315964800; // Doesn't seem possible to have a const DateTime object pub type UccmEndian = BigEndian; @@ -257,7 +257,8 @@ impl TryFrom<&str> for UCCMLoopDiagReport { "No lines!", ))??; let ocxo_val = ocxo_line - .split(':').nth(1) + .split(':') + .nth(1) .ok_or(std::io::Error::new( std::io::ErrorKind::InvalidData, "no colon!", @@ -280,7 +281,10 @@ impl TryFrom<&str> for UCCMGPSSatsReport { "Invalid response (expected `NSATS CNOS`)", ))?; let nsats = nsats.parse::().map_err(|e| { - std::io::Error::new(std::io::ErrorKind::InvalidData, format!("Invalid number of sats ({e})")) + std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("Invalid number of sats ({e})"), + ) })?; let tracked_svs = cnos .split(',') @@ -428,13 +432,14 @@ impl UCCMMonitor { >= Duration::from_std(self.config.sources.uccm.status_interval) .unwrap() { - let mut points = vec![tod - .as_builder( + let mut points = vec![ + tod.as_builder( &self.config.sources.uccm.measurement, &self.config.influxdb.tags, ) .build() - .unwrap()]; + .unwrap(), + ]; if let Some(loop_diag) = &last_loop_diag { points.push( loop_diag