refactoring, log->tracing, gpsd source

This commit is contained in:
2026-01-03 15:38:21 -08:00
parent a1c2e1d8cb
commit 4dabcbe985
9 changed files with 1034 additions and 266 deletions

179
Cargo.lock generated
View File

@@ -148,23 +148,26 @@ dependencies = [
"bytemuck", "bytemuck",
] ]
[[package]]
name = "atty"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
dependencies = [
"hermit-abi 0.1.19",
"libc",
"winapi",
]
[[package]] [[package]]
name = "autocfg" name = "autocfg"
version = "1.4.0" version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26"
[[package]]
name = "backoff"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1"
dependencies = [
"futures-core",
"getrandom 0.2.16",
"instant",
"pin-project-lite",
"rand",
"tokio",
]
[[package]] [[package]]
name = "backtrace" name = "backtrace"
version = "0.3.74" version = "0.3.74"
@@ -255,25 +258,31 @@ version = "0.2.0"
dependencies = [ dependencies = [
"async-stream", "async-stream",
"async-trait", "async-trait",
"backoff",
"bitflags 1.3.2", "bitflags 1.3.2",
"byteorder", "byteorder",
"bytes", "bytes",
"chrono", "chrono",
"chrony-candm", "chrony-candm",
"clap", "clap",
"env_logger",
"figment", "figment",
"futures", "futures",
"gethostname", "gethostname",
"gpsd_proto",
"influxdb2", "influxdb2",
"itertools 0.14.0", "itertools 0.14.0",
"libc", "libc",
"log", "log",
"serde", "serde",
"serde_derive", "serde_derive",
"serde_json",
"serde_repr",
"tokio", "tokio",
"tokio-serial", "tokio-serial",
"tokio-stream", "tokio-stream",
"tokio-util",
"tracing",
"tracing-subscriber 0.3.22",
] ]
[[package]] [[package]]
@@ -461,19 +470,6 @@ dependencies = [
"cfg-if", "cfg-if",
] ]
[[package]]
name = "env_logger"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a12e6657c4c97ebab115a42dcee77225f7f482cdd841cf7088c657a42e9e00e7"
dependencies = [
"atty",
"humantime",
"log",
"regex",
"termcolor",
]
[[package]] [[package]]
name = "equivalent" name = "equivalent"
version = "1.0.2" version = "1.0.2"
@@ -664,6 +660,18 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "558b88954871f5e5b2af0e62e2e176c8bde7a6c2c4ed41b13d138d96da2e2cbd" checksum = "558b88954871f5e5b2af0e62e2e176c8bde7a6c2c4ed41b13d138d96da2e2cbd"
[[package]]
name = "gpsd_proto"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "247df1fffe6bc378377d6cf894dac08aef2194a6c1c9e2f173c3c10979fa5ca5"
dependencies = [
"log",
"serde",
"serde_derive",
"serde_json",
]
[[package]] [[package]]
name = "h2" name = "h2"
version = "0.3.26" version = "0.3.26"
@@ -695,15 +703,6 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
[[package]]
name = "hermit-abi"
version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33"
dependencies = [
"libc",
]
[[package]] [[package]]
name = "hermit-abi" name = "hermit-abi"
version = "0.3.9" version = "0.3.9"
@@ -750,12 +749,6 @@ version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
[[package]]
name = "humantime"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b112acc8b3adf4b107a8ec20977da0273a8c386765a3ec0229bd500a1443f9f"
[[package]] [[package]]
name = "hyper" name = "hyper"
version = "0.14.32" version = "0.14.32"
@@ -995,7 +988,7 @@ dependencies = [
"snafu", "snafu",
"tempfile", "tempfile",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber 0.2.25",
"url", "url",
] ]
@@ -1231,6 +1224,15 @@ dependencies = [
"minimal-lexical", "minimal-lexical",
] ]
[[package]]
name = "nu-ansi-term"
version = "0.50.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5"
dependencies = [
"windows-sys 0.59.0",
]
[[package]] [[package]]
name = "num-traits" name = "num-traits"
version = "0.2.19" version = "0.2.19"
@@ -1246,7 +1248,7 @@ version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
dependencies = [ dependencies = [
"hermit-abi 0.3.9", "hermit-abi",
"libc", "libc",
] ]
@@ -1641,18 +1643,28 @@ dependencies = [
[[package]] [[package]]
name = "serde" name = "serde"
version = "1.0.219" version = "1.0.228"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e"
dependencies = [
"serde_core",
"serde_derive",
]
[[package]]
name = "serde_core"
version = "1.0.228"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad"
dependencies = [ dependencies = [
"serde_derive", "serde_derive",
] ]
[[package]] [[package]]
name = "serde_derive" name = "serde_derive"
version = "1.0.219" version = "1.0.228"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@@ -1661,14 +1673,15 @@ dependencies = [
[[package]] [[package]]
name = "serde_json" name = "serde_json"
version = "1.0.140" version = "1.0.146"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20068b6e96dc6c9bd23e01df8827e6c7e1f2fddd43c21810382803c136b99373" checksum = "217ca874ae0207aac254aa02c957ded05585a90892cc8d87f9e5fa49669dadd8"
dependencies = [ dependencies = [
"itoa", "itoa",
"memchr", "memchr",
"ryu", "ryu",
"serde", "serde",
"serde_core",
] ]
[[package]] [[package]]
@@ -1682,6 +1695,17 @@ dependencies = [
"thiserror 1.0.69", "thiserror 1.0.69",
] ]
[[package]]
name = "serde_repr"
version = "0.1.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "175ee3e80ae9982737ca543e96133087cbd9a485eecc3bc4de9c1a37b47ea59c"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.101",
]
[[package]] [[package]]
name = "serde_spanned" name = "serde_spanned"
version = "0.6.8" version = "0.6.8"
@@ -1873,15 +1897,6 @@ dependencies = [
"windows-sys 0.59.0", "windows-sys 0.59.0",
] ]
[[package]]
name = "termcolor"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755"
dependencies = [
"winapi-util",
]
[[package]] [[package]]
name = "thiserror" name = "thiserror"
version = "1.0.69" version = "1.0.69"
@@ -2007,9 +2022,9 @@ dependencies = [
[[package]] [[package]]
name = "tokio-util" name = "tokio-util"
version = "0.7.15" version = "0.7.17"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df" checksum = "2efa149fe76073d6e8fd97ef4f4eca7b67f599660115591483572e406e165594"
dependencies = [ dependencies = [
"bytes", "bytes",
"futures-core", "futures-core",
@@ -2078,9 +2093,9 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3"
[[package]] [[package]]
name = "tracing" name = "tracing"
version = "0.1.41" version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100"
dependencies = [ dependencies = [
"pin-project-lite", "pin-project-lite",
"tracing-attributes", "tracing-attributes",
@@ -2089,9 +2104,9 @@ dependencies = [
[[package]] [[package]]
name = "tracing-attributes" name = "tracing-attributes"
version = "0.1.28" version = "0.1.31"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d" checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@@ -2100,14 +2115,25 @@ dependencies = [
[[package]] [[package]]
name = "tracing-core" name = "tracing-core"
version = "0.1.33" version = "0.1.36"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c" checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a"
dependencies = [ dependencies = [
"once_cell", "once_cell",
"valuable", "valuable",
] ]
[[package]]
name = "tracing-log"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3"
dependencies = [
"log",
"once_cell",
"tracing-core",
]
[[package]] [[package]]
name = "tracing-serde" name = "tracing-serde"
version = "0.1.3" version = "0.1.3"
@@ -2140,6 +2166,20 @@ dependencies = [
"tracing-serde", "tracing-serde",
] ]
[[package]]
name = "tracing-subscriber"
version = "0.3.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f30143827ddab0d256fd843b7a66d164e9f271cfa0dde49142c5ca0ca291f1e"
dependencies = [
"nu-ansi-term",
"sharded-slab",
"smallvec",
"thread_local",
"tracing-core",
"tracing-log",
]
[[package]] [[package]]
name = "try-lock" name = "try-lock"
version = "0.2.5" version = "0.2.5"
@@ -2357,15 +2397,6 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-util"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
dependencies = [
"windows-sys 0.59.0",
]
[[package]] [[package]]
name = "winapi-x86_64-pc-windows-gnu" name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0" version = "0.4.0"

View File

@@ -1,7 +1,11 @@
[package] [package]
name = "chimemon" name = "chimemon"
version = "0.2.0" version = "0.2.0"
edition = "2021" edition = "2024"
[features]
default = []
release-logs = ["tracing/max_level_info"]
[dependencies] [dependencies]
serde = "1.0" serde = "1.0"
@@ -14,7 +18,6 @@ clap = { version = "4.0", features = ["derive"] }
log = "0.4" log = "0.4"
figment = { version = "0.10", features = ["toml"] } figment = { version = "0.10", features = ["toml"] }
gethostname = "0.3" gethostname = "0.3"
env_logger = "0.9.1"
futures = "0.3.24" futures = "0.3.24"
async-trait = "0.1.58" async-trait = "0.1.58"
tokio-stream = { version = "0.1.11", features = ["sync"] } tokio-stream = { version = "0.1.11", features = ["sync"] }
@@ -26,6 +29,13 @@ chrono = "0.4.23"
libc = "0.2.137" libc = "0.2.137"
async-stream = "0.3.6" async-stream = "0.3.6"
itertools = "0.14.0" itertools = "0.14.0"
gpsd_proto = { version = "1.0.0" }
tokio-util = { version = "0.7.17", features = ["codec"] }
serde_json = "1.0.146"
backoff = { version = "0.4.0", features = ["tokio"] }
serde_repr = "0.1.20"
tracing = "0.1.44"
tracing-subscriber = { version = "0.3.22", features = ["fmt"] }
[dependencies.chrony-candm] [dependencies.chrony-candm]
git = "https://github.com/aws/chrony-candm" git = "https://github.com/aws/chrony-candm"

View File

@@ -1,13 +1,17 @@
use crate::{
ChimemonSource, ChimemonSourceChannel, Config, SourceMetric, SourceReport, SourceReportDetails,
SourceStatus,
};
use async_trait::async_trait; use async_trait::async_trait;
use chimemon::{ChimemonSource, ChimemonSourceChannel, Config}; use chrony_candm::reply::{self, ReplyBody, SourceMode, SourceState};
use chrony_candm::reply::{self, ReplyBody, SourceMode};
use chrony_candm::request::{self, RequestBody}; use chrony_candm::request::{self, RequestBody};
use chrony_candm::{blocking_query, ClientOptions}; use chrony_candm::{ClientOptions, blocking_query};
use influxdb2::models::DataPoint; use influxdb2::models::DataPoint;
use log::{info, warn};
use std::net::{SocketAddr, ToSocketAddrs}; use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH}; use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::join; use tokio::join;
use tracing::{info, warn};
pub struct ChronyClient { pub struct ChronyClient {
pub server: SocketAddr, pub server: SocketAddr,
@@ -15,82 +19,136 @@ pub struct ChronyClient {
config: Config, config: Config,
} }
fn datapoint_from_tracking( #[derive(Debug)]
t: &reply::Tracking, pub struct ChronyTrackingReport {
config: &Config, tags: Arc<Vec<(String, String)>>,
) -> Result<DataPoint, Box<dyn std::error::Error>> { pub ref_id: i64,
let now = SystemTime::now().duration_since(UNIX_EPOCH)?; pub ref_ip_addr: String,
let measurement = config.sources.chrony.measurement_prefix.to_owned() pub stratum: i64,
+ &config.sources.chrony.tracking_measurement; pub leap_status: i64,
let mut builder = pub current_correction: f64,
DataPoint::builder(&measurement).timestamp(now.as_nanos().try_into().unwrap()); pub last_offset: f64,
for (key, value) in &config.influxdb.tags { pub rms_offset: f64,
builder = builder.tag(key, value); pub freq_ppm: f64,
} pub resid_freq_ppm: f64,
pub skew_ppm: f64,
let point = builder pub root_delay: f64,
.field("ref_id", t.ref_id as i64) pub root_dispersion: f64,
.field("ref_ip_addr", t.ip_addr.to_string()) pub last_update_interval: f64,
.field("stratum", t.stratum as i64)
.field("leap_status", t.leap_status as i64)
.field("current_correction", f64::from(t.current_correction))
.field("last_offset", f64::from(t.last_offset))
.field("rms_offset", f64::from(t.rms_offset))
.field("freq_ppm", f64::from(t.freq_ppm))
.field("resid_freq_ppm", f64::from(t.resid_freq_ppm))
.field("skew_ppm", f64::from(t.skew_ppm))
.field("root_delay", f64::from(t.root_delay))
.field("root_dispersion", f64::from(t.root_dispersion))
.field("last_update_interval", f64::from(t.last_update_interval))
.build()?;
Ok(point)
} }
pub fn datapoint_from_sourcedata( impl SourceReportDetails for ChronyTrackingReport {
d: &reply::SourceData, fn is_healthy(&self) -> bool {
config: &Config, true
) -> Result<DataPoint, Box<dyn std::error::Error>> {
let now = SystemTime::now().duration_since(UNIX_EPOCH)?;
let measurement = config.sources.chrony.measurement_prefix.to_owned()
+ &config.sources.chrony.sources_measurement;
let mut builder =
DataPoint::builder(&measurement).timestamp(now.as_nanos().try_into().unwrap());
for (key, value) in &config.influxdb.tags {
builder = builder.tag(key, value)
} }
builder = builder fn to_metrics(&self) -> Vec<SourceMetric> {
.tag("ref_id", d.ip_addr.to_string()) let tags = &self.tags;
.tag( vec![
"mode", SourceMetric::new_int("ref_id", self.ref_id, tags.clone()),
match d.mode { SourceMetric::new_int("stratum", self.stratum, tags.clone()),
SourceMode::Client => String::from("server"), SourceMetric::new_int("leap_status", self.leap_status, tags.clone()),
SourceMode::Peer => String::from("peer"), SourceMetric::new_float("current_correction", self.current_correction, tags.clone()),
SourceMode::Ref => String::from("refclock"), SourceMetric::new_float("last_offset", self.last_offset, tags.clone()),
}, SourceMetric::new_float("rms_offset", self.rms_offset, tags.clone()),
) SourceMetric::new_float("freq_ppm", self.freq_ppm, tags.clone()),
.tag( SourceMetric::new_float("resid_freq_ppm", self.resid_freq_ppm, tags.clone()),
"state", SourceMetric::new_float("skew_ppm", self.skew_ppm, tags.clone()),
match d.state { SourceMetric::new_float("root_delay", self.root_delay, tags.clone()),
reply::SourceState::Selected => String::from("best"), SourceMetric::new_float("root_dispersion", self.root_dispersion, tags.clone()),
reply::SourceState::NonSelectable => String::from("unusable"), SourceMetric::new_float(
reply::SourceState::Falseticker => String::from("falseticker"), "last_update_interval",
reply::SourceState::Jittery => String::from("jittery"), self.last_update_interval,
reply::SourceState::Unselected => String::from("combined"), tags.clone(),
reply::SourceState::Selectable => String::from("unused"), ),
}, ]
) }
.field("poll", d.poll as i64) }
.field("stratum", d.stratum as i64)
.field("flags", d.flags.bits() as i64)
.field("reachability", d.reachability.count_ones() as i64)
.field("since_sample", d.since_sample as i64)
.field("orig_latest_meas", f64::from(d.orig_latest_meas))
.field("latest_meas", f64::from(d.latest_meas))
.field("latest_meas_err", f64::from(d.latest_meas_err));
let point = builder.build()?;
Ok(point) #[derive(Debug)]
pub struct ChronySourcesReport {
pub sources: Vec<reply::SourceData>,
}
impl SourceReportDetails for ChronySourcesReport {
fn is_healthy(&self) -> bool {
//TODO: think about whether there is an idea of unhealthy sources
true
}
fn to_metrics(&self) -> Vec<SourceMetric> {
let mut metrics = Vec::with_capacity(8 * self.sources.len());
for source in &self.sources {
let tags = Arc::new(vec![
("ref_id".to_owned(), source.ip_addr.to_string()),
(
"mode".to_owned(),
match source.mode {
SourceMode::Client => String::from("server"),
SourceMode::Peer => String::from("peer"),
SourceMode::Ref => String::from("refclock"),
},
),
(
"state".to_owned(),
match source.state {
reply::SourceState::Selected => String::from("best"),
reply::SourceState::NonSelectable => String::from("unusable"),
reply::SourceState::Falseticker => String::from("falseticker"),
reply::SourceState::Jittery => String::from("jittery"),
reply::SourceState::Unselected => String::from("combined"),
reply::SourceState::Selectable => String::from("unused"),
},
),
]);
metrics.extend([
SourceMetric::new_int("poll", source.poll as i64, tags.clone()),
SourceMetric::new_int("stratum", source.stratum as i64, tags.clone()),
SourceMetric::new_int("flags", source.flags.bits() as i64, tags.clone()),
SourceMetric::new_int(
"reachability",
source.reachability.count_ones() as i64,
tags.clone(),
),
SourceMetric::new_int("since_sample", source.since_sample as i64, tags.clone()),
SourceMetric::new_float(
"orig_latest_meas",
source.orig_latest_meas.into(),
tags.clone(),
),
SourceMetric::new_float("latest_meas", source.latest_meas.into(), tags.clone()),
SourceMetric::new_float(
"latest_meas_err",
source.latest_meas_err.into(),
tags.clone(),
),
]);
}
metrics
}
}
fn report_from_tracking(
t: &reply::Tracking,
config: &Config,
) -> Result<ChronyTrackingReport, Box<dyn std::error::Error>> {
let report = ChronyTrackingReport {
tags: Arc::new(vec![]), //TODO: allow configuring tags in the source
ref_id: t.ref_id as i64,
ref_ip_addr: t.ip_addr.to_string(),
stratum: t.stratum as i64,
leap_status: t.leap_status as i64,
current_correction: t.current_correction.into(),
last_offset: t.last_offset.into(),
rms_offset: t.rms_offset.into(),
freq_ppm: t.freq_ppm.into(),
resid_freq_ppm: t.resid_freq_ppm.into(),
skew_ppm: t.skew_ppm.into(),
root_delay: t.root_delay.into(),
root_dispersion: t.root_dispersion.into(),
last_update_interval: t.last_update_interval.into(),
};
Ok(report)
} }
impl ChronyClient { impl ChronyClient {
@@ -206,13 +264,16 @@ impl ChronyClient {
) -> Result<(), Box<dyn std::error::Error>> { ) -> Result<(), Box<dyn std::error::Error>> {
let tracking = self.get_tracking().await?; let tracking = self.get_tracking().await?;
let tracking_data = datapoint_from_tracking(&tracking, &self.config)?; let tracking_data = report_from_tracking(&tracking, &self.config)?;
let report = SourceReport {
name: "chrony-tracking".to_owned(),
status: SourceStatus::Unknown,
details: Arc::new(tracking_data),
};
info!("Sending tracking data"); info!("Sending tracking data");
chan.send(tracking_data.into()) chan.send(report.into())?;
.expect("Unable to send tracking data to targets");
Ok(()) Ok(())
} }
@@ -221,14 +282,14 @@ impl ChronyClient {
chan: &ChimemonSourceChannel, chan: &ChimemonSourceChannel,
) -> Result<(), Box<dyn std::error::Error>> { ) -> Result<(), Box<dyn std::error::Error>> {
let sources = self.get_sources().await?; let sources = self.get_sources().await?;
let mut dps = Vec::with_capacity(sources.len()); let details = ChronySourcesReport { sources };
for ds in sources { let report = SourceReport {
let source_data = datapoint_from_sourcedata(&ds, &self.config)?; name: "chrony-sources".to_owned(),
dps.push(source_data); status: SourceStatus::Unknown,
} details: Arc::new(details),
};
info!("Sending source data"); info!("Sending source data");
chan.send(dps.into()) chan.send(report.into())?;
.expect("Unable to send source data to targets");
Ok(()) Ok(())
} }
} }

View File

@@ -1,10 +1,10 @@
use crate::{ChimemonMessage, ChimemonTarget, ChimemonTargetChannel, ChronySockConfig};
use async_trait::async_trait; use async_trait::async_trait;
use chimemon::{ChimemonMessage, ChimemonTarget, ChimemonTargetChannel, ChronySockConfig};
use libc::{c_double, c_int, timeval}; use libc::{c_double, c_int, timeval};
use log::debug;
use std::mem; use std::mem;
use std::os::unix::net::UnixDatagram; use std::os::unix::net::UnixDatagram;
use std::path::PathBuf; use std::path::PathBuf;
use tracing::debug;
const CHRONY_MAGIC: c_int = 0x534f434b; const CHRONY_MAGIC: c_int = 0x534f434b;

451
src/gpsd.rs Normal file
View File

@@ -0,0 +1,451 @@
use crate::{
ChimemonMessage, ChimemonSource, ChimemonSourceChannel, Config, SourceMetric, SourceReport,
SourceReportDetails, SourceStatus,
};
use std::collections::HashMap;
use std::f64;
use std::fmt::Debug;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use backoff::ExponentialBackoff;
use futures::{SinkExt, Stream};
use futures::{StreamExt, task::Context};
use gpsd_proto::{
Device, Gst, Mode, Pps, ResponseHandshake, Sky, Tpv, UnifiedResponse, Version, Watch,
};
use serde::{Deserialize, Deserializer, Serialize};
use serde_json;
use tokio::net::{TcpStream, ToSocketAddrs, lookup_host};
use tokio::time::{Interval, interval, timeout};
use tokio_util::codec::{Framed, LinesCodec};
use tracing::{debug, debug_span, info, warn};
pub struct GpsdSource {
pub config: Config,
conn: GpsdTransport,
devices: HashMap<String, Device>,
last_gst: Option<Gst>,
last_pps: Option<Pps>,
last_tpv: Option<Tpv>,
last_sky: Option<Sky>,
}
#[derive(Eq, PartialEq, Clone, Copy, Debug)]
pub enum GpsdFixType {
Unknown,
NoFix,
Fix2D,
Fix3D,
Surveyed,
}
impl From<u32> for GpsdFixType {
fn from(value: u32) -> Self {
match value {
0 => Self::Unknown,
1 => Self::NoFix,
2 => Self::Fix2D,
3 => Self::Fix3D,
_ => panic!("Invalid fix mode {value}"),
}
}
}
impl From<Mode> for GpsdFixType {
fn from(value: Mode) -> Self {
match value {
Mode::NoFix => Self::NoFix,
Mode::Fix2d => Self::Fix2D,
Mode::Fix3d => Self::Fix3D,
}
}
}
#[derive(Clone, Debug)]
pub struct GpsdSourceReport {
fix_type: GpsdFixType,
sats_visible: u8,
sats_tracked: u8,
tdop: f64,
}
impl SourceReportDetails for GpsdSourceReport {
fn is_healthy(&self) -> bool {
self.fix_type != GpsdFixType::Unknown && self.fix_type != GpsdFixType::NoFix
}
fn to_metrics(&self) -> Vec<SourceMetric> {
let no_tags = Arc::new(vec![]);
vec![
SourceMetric::new_int("sats_visible", self.sats_visible as i64, no_tags.clone()),
SourceMetric::new_int("sats_tracked", self.sats_tracked as i64, no_tags.clone()),
SourceMetric::new_float("tdop", self.tdop, no_tags.clone()),
]
}
}
impl GpsdSource {
pub async fn new(config: Config) -> Result<Self, std::io::Error> {
let conn = GpsdTransport::new(&config.sources.gpsd.host).await?;
Ok(Self {
config,
conn,
devices: HashMap::new(),
last_gst: None,
last_pps: None,
last_tpv: None,
last_sky: None,
})
}
}
impl GpsdSource {
async fn send_status(&self, chan: &mut ChimemonSourceChannel) {
let sky = self.last_sky.as_ref();
let tpv = self.last_tpv.as_ref();
let (sats_tracked, sats_visible) = sky.map_or((0, 0), |sky| {
let sats = sky.satellites.as_deref().unwrap_or_default();
(
sats.iter().filter(|s| s.used).count() as u8,
sats.len() as u8,
)
});
let tdop = sky
.and_then(|sky| sky.tdop)
.map_or(f64::INFINITY, |tdop| tdop as f64);
chan.send(ChimemonMessage::SourceReport(SourceReport {
name: "gpsd".into(),
status: SourceStatus::Unknown,
details: Arc::new(GpsdSourceReport {
fix_type: tpv.map_or(GpsdFixType::Unknown, |tpv| tpv.mode.into()),
sats_tracked,
sats_visible,
tdop,
}),
}))
.unwrap();
}
fn handle_msg(&mut self, msg: String) -> Result<(), Box<dyn std::error::Error>> {
let _span = debug_span!("handle_msg").entered();
let parsed = serde_json::from_str::<UnifiedResponse>(&msg)?;
debug!("Received {parsed:?}");
match parsed {
UnifiedResponse::Device(d) => {
if let Some(path) = &d.path {
self.devices.insert(path.to_owned(), d);
} else {
warn!("No path on DEVICE response, ignoring.");
}
}
UnifiedResponse::Gst(g) => self.last_gst = Some(g),
UnifiedResponse::Pps(p) => self.last_pps = Some(p),
UnifiedResponse::Sky(s) => {
self.last_sky = Some({
let mut s = s;
if s.satellites.is_none() {
s.satellites = self.last_sky.as_mut().and_then(|old| old.satellites.take());
}
s
})
}
UnifiedResponse::Tpv(t) => self.last_tpv = Some(t),
_ => warn!("Unhandled response `{parsed:?}`"),
}
Ok(())
}
}
#[async_trait]
impl ChimemonSource for GpsdSource {
async fn run(mut self, mut chan: ChimemonSourceChannel) {
info!("gpsd task started");
self.conn.ensure_connection().await.unwrap();
let mut ticker = interval(Duration::from_secs(self.config.sources.gpsd.interval));
let mut params = WatchParams::default();
params.json = Some(true);
self.conn
.cmd_response(&GpsdCommand::Watch(Some(params)))
.await
.unwrap();
loop {
let framed = self.conn.framed.as_mut().expect("must be connected");
tokio::select! {
_ = ticker.tick() => {
self.send_status(&mut chan).await
},
maybe_msg = framed.next() => {
if let Some(Ok(msg)) = maybe_msg {
self.handle_msg(msg).unwrap()
}
}
}
}
}
}
struct GpsdTransport {
host: SocketAddr,
framed: Option<Framed<TcpStream, LinesCodec>>,
conn_backoff: ExponentialBackoff,
}
#[derive(Clone, Copy, Debug, Serialize)]
#[repr(u8)]
pub enum RawMode {
Off = 0,
RawHex = 1,
RawBin = 2,
}
#[derive(Clone, Copy, Debug, Serialize)]
#[repr(u8)]
pub enum NativeMode {
Nmea = 0,
Alt = 1,
}
#[derive(Clone, Copy, Debug, Serialize)]
#[repr(u8)]
pub enum ParityMode {
None = b'N',
Odd = b'O',
Even = b'E',
}
#[derive(Clone, Debug, Serialize)]
struct WatchParams {
#[serde(skip_serializing_if = "Option::is_none")]
enable: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
json: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
nmea: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
raw: Option<RawMode>,
#[serde(skip_serializing_if = "Option::is_none")]
scaled: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
split24: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pps: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
device: Option<String>,
}
impl Default for WatchParams {
fn default() -> Self {
WatchParams {
enable: Some(true),
json: Some(false),
nmea: None,
raw: None,
scaled: None,
split24: None,
pps: None,
device: None,
}
}
}
#[derive(Clone, Debug, Serialize)]
struct DeviceParams {
#[serde(skip_serializing_if = "Option::is_none")]
bps: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
cycle: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
flags: Option<u8>,
#[serde(skip_serializing_if = "Option::is_none")]
hexdata: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
native: Option<NativeMode>,
#[serde(skip_serializing_if = "Option::is_none")]
parity: Option<ParityMode>,
#[serde(skip_serializing_if = "Option::is_none")]
path: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
readonly: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
sernum: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
stopbits: Option<u8>,
}
#[derive(Clone, Debug)]
enum GpsdCommand {
Version, // no params
Devices, // no params
Watch(Option<WatchParams>),
Poll, // I don't understand the protocol for this one
Device(Option<DeviceParams>),
}
impl GpsdCommand {
fn command_string(&self) -> &str {
match self {
Self::Version => "?VERSION",
Self::Devices => "?DEVICES",
Self::Watch(_) => "?WATCH",
Self::Poll => "?POLL",
Self::Device(_) => "?DEVICE",
}
}
fn expected_responses(&self) -> usize {
match self {
Self::Version => 1,
Self::Devices => 1,
Self::Watch(_) => 2,
Self::Poll => 1,
Self::Device(_) => 1,
}
}
}
impl ToString for GpsdCommand {
fn to_string(&self) -> String {
let s = self.command_string().to_owned();
match self {
Self::Version | Self::Devices | Self::Poll | Self::Watch(None) | Self::Device(None) => {
s + ";"
}
Self::Watch(Some(w)) => s + "=" + &serde_json::to_string(w).unwrap() + ";",
Self::Device(Some(d)) => s + "=" + &serde_json::to_string(d).unwrap() + ";",
}
}
}
impl GpsdTransport {
async fn new<T: ToSocketAddrs + Debug>(host: &T) -> Result<Self, std::io::Error> {
// TODO: implement proper handling of multiple responses
let host_addr = lookup_host(host).await?.next().ok_or(std::io::Error::new(
std::io::ErrorKind::NotFound,
format!("No response looking up `{:?}`", host),
))?;
Ok(Self {
host: host_addr,
framed: None,
conn_backoff: ExponentialBackoff::default(),
})
}
async fn connect(&mut self) -> Result<(), Box<dyn std::error::Error>> {
info!("Connecting to gpsd @ {}", self.host);
let mut framed = backoff::future::retry_notify(
self.conn_backoff.clone(),
async || {
Ok(Framed::new(
TcpStream::connect(self.host).await?,
LinesCodec::new(),
))
},
|e, d| warn!("Failed to connect to {} after {:?}: `{}`", self.host, d, e),
)
.await?;
debug!("Waiting for initial VERSION");
if let Ok(Some(Ok(r))) = timeout(Duration::from_secs(5), framed.next()).await {
if let Ok(version) = serde_json::from_str::<Version>(&r) {
info!(
"Connected to gpsd @ {}, release {}",
self.host, version.release
)
} else {
warn!("Got unexpected non-VERSION response after connection (`{r}`)")
}
self.framed = Some(framed);
Ok(())
} else {
Err("Unexpected failure to receive initial handshake response".into())
}
}
async fn ensure_connection(&mut self) -> Result<(), Box<dyn std::error::Error>> {
if let Some(conn) = &self.framed {
Ok(())
} else {
self.connect().await
}
}
async fn cmd_response(
&mut self,
cmd: &GpsdCommand,
) -> Result<Vec<UnifiedResponse>, Box<dyn std::error::Error>> {
debug!("Command: `{cmd:?}`");
self.ensure_connection().await?;
let mut responses = Vec::new();
if let Some(conn) = &mut self.framed {
debug!("Raw command: `{}`", cmd.to_string());
conn.send(cmd.to_string()).await?;
for _ in 0..cmd.expected_responses() {
match conn.next().await {
None => return Err("Connection lost".into()),
Some(Err(e)) => return Err(format!("Unable to parse response {e}").into()),
Some(Ok(r)) => {
debug!("Raw response: `{r}`");
responses.push(serde_json::from_str::<UnifiedResponse>(&r)?)
}
}
}
} else {
return Err("Missing connection despite ensure".into());
}
Ok(responses)
}
async fn stream(
&mut self,
) -> Result<
impl Stream<Item = Result<UnifiedResponse, Box<dyn std::error::Error>>>,
Box<dyn std::error::Error>,
> {
self.ensure_connection().await?;
if let Some(conn) = &mut self.framed {
Ok(conn.map(|line| Ok(serde_json::from_str::<UnifiedResponse>(&line?)?)))
} else {
Err("No connection after connecting.".into())
}
}
}
mod tests {
use gpsd_proto::{ResponseData, UnifiedResponse};
use tokio_stream::StreamExt;
use crate::gpsd::{GpsdCommand, GpsdTransport, WatchParams};
use std::sync::Once;
static INIT: Once = Once::new();
fn init_logger() {
INIT.call_once(|| tracing_subscriber::fmt::init());
}
#[tokio::test]
async fn test_gpsd() {
init_logger();
let mut gpsd = GpsdTransport::new(&"192.168.65.93:2947").await.unwrap();
gpsd.connect().await.unwrap();
let mut params = WatchParams::default();
params.enable = Some(true);
params.json = Some(true);
let res = {
gpsd.cmd_response(&GpsdCommand::Watch(Some(params)))
.await
.unwrap()
};
println!("{res:?}");
while let Some(Ok(s)) = gpsd.framed.as_mut().unwrap().next().await {
let res2 = serde_json::from_str::<UnifiedResponse>(&s);
println!("{res2:?}");
}
}
}

View File

@@ -1,23 +1,100 @@
use crate::{
ChimemonSource, ChimemonSourceChannel, Config, SourceMetric, SourceReport, SourceReportDetails,
SourceStatus,
};
use async_trait::async_trait; use async_trait::async_trait;
use chimemon::{ChimemonSource, ChimemonSourceChannel, Config}; use futures::{StreamExt, stream};
use futures::{stream, StreamExt};
use influxdb2::models::DataPoint; use influxdb2::models::DataPoint;
use log::{debug, info};
use std::{ use std::{
fs::File,
io::Read,
path::PathBuf, path::PathBuf,
sync::Arc,
time::{Duration, SystemTime, UNIX_EPOCH}, time::{Duration, SystemTime, UNIX_EPOCH},
}; };
use tracing::{Instrument, debug, error, info, info_span, warn};
pub struct HwmonSource { pub struct HwmonSource {
config: Config, config: Config,
sensors: Vec<HwmonSensor>, sensors: Vec<Arc<HwmonSensor>>,
} }
#[derive(Debug, Clone)]
struct HwmonSensor { struct HwmonSensor {
path: PathBuf, name: String,
value_path: PathBuf,
device: String, device: String,
sensor: String, sensor: String,
name: String, label: Option<String>,
tags: Arc<Vec<(String, String)>>,
}
impl HwmonSensor {
fn new(name: &str, device: &str, sensor: &str) -> Self {
let value_path = PathBuf::from(HWMON_ROOT)
.join(device)
.join(sensor.to_owned() + "_input");
let label_path_raw = PathBuf::from(HWMON_ROOT)
.join(device)
.join(sensor.to_owned() + "_label");
let label = if label_path_raw.is_file() {
let mut f =
File::open(&label_path_raw).expect(&format!("Unable to open `{label_path_raw:?}`"));
let mut label = String::new();
f.read_to_string(&mut label)
.expect(&format!("Unable to read from `{label_path_raw:?}"));
Some(label.trim().to_owned())
} else {
None
};
let mut tags_vec = vec![
("name".to_owned(), name.to_owned()),
("device".to_owned(), device.to_owned()),
("sensor".to_owned(), sensor.to_owned()),
];
if let Some(label) = &label {
tags_vec.push(("label".to_owned(), label.clone()))
}
Self {
value_path,
label,
device: device.to_owned(),
sensor: sensor.to_owned(),
name: name.to_owned(),
tags: Arc::new(tags_vec),
}
}
}
#[derive(Debug)]
struct HwmonReport {
values: Vec<(Arc<HwmonSensor>, f64)>,
}
impl SourceReportDetails for HwmonReport {
fn is_healthy(&self) -> bool {
//self.alarms.iter().any(|(_sensor, alarm)| *alarm)
true
}
fn to_metrics(&self) -> Vec<SourceMetric> {
let mut metrics = Vec::new();
for (sensor, value) in &self.values {
metrics.push(SourceMetric::new_float(
"hwmon_value",
*value,
sensor.tags.clone(),
))
}
// for (sensor, alarm) in &self.alarms {
// metrics.push(SourceMetric::new_bool(
// "hwmon_alarm",
// *alarm,
// sensor.tags.clone(),
// ))
// }
metrics
}
} }
const HWMON_ROOT: &str = "/sys/class/hwmon"; const HWMON_ROOT: &str = "/sys/class/hwmon";
@@ -29,19 +106,14 @@ impl HwmonSource {
.hwmon .hwmon
.sensors .sensors
.iter() .iter()
.map(|(k, v)| HwmonSensor { .map(|(k, v)| Arc::new(HwmonSensor::new(k, &v.name, &v.sensor)))
name: k.into(),
device: v.name.clone(),
sensor: v.sensor.clone(),
path: PathBuf::from(HWMON_ROOT).join(&v.name).join(&v.sensor),
})
.collect(); .collect();
HwmonSource { config, sensors } HwmonSource { config, sensors }
} }
async fn get_raw_value(sensor: &HwmonSensor) -> Result<String, std::io::Error> { async fn get_raw_value(sensor: &HwmonSensor) -> Result<String, std::io::Error> {
tokio::fs::read_to_string(&sensor.path).await tokio::fs::read_to_string(&sensor.value_path).await
} }
} }
@@ -53,31 +125,38 @@ impl ChimemonSource for HwmonSource {
tokio::time::interval(Duration::from_secs(self.config.sources.hwmon.interval)); tokio::time::interval(Duration::from_secs(self.config.sources.hwmon.interval));
loop { loop {
interval.tick().await; interval.tick().await;
let s = stream::iter(&self.sensors).then(|s| async { let mut values = Vec::new();
let sensor_val = HwmonSource::get_raw_value(s) for s in &self.sensors {
.await if let Ok(sensor_val) = HwmonSource::get_raw_value(s).await {
.expect("Unable to read sensor"); debug!(
debug!( "hwmon {} raw value {}",
"hwmon {} raw value {}", s.value_path.to_string_lossy(),
s.path.to_string_lossy(), sensor_val
sensor_val );
); if let Ok(parsed) = sensor_val.trim().parse::<f64>() {
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); values.push((s.clone(), parsed));
let mut builder = DataPoint::builder(&self.config.sources.hwmon.measurement) } else {
.timestamp(now.as_nanos().try_into().unwrap()); error!(
for (key, value) in &self.config.influxdb.tags { "Unable to parse sensor value {sensor_val} at {}",
builder = builder.tag(key, value) s.value_path.to_string_lossy()
);
}
} else {
error!("Unable to get hwmon sensor value");
} }
builder = builder }
.tag("name", &s.name) let report = SourceReport {
.tag("sensor", &s.sensor) name: "hwmon".to_owned(),
.tag("device", &s.device) status: SourceStatus::Healthy,
.field("value", sensor_val.trim().parse::<i64>().unwrap()); details: Arc::new(HwmonReport { values }),
builder.build().unwrap() };
});
info!("Writing hwmon data"); info!("Writing hwmon data");
chan.send(s.collect::<Vec<DataPoint>>().await.into()) match chan.send(report.into()) {
.unwrap(); Ok(_) => {}
Err(e) => {
warn!("Unable to send to message channel ({e})")
}
}
} }
} }
} }

View File

@@ -1,19 +1,27 @@
pub mod chrony;
pub mod chrony_refclock;
pub mod gpsd;
pub mod hwmon;
pub mod uccm;
use async_trait::async_trait; use async_trait::async_trait;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use figment::{ use figment::{
Figment,
providers::{Format, Serialized, Toml}, providers::{Format, Serialized, Toml},
util::map, util::map,
value::Map, value::Map,
Figment,
}; };
use gethostname::gethostname; use gethostname::gethostname;
use influxdb2::models::DataPoint; use influxdb2::models::DataPoint;
use serde_derive::{Deserialize, Serialize}; use serde_derive::{Deserialize, Serialize};
use std::path::Path;
use tokio::sync::broadcast::*; use tokio::sync::broadcast::*;
use std::{fmt::Debug, net::SocketAddr, path::Path, sync::Arc};
#[derive(Serialize, Deserialize, Clone)] #[derive(Serialize, Deserialize, Clone)]
pub struct InfluxConfig { pub struct InfluxConfig {
pub enabled: bool,
pub url: String, pub url: String,
pub org: String, pub org: String,
pub bucket: String, pub bucket: String,
@@ -25,6 +33,7 @@ impl Default for InfluxConfig {
fn default() -> Self { fn default() -> Self {
let host = gethostname().into_string().unwrap(); let host = gethostname().into_string().unwrap();
InfluxConfig { InfluxConfig {
enabled: false,
url: "http://localhost:8086".into(), url: "http://localhost:8086".into(),
org: "default".into(), org: "default".into(),
bucket: "default".into(), bucket: "default".into(),
@@ -100,6 +109,24 @@ impl Default for HwmonConfig {
} }
} }
} }
#[derive(Serialize, Deserialize, Clone)]
pub struct GpsdConfig {
pub enabled: bool,
pub interval: u64,
pub host: String,
}
impl Default for GpsdConfig {
fn default() -> Self {
GpsdConfig {
enabled: false,
interval: 60,
host: "localhost:2947".into(),
}
}
}
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct TimeReport { pub struct TimeReport {
pub system_time: DateTime<Utc>, pub system_time: DateTime<Utc>,
@@ -109,6 +136,67 @@ pub struct TimeReport {
pub valid: bool, pub valid: bool,
} }
#[derive(Clone, Debug)]
pub enum SourceStatus {
Healthy,
LossOfSignal(Option<String>),
LossOfSync(Option<String>),
Other(Option<String>),
Unknown,
}
#[derive(Clone, Debug)]
pub enum MetricValue {
Int(i64),
Float(f64),
Bool(bool),
}
#[derive(Clone, Debug)]
pub struct SourceMetric {
name: String,
value: MetricValue,
tags: Arc<Vec<(String, String)>>,
}
impl SourceMetric {
pub fn new_int(name: &str, value: i64, tags: Arc<Vec<(String, String)>>) -> Self {
Self {
name: name.to_owned(),
value: MetricValue::Int(value),
tags,
}
}
pub fn new_float(name: &str, value: f64, tags: Arc<Vec<(String, String)>>) -> Self {
Self {
name: name.to_owned(),
value: MetricValue::Float(value),
tags,
}
}
pub fn new_bool(name: &str, value: bool, tags: Arc<Vec<(String, String)>>) -> Self {
Self {
name: name.to_owned(),
value: MetricValue::Bool(value),
tags,
}
}
}
pub trait SourceReportDetails: Debug + Send + Sync {
fn to_metrics(&self) -> Vec<SourceMetric>;
fn is_healthy(&self) -> bool;
}
#[derive(Clone, Debug)]
pub struct SourceReport {
pub name: String,
pub status: SourceStatus,
pub details: Arc<dyn SourceReportDetails>,
}
#[derive(Serialize, Deserialize, Clone)] #[derive(Serialize, Deserialize, Clone)]
pub struct UCCMConfig { pub struct UCCMConfig {
pub enabled: bool, pub enabled: bool,
@@ -137,6 +225,7 @@ pub struct SourcesConfig {
pub chrony: ChronyConfig, pub chrony: ChronyConfig,
pub hwmon: HwmonConfig, pub hwmon: HwmonConfig,
pub uccm: UCCMConfig, pub uccm: UCCMConfig,
pub gpsd: GpsdConfig,
} }
#[derive(Serialize, Deserialize, Clone, Default)] #[derive(Serialize, Deserialize, Clone, Default)]
@@ -155,11 +244,12 @@ pub fn load_config(filename: &Path) -> Figment {
Figment::from(Serialized::defaults(Config::default())).merge(Toml::file(filename)) Figment::from(Serialized::defaults(Config::default())).merge(Toml::file(filename))
} }
#[derive(Clone, Debug)] #[derive(Debug, Clone)]
pub enum ChimemonMessage { pub enum ChimemonMessage {
DataPoint(DataPoint), DataPoint(DataPoint),
DataPoints(Vec<DataPoint>), DataPoints(Vec<DataPoint>),
TimeReport(TimeReport), TimeReport(TimeReport),
SourceReport(SourceReport),
} }
impl From<DataPoint> for ChimemonMessage { impl From<DataPoint> for ChimemonMessage {
@@ -179,6 +269,12 @@ impl From<TimeReport> for ChimemonMessage {
} }
} }
impl From<SourceReport> for ChimemonMessage {
fn from(sr: SourceReport) -> Self {
ChimemonMessage::SourceReport(sr)
}
}
pub type ChimemonSourceChannel = Sender<ChimemonMessage>; pub type ChimemonSourceChannel = Sender<ChimemonMessage>;
pub type ChimemonTargetChannel = Receiver<ChimemonMessage>; pub type ChimemonTargetChannel = Receiver<ChimemonMessage>;

View File

@@ -1,17 +1,12 @@
mod chrony;
mod chrony_refclock;
mod hwmon;
mod uccm;
use clap::{Parser, ValueEnum}; use clap::{Parser, ValueEnum};
use env_logger::{self, Env};
use futures::future::join_all; use futures::future::join_all;
use log::{debug, info};
use std::path::Path; use std::path::Path;
use tokio::sync::broadcast; use tokio::sync::broadcast;
use tracing::{Instrument, debug, error, info, info_span, warn};
use tracing_subscriber;
use crate::{chrony::*, chrony_refclock::ChronySockServer, hwmon::HwmonSource, uccm::UCCMMonitor}; use crate::{chrony::*, chrony_refclock::ChronySockServer, hwmon::HwmonSource, uccm::UCCMMonitor};
use chimemon::*; use chimemon::{gpsd::GpsdSource, *};
const PROGRAM_NAME: &str = "chimemon"; const PROGRAM_NAME: &str = "chimemon";
const VERSION: &str = "0.0.1"; const VERSION: &str = "0.0.1";
@@ -35,33 +30,51 @@ struct Args {
#[tokio::main(flavor = "current_thread")] #[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Args::parse(); tracing_subscriber::fmt::init();
let loglevel = args
.log_level
.to_possible_value()
.unwrap()
.get_name()
.to_owned();
env_logger::Builder::from_env(Env::default().default_filter_or(loglevel)).init();
info!("{} v{} starting...", PROGRAM_NAME, VERSION); let args = Args::parse();
info!("{PROGRAM_NAME} v{VERSION} starting...");
let fig = load_config(Path::new(&args.config_file)); let fig = load_config(Path::new(&args.config_file));
debug!("{:?}", fig); debug!("{fig:?}");
let config: Config = fig.extract()?; let config: Config = fig.extract()?;
let mut tasks = Vec::new(); let mut tasks = Vec::new();
let (tx, _) = broadcast::channel(16); let (tx, _) = broadcast::channel(16);
let sourcechan: ChimemonSourceChannel = tx; let sourcechan: ChimemonSourceChannel = tx;
info!( if config.influxdb.enabled {
"Connecting to influxdb {} org: {} using token", info!(
&config.influxdb.url, &config.influxdb.org "Connecting to influxdb {} org: {} using token",
); &config.influxdb.url, &config.influxdb.org
let influx = influxdb2::Client::new( );
&config.influxdb.url, let config = config.clone();
&config.influxdb.org, let influx = influxdb2::Client::new(
&config.influxdb.token, &config.influxdb.url,
); &config.influxdb.org,
&config.influxdb.token,
);
let mut influxrx = sourcechan.subscribe();
tasks.push(tokio::spawn(
async move {
let stream = async_stream::stream! {
while let Ok(msg) = influxrx.recv().await {
match msg { ChimemonMessage::DataPoint(dp) => {
yield dp
}, ChimemonMessage::DataPoints(dps) => {
for p in dps {
yield p
}
}, _ => {}
} }
};
influx.write(&config.influxdb.bucket, stream).await.unwrap();
}
.instrument(info_span!("influx-task")),
));
}
let chrony = if config.sources.chrony.enabled { let chrony = if config.sources.chrony.enabled {
Some(ChronyClient::new(config.to_owned())) Some(ChronyClient::new(config.to_owned()))
@@ -69,7 +82,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
None None
}; };
if let Some(c) = chrony { if let Some(c) = chrony {
tasks.push(tokio::spawn(c.run(sourcechan.clone()))); tasks.push(tokio::spawn(
c.run(sourcechan.clone())
.instrument(info_span!("chrony-task")),
));
}; };
let hwmon = if config.sources.hwmon.enabled { let hwmon = if config.sources.hwmon.enabled {
@@ -78,18 +94,23 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
None None
}; };
if let Some(hwmon) = hwmon { if let Some(hwmon) = hwmon {
tasks.push(tokio::spawn(hwmon.run(sourcechan.clone()))); tasks.push(tokio::spawn(
hwmon
.run(sourcechan.clone())
.instrument(info_span!("hwmon-task")),
));
}; };
let uccm = if config.sources.uccm.enabled { let uccm = if config.sources.uccm.enabled {
info!("Spawning UCCMMonitor");
Some(UCCMMonitor::new(config.to_owned())) Some(UCCMMonitor::new(config.to_owned()))
} else { } else {
info!("UCCMMonitor not configured");
None None
}; };
if let Some(uccm) = uccm { if let Some(uccm) = uccm {
tasks.push(tokio::spawn(uccm.run(sourcechan.clone()))); tasks.push(tokio::spawn(
uccm.run(sourcechan.clone())
.instrument(info_span!("uccm-task")),
));
}; };
let chrony_refclock = if config.targets.chrony.enabled { let chrony_refclock = if config.targets.chrony.enabled {
@@ -98,33 +119,47 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
None None
}; };
if let Some(chrony_refclock) = chrony_refclock { if let Some(chrony_refclock) = chrony_refclock {
tasks.push(tokio::spawn(chrony_refclock.run(sourcechan.subscribe()))); tasks.push(tokio::spawn(
chrony_refclock
.run(sourcechan.subscribe())
.instrument(info_span!("chrony-refclock-task")),
));
}; };
let mut influxrx = sourcechan.subscribe(); let gpsd = if config.sources.gpsd.enabled {
Some(GpsdSource::new(config.to_owned()).await.unwrap())
} else {
None
};
if let Some(gpsd) = gpsd {
tasks.push(tokio::spawn(
gpsd.run(sourcechan.clone())
.instrument(info_span!("gpsd-task")),
))
}
tasks.push(tokio::spawn(async move { if tasks.len() == 0 {
let stream = async_stream::stream! { error!("No tasks configured, exiting.");
while let Ok(msg) = influxrx.recv().await { return Ok(()); // not an error, but exit before starting a dummy task
match msg { ChimemonMessage::DataPoint(dp) => { }
yield dp
}, ChimemonMessage::DataPoints(dps) => { if sourcechan.receiver_count() == 0 {
for p in dps { warn!("No consumers configured, events will be discarded");
yield p let mut chan = sourcechan.subscribe();
// spawn a dummy task to reap the channel and keep the process alive
tasks.push(tokio::spawn(
async move {
loop {
while let Ok(m) = chan.recv().await {
info!("received {m:?}");
} }
}, ChimemonMessage::TimeReport(_tr) => {} }
} } }
}; .instrument(info_span!("dummy-receiver-task")),
influx.write(&config.influxdb.bucket, stream).await.unwrap(); ))
})); }
// let mut debugrx = sourcechan.subscribe(); debug!("Task setup complete, tasks: {}", tasks.len());
// tasks.push(tokio::spawn(async move {
// loop {
// let v = debugrx.recv().await;
// warn!("streamed: {:?}", v.unwrap());
// }
// }));
join_all(tasks).await; join_all(tasks).await;

View File

@@ -1,14 +1,13 @@
use crate::{ChimemonMessage, ChimemonSource, ChimemonSourceChannel, Config, TimeReport};
use async_trait::async_trait; use async_trait::async_trait;
use bitflags::bitflags; use bitflags::bitflags;
use byteorder::{BigEndian, ReadBytesExt}; use byteorder::{BigEndian, ReadBytesExt};
use bytes::{Buf, BytesMut}; use bytes::{Buf, BytesMut};
use chimemon::{ChimemonMessage, ChimemonSource, ChimemonSourceChannel, Config, TimeReport};
use chrono::{DateTime, Duration, NaiveDateTime, Utc}; use chrono::{DateTime, Duration, NaiveDateTime, Utc};
use figment::value::Map; use figment::value::Map;
use influxdb2::models::data_point::DataPointBuilder;
use influxdb2::models::DataPoint; use influxdb2::models::DataPoint;
use influxdb2::models::data_point::DataPointBuilder;
use itertools::Itertools; use itertools::Itertools;
use log::{debug, info, warn};
use std::io::{BufRead, Cursor}; use std::io::{BufRead, Cursor};
use std::str; use std::str;
use std::sync::Arc; use std::sync::Arc;
@@ -17,6 +16,7 @@ use tokio::join;
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tokio::time::sleep; use tokio::time::sleep;
use tokio_serial::{SerialPort, SerialStream}; use tokio_serial::{SerialPort, SerialStream};
use tracing::{debug, info, warn};
pub const GPS_EPOCH: i64 = 315964800; // Doesn't seem possible to have a const DateTime object pub const GPS_EPOCH: i64 = 315964800; // Doesn't seem possible to have a const DateTime object
pub type UccmEndian = BigEndian; pub type UccmEndian = BigEndian;
@@ -257,7 +257,8 @@ impl TryFrom<&str> for UCCMLoopDiagReport {
"No lines!", "No lines!",
))??; ))??;
let ocxo_val = ocxo_line let ocxo_val = ocxo_line
.split(':').nth(1) .split(':')
.nth(1)
.ok_or(std::io::Error::new( .ok_or(std::io::Error::new(
std::io::ErrorKind::InvalidData, std::io::ErrorKind::InvalidData,
"no colon!", "no colon!",
@@ -280,7 +281,10 @@ impl TryFrom<&str> for UCCMGPSSatsReport {
"Invalid response (expected `NSATS CNOS`)", "Invalid response (expected `NSATS CNOS`)",
))?; ))?;
let nsats = nsats.parse::<u8>().map_err(|e| { let nsats = nsats.parse::<u8>().map_err(|e| {
std::io::Error::new(std::io::ErrorKind::InvalidData, format!("Invalid number of sats ({e})")) std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("Invalid number of sats ({e})"),
)
})?; })?;
let tracked_svs = cnos let tracked_svs = cnos
.split(',') .split(',')
@@ -428,13 +432,14 @@ impl UCCMMonitor {
>= Duration::from_std(self.config.sources.uccm.status_interval) >= Duration::from_std(self.config.sources.uccm.status_interval)
.unwrap() .unwrap()
{ {
let mut points = vec![tod let mut points = vec![
.as_builder( tod.as_builder(
&self.config.sources.uccm.measurement, &self.config.sources.uccm.measurement,
&self.config.influxdb.tags, &self.config.influxdb.tags,
) )
.build() .build()
.unwrap()]; .unwrap(),
];
if let Some(loop_diag) = &last_loop_diag { if let Some(loop_diag) = &last_loop_diag {
points.push( points.push(
loop_diag loop_diag