Compare commits
4 Commits
main
...
7f24bf5a91
| Author | SHA1 | Date | |
|---|---|---|---|
|
7f24bf5a91
|
|||
|
adbe09b9d2
|
|||
| ea412f4a66 | |||
| 4dabcbe985 |
190
Cargo.lock
generated
190
Cargo.lock
generated
@@ -148,23 +148,26 @@ dependencies = [
|
||||
"bytemuck",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "atty"
|
||||
version = "0.2.14"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
|
||||
dependencies = [
|
||||
"hermit-abi 0.1.19",
|
||||
"libc",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "autocfg"
|
||||
version = "1.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26"
|
||||
|
||||
[[package]]
|
||||
name = "backoff"
|
||||
version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"getrandom 0.2.16",
|
||||
"instant",
|
||||
"pin-project-lite",
|
||||
"rand",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "backtrace"
|
||||
version = "0.3.74"
|
||||
@@ -192,6 +195,15 @@ version = "0.21.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567"
|
||||
|
||||
[[package]]
|
||||
name = "bit-struct"
|
||||
version = "0.3.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "270fbbb014407467f7a2c9b1fa0b74057d5cbc452f18bac3bb5aad601e590521"
|
||||
dependencies = [
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "bitflags"
|
||||
version = "1.3.2"
|
||||
@@ -255,25 +267,31 @@ version = "0.2.0"
|
||||
dependencies = [
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
"backoff",
|
||||
"bit-struct",
|
||||
"bitflags 1.3.2",
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"chrono",
|
||||
"chrony-candm",
|
||||
"clap",
|
||||
"env_logger",
|
||||
"figment",
|
||||
"futures",
|
||||
"gethostname",
|
||||
"gpsd_proto",
|
||||
"influxdb2",
|
||||
"itertools 0.14.0",
|
||||
"libc",
|
||||
"log",
|
||||
"serde",
|
||||
"serde_derive",
|
||||
"serde_json",
|
||||
"serde_repr",
|
||||
"tokio",
|
||||
"tokio-serial",
|
||||
"tokio-stream",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
"tracing-subscriber 0.3.22",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -461,19 +479,6 @@ dependencies = [
|
||||
"cfg-if",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "env_logger"
|
||||
version = "0.9.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a12e6657c4c97ebab115a42dcee77225f7f482cdd841cf7088c657a42e9e00e7"
|
||||
dependencies = [
|
||||
"atty",
|
||||
"humantime",
|
||||
"log",
|
||||
"regex",
|
||||
"termcolor",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "equivalent"
|
||||
version = "1.0.2"
|
||||
@@ -664,6 +669,18 @@ version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "558b88954871f5e5b2af0e62e2e176c8bde7a6c2c4ed41b13d138d96da2e2cbd"
|
||||
|
||||
[[package]]
|
||||
name = "gpsd_proto"
|
||||
version = "1.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "247df1fffe6bc378377d6cf894dac08aef2194a6c1c9e2f173c3c10979fa5ca5"
|
||||
dependencies = [
|
||||
"log",
|
||||
"serde",
|
||||
"serde_derive",
|
||||
"serde_json",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "h2"
|
||||
version = "0.3.26"
|
||||
@@ -695,15 +712,6 @@ version = "0.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
|
||||
|
||||
[[package]]
|
||||
name = "hermit-abi"
|
||||
version = "0.1.19"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hermit-abi"
|
||||
version = "0.3.9"
|
||||
@@ -750,12 +758,6 @@ version = "1.0.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
|
||||
|
||||
[[package]]
|
||||
name = "humantime"
|
||||
version = "2.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9b112acc8b3adf4b107a8ec20977da0273a8c386765a3ec0229bd500a1443f9f"
|
||||
|
||||
[[package]]
|
||||
name = "hyper"
|
||||
version = "0.14.32"
|
||||
@@ -995,7 +997,7 @@ dependencies = [
|
||||
"snafu",
|
||||
"tempfile",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"tracing-subscriber 0.2.25",
|
||||
"url",
|
||||
]
|
||||
|
||||
@@ -1231,6 +1233,15 @@ dependencies = [
|
||||
"minimal-lexical",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nu-ansi-term"
|
||||
version = "0.50.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5"
|
||||
dependencies = [
|
||||
"windows-sys 0.59.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num-traits"
|
||||
version = "0.2.19"
|
||||
@@ -1246,7 +1257,7 @@ version = "1.16.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
|
||||
dependencies = [
|
||||
"hermit-abi 0.3.9",
|
||||
"hermit-abi",
|
||||
"libc",
|
||||
]
|
||||
|
||||
@@ -1641,18 +1652,28 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "serde"
|
||||
version = "1.0.219"
|
||||
version = "1.0.228"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6"
|
||||
checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e"
|
||||
dependencies = [
|
||||
"serde_core",
|
||||
"serde_derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_core"
|
||||
version = "1.0.228"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad"
|
||||
dependencies = [
|
||||
"serde_derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_derive"
|
||||
version = "1.0.219"
|
||||
version = "1.0.228"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00"
|
||||
checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -1661,14 +1682,15 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "serde_json"
|
||||
version = "1.0.140"
|
||||
version = "1.0.146"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "20068b6e96dc6c9bd23e01df8827e6c7e1f2fddd43c21810382803c136b99373"
|
||||
checksum = "217ca874ae0207aac254aa02c957ded05585a90892cc8d87f9e5fa49669dadd8"
|
||||
dependencies = [
|
||||
"itoa",
|
||||
"memchr",
|
||||
"ryu",
|
||||
"serde",
|
||||
"serde_core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1682,6 +1704,17 @@ dependencies = [
|
||||
"thiserror 1.0.69",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_repr"
|
||||
version = "0.1.20"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "175ee3e80ae9982737ca543e96133087cbd9a485eecc3bc4de9c1a37b47ea59c"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.101",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_spanned"
|
||||
version = "0.6.8"
|
||||
@@ -1873,15 +1906,6 @@ dependencies = [
|
||||
"windows-sys 0.59.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "termcolor"
|
||||
version = "1.4.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755"
|
||||
dependencies = [
|
||||
"winapi-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "thiserror"
|
||||
version = "1.0.69"
|
||||
@@ -2007,9 +2031,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tokio-util"
|
||||
version = "0.7.15"
|
||||
version = "0.7.17"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df"
|
||||
checksum = "2efa149fe76073d6e8fd97ef4f4eca7b67f599660115591483572e406e165594"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"futures-core",
|
||||
@@ -2078,9 +2102,9 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3"
|
||||
|
||||
[[package]]
|
||||
name = "tracing"
|
||||
version = "0.1.41"
|
||||
version = "0.1.44"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0"
|
||||
checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100"
|
||||
dependencies = [
|
||||
"pin-project-lite",
|
||||
"tracing-attributes",
|
||||
@@ -2089,9 +2113,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tracing-attributes"
|
||||
version = "0.1.28"
|
||||
version = "0.1.31"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d"
|
||||
checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -2100,14 +2124,25 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tracing-core"
|
||||
version = "0.1.33"
|
||||
version = "0.1.36"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c"
|
||||
checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a"
|
||||
dependencies = [
|
||||
"once_cell",
|
||||
"valuable",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-log"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3"
|
||||
dependencies = [
|
||||
"log",
|
||||
"once_cell",
|
||||
"tracing-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-serde"
|
||||
version = "0.1.3"
|
||||
@@ -2140,6 +2175,20 @@ dependencies = [
|
||||
"tracing-serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-subscriber"
|
||||
version = "0.3.22"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2f30143827ddab0d256fd843b7a66d164e9f271cfa0dde49142c5ca0ca291f1e"
|
||||
dependencies = [
|
||||
"nu-ansi-term",
|
||||
"sharded-slab",
|
||||
"smallvec",
|
||||
"thread_local",
|
||||
"tracing-core",
|
||||
"tracing-log",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "try-lock"
|
||||
version = "0.2.5"
|
||||
@@ -2357,15 +2406,6 @@ version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
|
||||
|
||||
[[package]]
|
||||
name = "winapi-util"
|
||||
version = "0.1.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
|
||||
dependencies = [
|
||||
"windows-sys 0.59.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "winapi-x86_64-pc-windows-gnu"
|
||||
version = "0.4.0"
|
||||
|
||||
16
Cargo.toml
16
Cargo.toml
@@ -1,7 +1,11 @@
|
||||
[package]
|
||||
name = "chimemon"
|
||||
version = "0.2.0"
|
||||
edition = "2021"
|
||||
edition = "2024"
|
||||
|
||||
[features]
|
||||
default = []
|
||||
release-logs = ["tracing/max_level_info"]
|
||||
|
||||
[dependencies]
|
||||
serde = "1.0"
|
||||
@@ -11,10 +15,8 @@ influxdb2 = { version = "0.3.3", features = [
|
||||
], default-features = false }
|
||||
tokio = { version = "1", features = ["rt", "io-util"] }
|
||||
clap = { version = "4.0", features = ["derive"] }
|
||||
log = "0.4"
|
||||
figment = { version = "0.10", features = ["toml"] }
|
||||
gethostname = "0.3"
|
||||
env_logger = "0.9.1"
|
||||
futures = "0.3.24"
|
||||
async-trait = "0.1.58"
|
||||
tokio-stream = { version = "0.1.11", features = ["sync"] }
|
||||
@@ -26,6 +28,14 @@ chrono = "0.4.23"
|
||||
libc = "0.2.137"
|
||||
async-stream = "0.3.6"
|
||||
itertools = "0.14.0"
|
||||
gpsd_proto = { version = "1.0.0" }
|
||||
tokio-util = { version = "0.7.17", features = ["codec"] }
|
||||
serde_json = "1.0.146"
|
||||
backoff = { version = "0.4.0", features = ["tokio"] }
|
||||
serde_repr = "0.1.20"
|
||||
tracing = "0.1.44"
|
||||
tracing-subscriber = { version = "0.3.22", features = ["fmt"] }
|
||||
bit-struct = { version = "0.3.2", default-features = false }
|
||||
|
||||
[dependencies.chrony-candm]
|
||||
git = "https://github.com/aws/chrony-candm"
|
||||
|
||||
@@ -24,6 +24,13 @@
|
||||
status_interval = 10
|
||||
measurement = "uccm_gpsdo"
|
||||
|
||||
[sources.prs10]
|
||||
enabled = true
|
||||
port = "/dev/ttyUSB0"
|
||||
status_interval = 10
|
||||
stats_interval = 10
|
||||
measurement = "prs10_gpsdo"
|
||||
|
||||
|
||||
[targets]
|
||||
[targets.chrony]
|
||||
|
||||
235
src/chrony.rs
235
src/chrony.rs
@@ -1,13 +1,17 @@
|
||||
use crate::{
|
||||
ChimemonSource, ChimemonSourceChannel, Config, SourceMetric, SourceReport, SourceReportDetails,
|
||||
SourceStatus,
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use chimemon::{ChimemonSource, ChimemonSourceChannel, Config};
|
||||
use chrony_candm::reply::{self, ReplyBody, SourceMode};
|
||||
use chrony_candm::reply::{self, ReplyBody, SourceMode, SourceState};
|
||||
use chrony_candm::request::{self, RequestBody};
|
||||
use chrony_candm::{blocking_query, ClientOptions};
|
||||
use chrony_candm::{ClientOptions, blocking_query};
|
||||
use influxdb2::models::DataPoint;
|
||||
use log::{info, warn};
|
||||
use std::net::{SocketAddr, ToSocketAddrs};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
use tokio::join;
|
||||
use tracing::{info, warn};
|
||||
|
||||
pub struct ChronyClient {
|
||||
pub server: SocketAddr,
|
||||
@@ -15,82 +19,136 @@ pub struct ChronyClient {
|
||||
config: Config,
|
||||
}
|
||||
|
||||
fn datapoint_from_tracking(
|
||||
t: &reply::Tracking,
|
||||
config: &Config,
|
||||
) -> Result<DataPoint, Box<dyn std::error::Error>> {
|
||||
let now = SystemTime::now().duration_since(UNIX_EPOCH)?;
|
||||
let measurement = config.sources.chrony.measurement_prefix.to_owned()
|
||||
+ &config.sources.chrony.tracking_measurement;
|
||||
let mut builder =
|
||||
DataPoint::builder(&measurement).timestamp(now.as_nanos().try_into().unwrap());
|
||||
for (key, value) in &config.influxdb.tags {
|
||||
builder = builder.tag(key, value);
|
||||
}
|
||||
|
||||
let point = builder
|
||||
.field("ref_id", t.ref_id as i64)
|
||||
.field("ref_ip_addr", t.ip_addr.to_string())
|
||||
.field("stratum", t.stratum as i64)
|
||||
.field("leap_status", t.leap_status as i64)
|
||||
.field("current_correction", f64::from(t.current_correction))
|
||||
.field("last_offset", f64::from(t.last_offset))
|
||||
.field("rms_offset", f64::from(t.rms_offset))
|
||||
.field("freq_ppm", f64::from(t.freq_ppm))
|
||||
.field("resid_freq_ppm", f64::from(t.resid_freq_ppm))
|
||||
.field("skew_ppm", f64::from(t.skew_ppm))
|
||||
.field("root_delay", f64::from(t.root_delay))
|
||||
.field("root_dispersion", f64::from(t.root_dispersion))
|
||||
.field("last_update_interval", f64::from(t.last_update_interval))
|
||||
.build()?;
|
||||
|
||||
Ok(point)
|
||||
#[derive(Debug)]
|
||||
pub struct ChronyTrackingReport {
|
||||
tags: Arc<Vec<(String, String)>>,
|
||||
pub ref_id: i64,
|
||||
pub ref_ip_addr: String,
|
||||
pub stratum: i64,
|
||||
pub leap_status: i64,
|
||||
pub current_correction: f64,
|
||||
pub last_offset: f64,
|
||||
pub rms_offset: f64,
|
||||
pub freq_ppm: f64,
|
||||
pub resid_freq_ppm: f64,
|
||||
pub skew_ppm: f64,
|
||||
pub root_delay: f64,
|
||||
pub root_dispersion: f64,
|
||||
pub last_update_interval: f64,
|
||||
}
|
||||
|
||||
pub fn datapoint_from_sourcedata(
|
||||
d: &reply::SourceData,
|
||||
config: &Config,
|
||||
) -> Result<DataPoint, Box<dyn std::error::Error>> {
|
||||
let now = SystemTime::now().duration_since(UNIX_EPOCH)?;
|
||||
let measurement = config.sources.chrony.measurement_prefix.to_owned()
|
||||
+ &config.sources.chrony.sources_measurement;
|
||||
let mut builder =
|
||||
DataPoint::builder(&measurement).timestamp(now.as_nanos().try_into().unwrap());
|
||||
for (key, value) in &config.influxdb.tags {
|
||||
builder = builder.tag(key, value)
|
||||
impl SourceReportDetails for ChronyTrackingReport {
|
||||
fn is_healthy(&self) -> bool {
|
||||
true
|
||||
}
|
||||
builder = builder
|
||||
.tag("ref_id", d.ip_addr.to_string())
|
||||
.tag(
|
||||
"mode",
|
||||
match d.mode {
|
||||
SourceMode::Client => String::from("server"),
|
||||
SourceMode::Peer => String::from("peer"),
|
||||
SourceMode::Ref => String::from("refclock"),
|
||||
},
|
||||
)
|
||||
.tag(
|
||||
"state",
|
||||
match d.state {
|
||||
reply::SourceState::Selected => String::from("best"),
|
||||
reply::SourceState::NonSelectable => String::from("unusable"),
|
||||
reply::SourceState::Falseticker => String::from("falseticker"),
|
||||
reply::SourceState::Jittery => String::from("jittery"),
|
||||
reply::SourceState::Unselected => String::from("combined"),
|
||||
reply::SourceState::Selectable => String::from("unused"),
|
||||
},
|
||||
)
|
||||
.field("poll", d.poll as i64)
|
||||
.field("stratum", d.stratum as i64)
|
||||
.field("flags", d.flags.bits() as i64)
|
||||
.field("reachability", d.reachability.count_ones() as i64)
|
||||
.field("since_sample", d.since_sample as i64)
|
||||
.field("orig_latest_meas", f64::from(d.orig_latest_meas))
|
||||
.field("latest_meas", f64::from(d.latest_meas))
|
||||
.field("latest_meas_err", f64::from(d.latest_meas_err));
|
||||
let point = builder.build()?;
|
||||
fn to_metrics(&self) -> Vec<SourceMetric> {
|
||||
let tags = &self.tags;
|
||||
vec![
|
||||
SourceMetric::new_int("ref_id", self.ref_id, tags.clone()),
|
||||
SourceMetric::new_int("stratum", self.stratum, tags.clone()),
|
||||
SourceMetric::new_int("leap_status", self.leap_status, tags.clone()),
|
||||
SourceMetric::new_float("current_correction", self.current_correction, tags.clone()),
|
||||
SourceMetric::new_float("last_offset", self.last_offset, tags.clone()),
|
||||
SourceMetric::new_float("rms_offset", self.rms_offset, tags.clone()),
|
||||
SourceMetric::new_float("freq_ppm", self.freq_ppm, tags.clone()),
|
||||
SourceMetric::new_float("resid_freq_ppm", self.resid_freq_ppm, tags.clone()),
|
||||
SourceMetric::new_float("skew_ppm", self.skew_ppm, tags.clone()),
|
||||
SourceMetric::new_float("root_delay", self.root_delay, tags.clone()),
|
||||
SourceMetric::new_float("root_dispersion", self.root_dispersion, tags.clone()),
|
||||
SourceMetric::new_float(
|
||||
"last_update_interval",
|
||||
self.last_update_interval,
|
||||
tags.clone(),
|
||||
),
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
Ok(point)
|
||||
#[derive(Debug)]
|
||||
pub struct ChronySourcesReport {
|
||||
pub sources: Vec<reply::SourceData>,
|
||||
}
|
||||
|
||||
impl SourceReportDetails for ChronySourcesReport {
|
||||
fn is_healthy(&self) -> bool {
|
||||
//TODO: think about whether there is an idea of unhealthy sources
|
||||
true
|
||||
}
|
||||
fn to_metrics(&self) -> Vec<SourceMetric> {
|
||||
let mut metrics = Vec::with_capacity(8 * self.sources.len());
|
||||
|
||||
for source in &self.sources {
|
||||
let tags = Arc::new(vec![
|
||||
("ref_id".to_owned(), source.ip_addr.to_string()),
|
||||
(
|
||||
"mode".to_owned(),
|
||||
match source.mode {
|
||||
SourceMode::Client => String::from("server"),
|
||||
SourceMode::Peer => String::from("peer"),
|
||||
SourceMode::Ref => String::from("refclock"),
|
||||
},
|
||||
),
|
||||
(
|
||||
"state".to_owned(),
|
||||
match source.state {
|
||||
reply::SourceState::Selected => String::from("best"),
|
||||
reply::SourceState::NonSelectable => String::from("unusable"),
|
||||
reply::SourceState::Falseticker => String::from("falseticker"),
|
||||
reply::SourceState::Jittery => String::from("jittery"),
|
||||
reply::SourceState::Unselected => String::from("combined"),
|
||||
reply::SourceState::Selectable => String::from("unused"),
|
||||
},
|
||||
),
|
||||
]);
|
||||
metrics.extend([
|
||||
SourceMetric::new_int("poll", source.poll as i64, tags.clone()),
|
||||
SourceMetric::new_int("stratum", source.stratum as i64, tags.clone()),
|
||||
SourceMetric::new_int("flags", source.flags.bits() as i64, tags.clone()),
|
||||
SourceMetric::new_int(
|
||||
"reachability",
|
||||
source.reachability.count_ones() as i64,
|
||||
tags.clone(),
|
||||
),
|
||||
SourceMetric::new_int("since_sample", source.since_sample as i64, tags.clone()),
|
||||
SourceMetric::new_float(
|
||||
"orig_latest_meas",
|
||||
source.orig_latest_meas.into(),
|
||||
tags.clone(),
|
||||
),
|
||||
SourceMetric::new_float("latest_meas", source.latest_meas.into(), tags.clone()),
|
||||
SourceMetric::new_float(
|
||||
"latest_meas_err",
|
||||
source.latest_meas_err.into(),
|
||||
tags.clone(),
|
||||
),
|
||||
]);
|
||||
}
|
||||
|
||||
metrics
|
||||
}
|
||||
}
|
||||
|
||||
fn report_from_tracking(
|
||||
t: &reply::Tracking,
|
||||
config: &Config,
|
||||
) -> Result<ChronyTrackingReport, Box<dyn std::error::Error>> {
|
||||
let report = ChronyTrackingReport {
|
||||
tags: Arc::new(vec![]), //TODO: allow configuring tags in the source
|
||||
ref_id: t.ref_id as i64,
|
||||
ref_ip_addr: t.ip_addr.to_string(),
|
||||
stratum: t.stratum as i64,
|
||||
leap_status: t.leap_status as i64,
|
||||
current_correction: t.current_correction.into(),
|
||||
last_offset: t.last_offset.into(),
|
||||
rms_offset: t.rms_offset.into(),
|
||||
freq_ppm: t.freq_ppm.into(),
|
||||
resid_freq_ppm: t.resid_freq_ppm.into(),
|
||||
skew_ppm: t.skew_ppm.into(),
|
||||
root_delay: t.root_delay.into(),
|
||||
root_dispersion: t.root_dispersion.into(),
|
||||
last_update_interval: t.last_update_interval.into(),
|
||||
};
|
||||
Ok(report)
|
||||
}
|
||||
|
||||
impl ChronyClient {
|
||||
@@ -206,13 +264,16 @@ impl ChronyClient {
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let tracking = self.get_tracking().await?;
|
||||
|
||||
let tracking_data = datapoint_from_tracking(&tracking, &self.config)?;
|
||||
let tracking_data = report_from_tracking(&tracking, &self.config)?;
|
||||
let report = SourceReport {
|
||||
name: "chrony-tracking".to_owned(),
|
||||
status: SourceStatus::Unknown,
|
||||
details: Arc::new(tracking_data),
|
||||
};
|
||||
|
||||
info!("Sending tracking data");
|
||||
|
||||
chan.send(tracking_data.into())
|
||||
.expect("Unable to send tracking data to targets");
|
||||
|
||||
chan.send(report.into())?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -221,14 +282,14 @@ impl ChronyClient {
|
||||
chan: &ChimemonSourceChannel,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let sources = self.get_sources().await?;
|
||||
let mut dps = Vec::with_capacity(sources.len());
|
||||
for ds in sources {
|
||||
let source_data = datapoint_from_sourcedata(&ds, &self.config)?;
|
||||
dps.push(source_data);
|
||||
}
|
||||
let details = ChronySourcesReport { sources };
|
||||
let report = SourceReport {
|
||||
name: "chrony-sources".to_owned(),
|
||||
status: SourceStatus::Unknown,
|
||||
details: Arc::new(details),
|
||||
};
|
||||
info!("Sending source data");
|
||||
chan.send(dps.into())
|
||||
.expect("Unable to send source data to targets");
|
||||
chan.send(report.into())?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
use crate::{ChimemonMessage, ChimemonTarget, ChimemonTargetChannel, ChronySockConfig};
|
||||
use async_trait::async_trait;
|
||||
use chimemon::{ChimemonMessage, ChimemonTarget, ChimemonTargetChannel, ChronySockConfig};
|
||||
use libc::{c_double, c_int, timeval};
|
||||
use log::debug;
|
||||
use std::mem;
|
||||
use std::os::unix::net::UnixDatagram;
|
||||
use std::path::PathBuf;
|
||||
use tracing::debug;
|
||||
|
||||
const CHRONY_MAGIC: c_int = 0x534f434b;
|
||||
|
||||
|
||||
451
src/gpsd.rs
Normal file
451
src/gpsd.rs
Normal file
@@ -0,0 +1,451 @@
|
||||
use crate::{
|
||||
ChimemonMessage, ChimemonSource, ChimemonSourceChannel, Config, SourceMetric, SourceReport,
|
||||
SourceReportDetails, SourceStatus,
|
||||
};
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::f64;
|
||||
use std::fmt::Debug;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use backoff::ExponentialBackoff;
|
||||
use futures::{SinkExt, Stream};
|
||||
use futures::{StreamExt, task::Context};
|
||||
use gpsd_proto::{
|
||||
Device, Gst, Mode, Pps, ResponseHandshake, Sky, Tpv, UnifiedResponse, Version, Watch,
|
||||
};
|
||||
use serde::{Deserialize, Deserializer, Serialize};
|
||||
use serde_json;
|
||||
use tokio::net::{TcpStream, ToSocketAddrs, lookup_host};
|
||||
use tokio::time::{Interval, interval, timeout};
|
||||
use tokio_util::codec::{Framed, LinesCodec};
|
||||
use tracing::{debug, debug_span, info, warn};
|
||||
|
||||
pub struct GpsdSource {
|
||||
pub config: Config,
|
||||
conn: GpsdTransport,
|
||||
devices: HashMap<String, Device>,
|
||||
last_gst: Option<Gst>,
|
||||
last_pps: Option<Pps>,
|
||||
last_tpv: Option<Tpv>,
|
||||
last_sky: Option<Sky>,
|
||||
}
|
||||
|
||||
#[derive(Eq, PartialEq, Clone, Copy, Debug)]
|
||||
pub enum GpsdFixType {
|
||||
Unknown,
|
||||
NoFix,
|
||||
Fix2D,
|
||||
Fix3D,
|
||||
Surveyed,
|
||||
}
|
||||
|
||||
impl From<u32> for GpsdFixType {
|
||||
fn from(value: u32) -> Self {
|
||||
match value {
|
||||
0 => Self::Unknown,
|
||||
1 => Self::NoFix,
|
||||
2 => Self::Fix2D,
|
||||
3 => Self::Fix3D,
|
||||
_ => panic!("Invalid fix mode {value}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Mode> for GpsdFixType {
|
||||
fn from(value: Mode) -> Self {
|
||||
match value {
|
||||
Mode::NoFix => Self::NoFix,
|
||||
Mode::Fix2d => Self::Fix2D,
|
||||
Mode::Fix3d => Self::Fix3D,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct GpsdSourceReport {
|
||||
fix_type: GpsdFixType,
|
||||
sats_visible: u8,
|
||||
sats_tracked: u8,
|
||||
tdop: f64,
|
||||
}
|
||||
|
||||
impl SourceReportDetails for GpsdSourceReport {
|
||||
fn is_healthy(&self) -> bool {
|
||||
self.fix_type != GpsdFixType::Unknown && self.fix_type != GpsdFixType::NoFix
|
||||
}
|
||||
fn to_metrics(&self) -> Vec<SourceMetric> {
|
||||
let no_tags = Arc::new(vec![]);
|
||||
vec![
|
||||
SourceMetric::new_int("sats_visible", self.sats_visible as i64, no_tags.clone()),
|
||||
SourceMetric::new_int("sats_tracked", self.sats_tracked as i64, no_tags.clone()),
|
||||
SourceMetric::new_float("tdop", self.tdop, no_tags.clone()),
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
impl GpsdSource {
|
||||
pub async fn new(config: Config) -> Result<Self, std::io::Error> {
|
||||
let conn = GpsdTransport::new(&config.sources.gpsd.host).await?;
|
||||
Ok(Self {
|
||||
config,
|
||||
conn,
|
||||
devices: HashMap::new(),
|
||||
last_gst: None,
|
||||
last_pps: None,
|
||||
last_tpv: None,
|
||||
last_sky: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl GpsdSource {
|
||||
async fn send_status(&self, chan: &mut ChimemonSourceChannel) {
|
||||
let sky = self.last_sky.as_ref();
|
||||
let tpv = self.last_tpv.as_ref();
|
||||
|
||||
let (sats_tracked, sats_visible) = sky.map_or((0, 0), |sky| {
|
||||
let sats = sky.satellites.as_deref().unwrap_or_default();
|
||||
(
|
||||
sats.iter().filter(|s| s.used).count() as u8,
|
||||
sats.len() as u8,
|
||||
)
|
||||
});
|
||||
|
||||
let tdop = sky
|
||||
.and_then(|sky| sky.tdop)
|
||||
.map_or(f64::INFINITY, |tdop| tdop as f64);
|
||||
|
||||
chan.send(ChimemonMessage::SourceReport(SourceReport {
|
||||
name: "gpsd".into(),
|
||||
status: SourceStatus::Unknown,
|
||||
details: Arc::new(GpsdSourceReport {
|
||||
fix_type: tpv.map_or(GpsdFixType::Unknown, |tpv| tpv.mode.into()),
|
||||
sats_tracked,
|
||||
sats_visible,
|
||||
tdop,
|
||||
}),
|
||||
}))
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
fn handle_msg(&mut self, msg: String) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let _span = debug_span!("handle_msg").entered();
|
||||
let parsed = serde_json::from_str::<UnifiedResponse>(&msg)?;
|
||||
debug!("Received {parsed:?}");
|
||||
match parsed {
|
||||
UnifiedResponse::Device(d) => {
|
||||
if let Some(path) = &d.path {
|
||||
self.devices.insert(path.to_owned(), d);
|
||||
} else {
|
||||
warn!("No path on DEVICE response, ignoring.");
|
||||
}
|
||||
}
|
||||
UnifiedResponse::Gst(g) => self.last_gst = Some(g),
|
||||
UnifiedResponse::Pps(p) => self.last_pps = Some(p),
|
||||
UnifiedResponse::Sky(s) => {
|
||||
self.last_sky = Some({
|
||||
let mut s = s;
|
||||
if s.satellites.is_none() {
|
||||
s.satellites = self.last_sky.as_mut().and_then(|old| old.satellites.take());
|
||||
}
|
||||
s
|
||||
})
|
||||
}
|
||||
UnifiedResponse::Tpv(t) => self.last_tpv = Some(t),
|
||||
_ => warn!("Unhandled response `{parsed:?}`"),
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ChimemonSource for GpsdSource {
|
||||
async fn run(mut self, mut chan: ChimemonSourceChannel) {
|
||||
info!("gpsd task started");
|
||||
self.conn.ensure_connection().await.unwrap();
|
||||
let mut ticker = interval(Duration::from_secs(self.config.sources.gpsd.interval));
|
||||
|
||||
let mut params = WatchParams::default();
|
||||
params.json = Some(true);
|
||||
self.conn
|
||||
.cmd_response(&GpsdCommand::Watch(Some(params)))
|
||||
.await
|
||||
.unwrap();
|
||||
loop {
|
||||
let framed = self.conn.framed.as_mut().expect("must be connected");
|
||||
tokio::select! {
|
||||
_ = ticker.tick() => {
|
||||
self.send_status(&mut chan).await
|
||||
},
|
||||
maybe_msg = framed.next() => {
|
||||
if let Some(Ok(msg)) = maybe_msg {
|
||||
self.handle_msg(msg).unwrap()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct GpsdTransport {
|
||||
host: SocketAddr,
|
||||
framed: Option<Framed<TcpStream, LinesCodec>>,
|
||||
conn_backoff: ExponentialBackoff,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize)]
|
||||
#[repr(u8)]
|
||||
pub enum RawMode {
|
||||
Off = 0,
|
||||
RawHex = 1,
|
||||
RawBin = 2,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize)]
|
||||
#[repr(u8)]
|
||||
pub enum NativeMode {
|
||||
Nmea = 0,
|
||||
Alt = 1,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize)]
|
||||
#[repr(u8)]
|
||||
pub enum ParityMode {
|
||||
None = b'N',
|
||||
Odd = b'O',
|
||||
Even = b'E',
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize)]
|
||||
struct WatchParams {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
enable: Option<bool>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
json: Option<bool>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
nmea: Option<bool>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
raw: Option<RawMode>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
scaled: Option<bool>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
split24: Option<bool>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pps: Option<bool>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
device: Option<String>,
|
||||
}
|
||||
|
||||
impl Default for WatchParams {
|
||||
fn default() -> Self {
|
||||
WatchParams {
|
||||
enable: Some(true),
|
||||
json: Some(false),
|
||||
nmea: None,
|
||||
raw: None,
|
||||
scaled: None,
|
||||
split24: None,
|
||||
pps: None,
|
||||
device: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize)]
|
||||
struct DeviceParams {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
bps: Option<u64>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
cycle: Option<f64>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
flags: Option<u8>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
hexdata: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
native: Option<NativeMode>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
parity: Option<ParityMode>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
path: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
readonly: Option<bool>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
sernum: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
stopbits: Option<u8>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
enum GpsdCommand {
|
||||
Version, // no params
|
||||
Devices, // no params
|
||||
Watch(Option<WatchParams>),
|
||||
Poll, // I don't understand the protocol for this one
|
||||
Device(Option<DeviceParams>),
|
||||
}
|
||||
|
||||
impl GpsdCommand {
|
||||
fn command_string(&self) -> &str {
|
||||
match self {
|
||||
Self::Version => "?VERSION",
|
||||
Self::Devices => "?DEVICES",
|
||||
Self::Watch(_) => "?WATCH",
|
||||
Self::Poll => "?POLL",
|
||||
Self::Device(_) => "?DEVICE",
|
||||
}
|
||||
}
|
||||
|
||||
fn expected_responses(&self) -> usize {
|
||||
match self {
|
||||
Self::Version => 1,
|
||||
Self::Devices => 1,
|
||||
Self::Watch(_) => 2,
|
||||
Self::Poll => 1,
|
||||
Self::Device(_) => 1,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ToString for GpsdCommand {
|
||||
fn to_string(&self) -> String {
|
||||
let s = self.command_string().to_owned();
|
||||
match self {
|
||||
Self::Version | Self::Devices | Self::Poll | Self::Watch(None) | Self::Device(None) => {
|
||||
s + ";"
|
||||
}
|
||||
Self::Watch(Some(w)) => s + "=" + &serde_json::to_string(w).unwrap() + ";",
|
||||
Self::Device(Some(d)) => s + "=" + &serde_json::to_string(d).unwrap() + ";",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl GpsdTransport {
|
||||
async fn new<T: ToSocketAddrs + Debug>(host: &T) -> Result<Self, std::io::Error> {
|
||||
// TODO: implement proper handling of multiple responses
|
||||
let host_addr = lookup_host(host).await?.next().ok_or(std::io::Error::new(
|
||||
std::io::ErrorKind::NotFound,
|
||||
format!("No response looking up `{:?}`", host),
|
||||
))?;
|
||||
Ok(Self {
|
||||
host: host_addr,
|
||||
framed: None,
|
||||
conn_backoff: ExponentialBackoff::default(),
|
||||
})
|
||||
}
|
||||
async fn connect(&mut self) -> Result<(), Box<dyn std::error::Error>> {
|
||||
info!("Connecting to gpsd @ {}", self.host);
|
||||
let mut framed = backoff::future::retry_notify(
|
||||
self.conn_backoff.clone(),
|
||||
async || {
|
||||
Ok(Framed::new(
|
||||
TcpStream::connect(self.host).await?,
|
||||
LinesCodec::new(),
|
||||
))
|
||||
},
|
||||
|e, d| warn!("Failed to connect to {} after {:?}: `{}`", self.host, d, e),
|
||||
)
|
||||
.await?;
|
||||
debug!("Waiting for initial VERSION");
|
||||
if let Ok(Some(Ok(r))) = timeout(Duration::from_secs(5), framed.next()).await {
|
||||
if let Ok(version) = serde_json::from_str::<Version>(&r) {
|
||||
info!(
|
||||
"Connected to gpsd @ {}, release {}",
|
||||
self.host, version.release
|
||||
)
|
||||
} else {
|
||||
warn!("Got unexpected non-VERSION response after connection (`{r}`)")
|
||||
}
|
||||
self.framed = Some(framed);
|
||||
Ok(())
|
||||
} else {
|
||||
Err("Unexpected failure to receive initial handshake response".into())
|
||||
}
|
||||
}
|
||||
async fn ensure_connection(&mut self) -> Result<(), Box<dyn std::error::Error>> {
|
||||
if let Some(conn) = &self.framed {
|
||||
Ok(())
|
||||
} else {
|
||||
self.connect().await
|
||||
}
|
||||
}
|
||||
async fn cmd_response(
|
||||
&mut self,
|
||||
cmd: &GpsdCommand,
|
||||
) -> Result<Vec<UnifiedResponse>, Box<dyn std::error::Error>> {
|
||||
debug!("Command: `{cmd:?}`");
|
||||
self.ensure_connection().await?;
|
||||
|
||||
let mut responses = Vec::new();
|
||||
if let Some(conn) = &mut self.framed {
|
||||
debug!("Raw command: `{}`", cmd.to_string());
|
||||
conn.send(cmd.to_string()).await?;
|
||||
for _ in 0..cmd.expected_responses() {
|
||||
match conn.next().await {
|
||||
None => return Err("Connection lost".into()),
|
||||
Some(Err(e)) => return Err(format!("Unable to parse response {e}").into()),
|
||||
Some(Ok(r)) => {
|
||||
debug!("Raw response: `{r}`");
|
||||
responses.push(serde_json::from_str::<UnifiedResponse>(&r)?)
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return Err("Missing connection despite ensure".into());
|
||||
}
|
||||
|
||||
Ok(responses)
|
||||
}
|
||||
|
||||
async fn stream(
|
||||
&mut self,
|
||||
) -> Result<
|
||||
impl Stream<Item = Result<UnifiedResponse, Box<dyn std::error::Error>>>,
|
||||
Box<dyn std::error::Error>,
|
||||
> {
|
||||
self.ensure_connection().await?;
|
||||
if let Some(conn) = &mut self.framed {
|
||||
Ok(conn.map(|line| Ok(serde_json::from_str::<UnifiedResponse>(&line?)?)))
|
||||
} else {
|
||||
Err("No connection after connecting.".into())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
mod tests {
|
||||
use gpsd_proto::{ResponseData, UnifiedResponse};
|
||||
use tokio_stream::StreamExt;
|
||||
|
||||
use crate::gpsd::{GpsdCommand, GpsdTransport, WatchParams};
|
||||
use std::sync::Once;
|
||||
|
||||
static INIT: Once = Once::new();
|
||||
|
||||
fn init_logger() {
|
||||
INIT.call_once(|| tracing_subscriber::fmt::init());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_gpsd() {
|
||||
init_logger();
|
||||
let mut gpsd = GpsdTransport::new(&"192.168.65.93:2947").await.unwrap();
|
||||
gpsd.connect().await.unwrap();
|
||||
let mut params = WatchParams::default();
|
||||
params.enable = Some(true);
|
||||
params.json = Some(true);
|
||||
let res = {
|
||||
gpsd.cmd_response(&GpsdCommand::Watch(Some(params)))
|
||||
.await
|
||||
.unwrap()
|
||||
};
|
||||
println!("{res:?}");
|
||||
while let Some(Ok(s)) = gpsd.framed.as_mut().unwrap().next().await {
|
||||
let res2 = serde_json::from_str::<UnifiedResponse>(&s);
|
||||
println!("{res2:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
151
src/hwmon.rs
151
src/hwmon.rs
@@ -1,23 +1,100 @@
|
||||
use crate::{
|
||||
ChimemonSource, ChimemonSourceChannel, Config, SourceMetric, SourceReport, SourceReportDetails,
|
||||
SourceStatus,
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use chimemon::{ChimemonSource, ChimemonSourceChannel, Config};
|
||||
use futures::{stream, StreamExt};
|
||||
use futures::{StreamExt, stream};
|
||||
use influxdb2::models::DataPoint;
|
||||
use log::{debug, info};
|
||||
use std::{
|
||||
fs::File,
|
||||
io::Read,
|
||||
path::PathBuf,
|
||||
sync::Arc,
|
||||
time::{Duration, SystemTime, UNIX_EPOCH},
|
||||
};
|
||||
use tracing::{Instrument, debug, error, info, info_span, warn};
|
||||
|
||||
pub struct HwmonSource {
|
||||
config: Config,
|
||||
sensors: Vec<HwmonSensor>,
|
||||
sensors: Vec<Arc<HwmonSensor>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct HwmonSensor {
|
||||
path: PathBuf,
|
||||
name: String,
|
||||
value_path: PathBuf,
|
||||
device: String,
|
||||
sensor: String,
|
||||
name: String,
|
||||
label: Option<String>,
|
||||
tags: Arc<Vec<(String, String)>>,
|
||||
}
|
||||
|
||||
impl HwmonSensor {
|
||||
fn new(name: &str, device: &str, sensor: &str) -> Self {
|
||||
let value_path = PathBuf::from(HWMON_ROOT)
|
||||
.join(device)
|
||||
.join(sensor.to_owned() + "_input");
|
||||
let label_path_raw = PathBuf::from(HWMON_ROOT)
|
||||
.join(device)
|
||||
.join(sensor.to_owned() + "_label");
|
||||
let label = if label_path_raw.is_file() {
|
||||
let mut f =
|
||||
File::open(&label_path_raw).expect(&format!("Unable to open `{label_path_raw:?}`"));
|
||||
let mut label = String::new();
|
||||
f.read_to_string(&mut label)
|
||||
.expect(&format!("Unable to read from `{label_path_raw:?}"));
|
||||
Some(label.trim().to_owned())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let mut tags_vec = vec![
|
||||
("name".to_owned(), name.to_owned()),
|
||||
("device".to_owned(), device.to_owned()),
|
||||
("sensor".to_owned(), sensor.to_owned()),
|
||||
];
|
||||
if let Some(label) = &label {
|
||||
tags_vec.push(("label".to_owned(), label.clone()))
|
||||
}
|
||||
Self {
|
||||
value_path,
|
||||
label,
|
||||
device: device.to_owned(),
|
||||
sensor: sensor.to_owned(),
|
||||
name: name.to_owned(),
|
||||
tags: Arc::new(tags_vec),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct HwmonReport {
|
||||
values: Vec<(Arc<HwmonSensor>, f64)>,
|
||||
}
|
||||
|
||||
impl SourceReportDetails for HwmonReport {
|
||||
fn is_healthy(&self) -> bool {
|
||||
//self.alarms.iter().any(|(_sensor, alarm)| *alarm)
|
||||
true
|
||||
}
|
||||
fn to_metrics(&self) -> Vec<SourceMetric> {
|
||||
let mut metrics = Vec::new();
|
||||
for (sensor, value) in &self.values {
|
||||
metrics.push(SourceMetric::new_float(
|
||||
"hwmon_value",
|
||||
*value,
|
||||
sensor.tags.clone(),
|
||||
))
|
||||
}
|
||||
// for (sensor, alarm) in &self.alarms {
|
||||
// metrics.push(SourceMetric::new_bool(
|
||||
// "hwmon_alarm",
|
||||
// *alarm,
|
||||
// sensor.tags.clone(),
|
||||
// ))
|
||||
// }
|
||||
|
||||
metrics
|
||||
}
|
||||
}
|
||||
|
||||
const HWMON_ROOT: &str = "/sys/class/hwmon";
|
||||
@@ -29,19 +106,14 @@ impl HwmonSource {
|
||||
.hwmon
|
||||
.sensors
|
||||
.iter()
|
||||
.map(|(k, v)| HwmonSensor {
|
||||
name: k.into(),
|
||||
device: v.name.clone(),
|
||||
sensor: v.sensor.clone(),
|
||||
path: PathBuf::from(HWMON_ROOT).join(&v.name).join(&v.sensor),
|
||||
})
|
||||
.map(|(k, v)| Arc::new(HwmonSensor::new(k, &v.name, &v.sensor)))
|
||||
.collect();
|
||||
|
||||
HwmonSource { config, sensors }
|
||||
}
|
||||
|
||||
async fn get_raw_value(sensor: &HwmonSensor) -> Result<String, std::io::Error> {
|
||||
tokio::fs::read_to_string(&sensor.path).await
|
||||
tokio::fs::read_to_string(&sensor.value_path).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -53,31 +125,38 @@ impl ChimemonSource for HwmonSource {
|
||||
tokio::time::interval(Duration::from_secs(self.config.sources.hwmon.interval));
|
||||
loop {
|
||||
interval.tick().await;
|
||||
let s = stream::iter(&self.sensors).then(|s| async {
|
||||
let sensor_val = HwmonSource::get_raw_value(s)
|
||||
.await
|
||||
.expect("Unable to read sensor");
|
||||
debug!(
|
||||
"hwmon {} raw value {}",
|
||||
s.path.to_string_lossy(),
|
||||
sensor_val
|
||||
);
|
||||
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
|
||||
let mut builder = DataPoint::builder(&self.config.sources.hwmon.measurement)
|
||||
.timestamp(now.as_nanos().try_into().unwrap());
|
||||
for (key, value) in &self.config.influxdb.tags {
|
||||
builder = builder.tag(key, value)
|
||||
let mut values = Vec::new();
|
||||
for s in &self.sensors {
|
||||
if let Ok(sensor_val) = HwmonSource::get_raw_value(s).await {
|
||||
debug!(
|
||||
"hwmon {} raw value {}",
|
||||
s.value_path.to_string_lossy(),
|
||||
sensor_val
|
||||
);
|
||||
if let Ok(parsed) = sensor_val.trim().parse::<f64>() {
|
||||
values.push((s.clone(), parsed));
|
||||
} else {
|
||||
error!(
|
||||
"Unable to parse sensor value {sensor_val} at {}",
|
||||
s.value_path.to_string_lossy()
|
||||
);
|
||||
}
|
||||
} else {
|
||||
error!("Unable to get hwmon sensor value");
|
||||
}
|
||||
builder = builder
|
||||
.tag("name", &s.name)
|
||||
.tag("sensor", &s.sensor)
|
||||
.tag("device", &s.device)
|
||||
.field("value", sensor_val.trim().parse::<i64>().unwrap());
|
||||
builder.build().unwrap()
|
||||
});
|
||||
}
|
||||
let report = SourceReport {
|
||||
name: "hwmon".to_owned(),
|
||||
status: SourceStatus::Healthy,
|
||||
details: Arc::new(HwmonReport { values }),
|
||||
};
|
||||
info!("Writing hwmon data");
|
||||
chan.send(s.collect::<Vec<DataPoint>>().await.into())
|
||||
.unwrap();
|
||||
match chan.send(report.into()) {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
warn!("Unable to send to message channel ({e})")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
127
src/lib.rs
127
src/lib.rs
@@ -1,19 +1,28 @@
|
||||
pub mod chrony;
|
||||
pub mod chrony_refclock;
|
||||
pub mod gpsd;
|
||||
pub mod hwmon;
|
||||
pub mod prs10;
|
||||
pub mod uccm;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use chrono::{DateTime, Utc};
|
||||
use figment::{
|
||||
Figment,
|
||||
providers::{Format, Serialized, Toml},
|
||||
util::map,
|
||||
value::Map,
|
||||
Figment,
|
||||
};
|
||||
use gethostname::gethostname;
|
||||
use influxdb2::models::DataPoint;
|
||||
use serde_derive::{Deserialize, Serialize};
|
||||
use std::path::Path;
|
||||
use tokio::sync::broadcast::*;
|
||||
|
||||
use std::{fmt::Debug, net::SocketAddr, path::Path, sync::Arc};
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
pub struct InfluxConfig {
|
||||
pub enabled: bool,
|
||||
pub url: String,
|
||||
pub org: String,
|
||||
pub bucket: String,
|
||||
@@ -25,6 +34,7 @@ impl Default for InfluxConfig {
|
||||
fn default() -> Self {
|
||||
let host = gethostname().into_string().unwrap();
|
||||
InfluxConfig {
|
||||
enabled: false,
|
||||
url: "http://localhost:8086".into(),
|
||||
org: "default".into(),
|
||||
bucket: "default".into(),
|
||||
@@ -100,6 +110,47 @@ impl Default for HwmonConfig {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
pub struct GpsdConfig {
|
||||
pub enabled: bool,
|
||||
pub interval: u64,
|
||||
pub host: String,
|
||||
}
|
||||
|
||||
impl Default for GpsdConfig {
|
||||
fn default() -> Self {
|
||||
GpsdConfig {
|
||||
enabled: false,
|
||||
interval: 60,
|
||||
host: "localhost:2947".into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
pub struct Prs10Config {
|
||||
pub enabled: bool,
|
||||
pub port: String,
|
||||
pub baud: u32,
|
||||
pub timeout: std::time::Duration,
|
||||
pub status_interval: std::time::Duration,
|
||||
pub stats_interval: std::time::Duration,
|
||||
}
|
||||
|
||||
impl Default for Prs10Config {
|
||||
fn default() -> Self {
|
||||
Prs10Config {
|
||||
enabled: false,
|
||||
port: "/dev/ttyS0".into(),
|
||||
baud: 9600,
|
||||
timeout: std::time::Duration::from_secs(1),
|
||||
status_interval: std::time::Duration::from_secs(10),
|
||||
stats_interval: std::time::Duration::from_secs(10),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct TimeReport {
|
||||
pub system_time: DateTime<Utc>,
|
||||
@@ -109,6 +160,67 @@ pub struct TimeReport {
|
||||
pub valid: bool,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum SourceStatus {
|
||||
Healthy,
|
||||
LossOfSignal(Option<String>),
|
||||
LossOfSync(Option<String>),
|
||||
Other(Option<String>),
|
||||
Unknown,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum MetricValue {
|
||||
Int(i64),
|
||||
Float(f64),
|
||||
Bool(bool),
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct SourceMetric {
|
||||
name: String,
|
||||
value: MetricValue,
|
||||
tags: Arc<Vec<(String, String)>>,
|
||||
}
|
||||
|
||||
impl SourceMetric {
|
||||
pub fn new_int(name: &str, value: i64, tags: Arc<Vec<(String, String)>>) -> Self {
|
||||
Self {
|
||||
name: name.to_owned(),
|
||||
value: MetricValue::Int(value),
|
||||
tags,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_float(name: &str, value: f64, tags: Arc<Vec<(String, String)>>) -> Self {
|
||||
Self {
|
||||
name: name.to_owned(),
|
||||
value: MetricValue::Float(value),
|
||||
tags,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_bool(name: &str, value: bool, tags: Arc<Vec<(String, String)>>) -> Self {
|
||||
Self {
|
||||
name: name.to_owned(),
|
||||
value: MetricValue::Bool(value),
|
||||
tags,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub trait SourceReportDetails: Debug + Send + Sync {
|
||||
fn to_metrics(&self) -> Vec<SourceMetric>;
|
||||
fn is_healthy(&self) -> bool;
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct SourceReport {
|
||||
pub name: String,
|
||||
pub status: SourceStatus,
|
||||
pub details: Arc<dyn SourceReportDetails>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
pub struct UCCMConfig {
|
||||
pub enabled: bool,
|
||||
@@ -137,6 +249,8 @@ pub struct SourcesConfig {
|
||||
pub chrony: ChronyConfig,
|
||||
pub hwmon: HwmonConfig,
|
||||
pub uccm: UCCMConfig,
|
||||
pub gpsd: GpsdConfig,
|
||||
pub prs10: Prs10Config,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Default)]
|
||||
@@ -155,11 +269,12 @@ pub fn load_config(filename: &Path) -> Figment {
|
||||
Figment::from(Serialized::defaults(Config::default())).merge(Toml::file(filename))
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum ChimemonMessage {
|
||||
DataPoint(DataPoint),
|
||||
DataPoints(Vec<DataPoint>),
|
||||
TimeReport(TimeReport),
|
||||
SourceReport(SourceReport),
|
||||
}
|
||||
|
||||
impl From<DataPoint> for ChimemonMessage {
|
||||
@@ -179,6 +294,12 @@ impl From<TimeReport> for ChimemonMessage {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<SourceReport> for ChimemonMessage {
|
||||
fn from(sr: SourceReport) -> Self {
|
||||
ChimemonMessage::SourceReport(sr)
|
||||
}
|
||||
}
|
||||
|
||||
pub type ChimemonSourceChannel = Sender<ChimemonMessage>;
|
||||
pub type ChimemonTargetChannel = Receiver<ChimemonMessage>;
|
||||
|
||||
|
||||
168
src/main.rs
168
src/main.rs
@@ -1,17 +1,15 @@
|
||||
mod chrony;
|
||||
mod chrony_refclock;
|
||||
mod hwmon;
|
||||
mod uccm;
|
||||
|
||||
use clap::{Parser, ValueEnum};
|
||||
use env_logger::{self, Env};
|
||||
use futures::future::join_all;
|
||||
use log::{debug, info};
|
||||
use std::path::Path;
|
||||
use tokio::sync::broadcast;
|
||||
use tracing::{Instrument, debug, error, info, info_span, warn};
|
||||
use tracing_subscriber;
|
||||
|
||||
use crate::{chrony::*, chrony_refclock::ChronySockServer, hwmon::HwmonSource, uccm::UCCMMonitor};
|
||||
use chimemon::*;
|
||||
use crate::{
|
||||
chrony::*, chrony_refclock::ChronySockServer, hwmon::HwmonSource, prs10::Prs10Monitor,
|
||||
uccm::UCCMMonitor,
|
||||
};
|
||||
use chimemon::{gpsd::GpsdSource, *};
|
||||
|
||||
const PROGRAM_NAME: &str = "chimemon";
|
||||
const VERSION: &str = "0.0.1";
|
||||
@@ -35,33 +33,51 @@ struct Args {
|
||||
|
||||
#[tokio::main(flavor = "current_thread")]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let args = Args::parse();
|
||||
let loglevel = args
|
||||
.log_level
|
||||
.to_possible_value()
|
||||
.unwrap()
|
||||
.get_name()
|
||||
.to_owned();
|
||||
env_logger::Builder::from_env(Env::default().default_filter_or(loglevel)).init();
|
||||
tracing_subscriber::fmt::init();
|
||||
|
||||
info!("{} v{} starting...", PROGRAM_NAME, VERSION);
|
||||
let args = Args::parse();
|
||||
|
||||
info!("{PROGRAM_NAME} v{VERSION} starting...");
|
||||
let fig = load_config(Path::new(&args.config_file));
|
||||
debug!("{:?}", fig);
|
||||
debug!("{fig:?}");
|
||||
let config: Config = fig.extract()?;
|
||||
|
||||
let mut tasks = Vec::new();
|
||||
let (tx, _) = broadcast::channel(16);
|
||||
let sourcechan: ChimemonSourceChannel = tx;
|
||||
|
||||
info!(
|
||||
"Connecting to influxdb {} org: {} using token",
|
||||
&config.influxdb.url, &config.influxdb.org
|
||||
);
|
||||
let influx = influxdb2::Client::new(
|
||||
&config.influxdb.url,
|
||||
&config.influxdb.org,
|
||||
&config.influxdb.token,
|
||||
);
|
||||
if config.influxdb.enabled {
|
||||
info!(
|
||||
"Connecting to influxdb {} org: {} using token",
|
||||
&config.influxdb.url, &config.influxdb.org
|
||||
);
|
||||
let config = config.clone();
|
||||
let influx = influxdb2::Client::new(
|
||||
&config.influxdb.url,
|
||||
&config.influxdb.org,
|
||||
&config.influxdb.token,
|
||||
);
|
||||
|
||||
let mut influxrx = sourcechan.subscribe();
|
||||
|
||||
tasks.push(tokio::spawn(
|
||||
async move {
|
||||
let stream = async_stream::stream! {
|
||||
while let Ok(msg) = influxrx.recv().await {
|
||||
match msg { ChimemonMessage::DataPoint(dp) => {
|
||||
yield dp
|
||||
}, ChimemonMessage::DataPoints(dps) => {
|
||||
for p in dps {
|
||||
yield p
|
||||
}
|
||||
}, _ => {}
|
||||
} }
|
||||
};
|
||||
influx.write(&config.influxdb.bucket, stream).await.unwrap();
|
||||
}
|
||||
.instrument(info_span!("influx-task")),
|
||||
));
|
||||
}
|
||||
|
||||
let chrony = if config.sources.chrony.enabled {
|
||||
Some(ChronyClient::new(config.to_owned()))
|
||||
@@ -69,7 +85,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
None
|
||||
};
|
||||
if let Some(c) = chrony {
|
||||
tasks.push(tokio::spawn(c.run(sourcechan.clone())));
|
||||
tasks.push(tokio::spawn(
|
||||
c.run(sourcechan.clone())
|
||||
.instrument(info_span!("chrony-task")),
|
||||
));
|
||||
};
|
||||
|
||||
let hwmon = if config.sources.hwmon.enabled {
|
||||
@@ -78,18 +97,23 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
None
|
||||
};
|
||||
if let Some(hwmon) = hwmon {
|
||||
tasks.push(tokio::spawn(hwmon.run(sourcechan.clone())));
|
||||
tasks.push(tokio::spawn(
|
||||
hwmon
|
||||
.run(sourcechan.clone())
|
||||
.instrument(info_span!("hwmon-task")),
|
||||
));
|
||||
};
|
||||
|
||||
let uccm = if config.sources.uccm.enabled {
|
||||
info!("Spawning UCCMMonitor");
|
||||
Some(UCCMMonitor::new(config.to_owned()))
|
||||
} else {
|
||||
info!("UCCMMonitor not configured");
|
||||
None
|
||||
};
|
||||
if let Some(uccm) = uccm {
|
||||
tasks.push(tokio::spawn(uccm.run(sourcechan.clone())));
|
||||
tasks.push(tokio::spawn(
|
||||
uccm.run(sourcechan.clone())
|
||||
.instrument(info_span!("uccm-task")),
|
||||
));
|
||||
};
|
||||
|
||||
let chrony_refclock = if config.targets.chrony.enabled {
|
||||
@@ -98,33 +122,67 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
None
|
||||
};
|
||||
if let Some(chrony_refclock) = chrony_refclock {
|
||||
tasks.push(tokio::spawn(chrony_refclock.run(sourcechan.subscribe())));
|
||||
tasks.push(tokio::spawn(
|
||||
chrony_refclock
|
||||
.run(sourcechan.subscribe())
|
||||
.instrument(info_span!("chrony-refclock-task")),
|
||||
));
|
||||
};
|
||||
|
||||
let mut influxrx = sourcechan.subscribe();
|
||||
let gpsd = if config.sources.gpsd.enabled {
|
||||
Some(GpsdSource::new(config.to_owned()).await.unwrap())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
if let Some(gpsd) = gpsd {
|
||||
tasks.push(tokio::spawn(
|
||||
gpsd.run(sourcechan.clone())
|
||||
.instrument(info_span!("gpsd-task")),
|
||||
))
|
||||
}
|
||||
|
||||
tasks.push(tokio::spawn(async move {
|
||||
let stream = async_stream::stream! {
|
||||
while let Ok(msg) = influxrx.recv().await {
|
||||
match msg { ChimemonMessage::DataPoint(dp) => {
|
||||
yield dp
|
||||
}, ChimemonMessage::DataPoints(dps) => {
|
||||
for p in dps {
|
||||
yield p
|
||||
let prs10 = if config.sources.prs10.enabled {
|
||||
Some(Prs10Monitor::new(config.sources.prs10))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
if let Some(prs10) = prs10 {
|
||||
tasks.push(tokio::spawn(
|
||||
prs10
|
||||
.run(sourcechan.clone())
|
||||
.instrument(info_span!("prs10-task")),
|
||||
))
|
||||
}
|
||||
|
||||
if tasks.len() == 0 {
|
||||
error!("No tasks configured, exiting.");
|
||||
return Ok(()); // not an error, but exit before starting a dummy task
|
||||
}
|
||||
|
||||
if sourcechan.receiver_count() == 0 {
|
||||
warn!("No consumers configured, events will be discarded");
|
||||
let mut chan = sourcechan.subscribe();
|
||||
// spawn a dummy task to reap the channel and keep the process alive
|
||||
tasks.push(tokio::spawn(
|
||||
async move {
|
||||
loop {
|
||||
while let Ok(m) = chan.recv().await {
|
||||
info!("received {m:?}");
|
||||
match m {
|
||||
ChimemonMessage::SourceReport(report) => {
|
||||
let metrics = report.details.to_metrics();
|
||||
info!("metrics: {metrics:?}");
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}, ChimemonMessage::TimeReport(_tr) => {}
|
||||
} }
|
||||
};
|
||||
influx.write(&config.influxdb.bucket, stream).await.unwrap();
|
||||
}));
|
||||
}
|
||||
}
|
||||
.instrument(info_span!("dummy-receiver-task")),
|
||||
))
|
||||
}
|
||||
|
||||
// let mut debugrx = sourcechan.subscribe();
|
||||
// tasks.push(tokio::spawn(async move {
|
||||
// loop {
|
||||
// let v = debugrx.recv().await;
|
||||
// warn!("streamed: {:?}", v.unwrap());
|
||||
// }
|
||||
// }));
|
||||
debug!("Task setup complete, tasks: {}", tasks.len());
|
||||
|
||||
join_all(tasks).await;
|
||||
|
||||
|
||||
487
src/prs10.rs
Normal file
487
src/prs10.rs
Normal file
@@ -0,0 +1,487 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::{
|
||||
ChimemonMessage, ChimemonSource, ChimemonSourceChannel, Prs10Config, SourceMetric,
|
||||
SourceReport, SourceReportDetails, SourceStatus,
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use bit_struct::u4;
|
||||
use bitflags::bitflags;
|
||||
use itertools::Itertools;
|
||||
use serde::Deserialize;
|
||||
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf};
|
||||
use tokio::select;
|
||||
use tokio::sync::OnceCell;
|
||||
use tokio::time::{Interval, interval, timeout};
|
||||
use tokio_serial;
|
||||
use tokio_serial::{SerialPort, SerialStream};
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Prs10Info {
|
||||
pub model: String,
|
||||
pub version: String,
|
||||
pub serial: String,
|
||||
}
|
||||
|
||||
impl TryFrom<&[u8]> for Prs10Info {
|
||||
type Error = Box<dyn std::error::Error>;
|
||||
fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
|
||||
let parts = value.splitn(3, |c| *c == b'_');
|
||||
let (model, version, serial) = parts
|
||||
.collect_tuple()
|
||||
.ok_or("Not enough parts in ID response")?;
|
||||
Ok(Self {
|
||||
model: str::from_utf8(model)?.to_string(),
|
||||
version: str::from_utf8(version)?.to_string(),
|
||||
serial: str::from_utf8(serial)?.to_string(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
bitflags! {
|
||||
pub struct Prs10PowerLampFlags: u8 {
|
||||
const ELEC_VOLTAGE_LOW = (1<<0);
|
||||
const ELEC_VOLTAGE_HIGH = (1<<1);
|
||||
const HEAT_VOLTAGE_LOW = (1<<2);
|
||||
const HEAT_VOLTAGE_HIGH = (1<<3);
|
||||
const LAMP_LIGHT_LOW = (1<<4);
|
||||
const LAMP_LIGHT_HIGH = (1<<5);
|
||||
const GATE_VOLTAGE_LOW = (1<<6);
|
||||
const GATE_VOLTAGE_HIGH = (1<<7);
|
||||
}
|
||||
}
|
||||
|
||||
impl Prs10PowerLampFlags {
|
||||
pub fn get_metrics(&self, no_tags: Vec<String>) -> Vec<SourceMetric> {
|
||||
// Define the mapping statically
|
||||
const FLAG_LABELS: [(&Prs10PowerLampFlags, &str); 8] = [
|
||||
(&Prs10PowerLampFlags::ELEC_VOLTAGE_LOW, "elec_voltage_low"),
|
||||
(&Prs10PowerLampFlags::ELEC_VOLTAGE_HIGH, "elec_voltage_high"),
|
||||
(&Prs10PowerLampFlags::HEAT_VOLTAGE_LOW, "heat_voltage_low"),
|
||||
(&Prs10PowerLampFlags::HEAT_VOLTAGE_HIGH, "heat_voltage_high"),
|
||||
(&Prs10PowerLampFlags::LAMP_LIGHT_LOW, "lamp_light_low"),
|
||||
(&Prs10PowerLampFlags::LAMP_LIGHT_HIGH, "lamp_light_high"),
|
||||
(&Prs10PowerLampFlags::GATE_VOLTAGE_LOW, "gate_voltage_low"),
|
||||
(&Prs10PowerLampFlags::GATE_VOLTAGE_HIGH, "gate_voltage_high"),
|
||||
];
|
||||
|
||||
let no_tags = Arc::new(vec![]);
|
||||
|
||||
// Generate metrics based on flag availability
|
||||
FLAG_LABELS
|
||||
.iter()
|
||||
.map(|(flag, label)| {
|
||||
// We track whether each flag is set (true) or not (false)
|
||||
SourceMetric::new_bool(*label, self.contains(**flag), no_tags.clone())
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
bitflags! {
|
||||
pub struct Prs10RfFlags: u8 {
|
||||
const PLL_UNLOCK = (1<<0);
|
||||
const XTAL_VAR_LOW = (1<<1);
|
||||
const XTAL_VAR_HIGH = (1<<2);
|
||||
const VCO_CTRL_LOW = (1<<3);
|
||||
const VCO_CTRL_HIGH = (1<<4);
|
||||
const AGC_CTRL_LOW = (1<<5);
|
||||
const AGC_CTRL_HIGH = (1<<6);
|
||||
const PLL_BAD_PARAM = (1<<7);
|
||||
}
|
||||
}
|
||||
|
||||
bitflags! {
|
||||
pub struct Prs10TempFlags: u8 {
|
||||
const LAMP_TEMP_LOW = (1<<0);
|
||||
const LAMP_TEMP_HIGH = (1<<1);
|
||||
const XTAL_TEMP_LOW = (1<<2);
|
||||
const XTAL_TEMP_HIGH = (1<<3);
|
||||
const CELL_TEMP_LOW = (1<<4);
|
||||
const CELL_TEMP_HIGH = (1<<5);
|
||||
const CASE_TEMP_LOW = (1<<6);
|
||||
const CASE_TEMP_HIGH = (1<<7);
|
||||
}
|
||||
}
|
||||
|
||||
bitflags! {
|
||||
pub struct Prs10FllFlags: u8 {
|
||||
const FLL_OFF = (1<<0);
|
||||
const FLL_DISABLED = (1<<1);
|
||||
const EFC_HIGH = (1<<2);
|
||||
const EFC_LOW = (1<<3);
|
||||
const CAL_VOLTAGE_HIGH = (1<<4);
|
||||
const CAL_VOLTAGE_LOW = (1<<5);
|
||||
}
|
||||
}
|
||||
|
||||
bitflags! {
|
||||
pub struct Prs10PpsFlags: u8 {
|
||||
const PLL_DISABLED = (1<<0);
|
||||
const PPS_WARMUP = (1<<1);
|
||||
const PLL_ACTIVE = (1<<2);
|
||||
const PPS_BAD = (1<<3);
|
||||
const PPS_INTERVAL_LONG = (1<<4);
|
||||
const PLL_RESTART = (1<<5);
|
||||
const PLL_SATURATED = (1<<6);
|
||||
const PPS_MISSING = (1<<7);
|
||||
}
|
||||
}
|
||||
|
||||
bitflags! {
|
||||
pub struct Prs10SystemFlags: u8 {
|
||||
const LAMP_RESTART = (1<<0);
|
||||
const WDT_RESET = (1<<1);
|
||||
const BAD_INT_VECTOR = (1<<2);
|
||||
const EEPROM_WRITE_FAIL = (1<<3);
|
||||
const EEPROM_CORRUPT = (1<<4);
|
||||
const BAD_COMMAND = (1<<5);
|
||||
const BAD_COMMAND_PARAM = (1<<6);
|
||||
const SYSTEM_RESET = (1<<7);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Eq, PartialEq)]
|
||||
pub struct Prs10Status {
|
||||
pub volt_lamp_flags: Prs10PowerLampFlags,
|
||||
pub rf_flags: Prs10RfFlags,
|
||||
pub temp_flags: Prs10TempFlags,
|
||||
pub fll_flags: Prs10FllFlags,
|
||||
pub pps_flags: Prs10PpsFlags,
|
||||
pub system_flags: Prs10SystemFlags,
|
||||
}
|
||||
|
||||
impl Default for Prs10Status {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
volt_lamp_flags: Prs10PowerLampFlags::empty(),
|
||||
rf_flags: Prs10RfFlags::empty(),
|
||||
temp_flags: Prs10TempFlags::empty(),
|
||||
fll_flags: Prs10FllFlags::empty(),
|
||||
pps_flags: Prs10PpsFlags::empty(),
|
||||
system_flags: Prs10SystemFlags::empty(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl SourceReportDetails for Prs10Status {
|
||||
fn is_healthy(&self) -> bool {
|
||||
const HEALTHY_PPS: Prs10PpsFlags = Prs10PpsFlags { bits: 4 };
|
||||
self.volt_lamp_flags.is_empty()
|
||||
&& self.rf_flags.is_empty()
|
||||
&& self.temp_flags.is_empty()
|
||||
&& self.fll_flags.is_empty()
|
||||
&& self.pps_flags == HEALTHY_PPS
|
||||
}
|
||||
|
||||
fn to_metrics(&self) -> Vec<SourceMetric> {
|
||||
let no_tags = Arc::new(vec![]);
|
||||
vec![
|
||||
SourceMetric::new_int(
|
||||
"volt_lamp_flags",
|
||||
self.volt_lamp_flags.bits() as i64,
|
||||
no_tags.clone(),
|
||||
),
|
||||
SourceMetric::new_int("rf_flags", self.rf_flags.bits() as i64, no_tags.clone()),
|
||||
SourceMetric::new_int("temp_flags", self.temp_flags.bits() as i64, no_tags.clone()),
|
||||
SourceMetric::new_int("fll_flags", self.fll_flags.bits() as i64, no_tags.clone()),
|
||||
SourceMetric::new_int("pps_flags", self.pps_flags.bits() as i64, no_tags.clone()),
|
||||
// system flags are kind of useless because we can't guarantee they get upstreamed and will only appear once since they are 'event flags'
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<&[u8]> for Prs10Status {
|
||||
type Error = Box<dyn std::error::Error>;
|
||||
fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
|
||||
let (volt_lamp_flags, rf_flags, temp_flags, fll_flags, pps_flags, system_flags) = value
|
||||
.splitn(6, |c| *c == b',')
|
||||
.map(|s| str::from_utf8(s).unwrap().parse::<u8>())
|
||||
.collect_tuple()
|
||||
.ok_or("Not enough parts in ST reply")?;
|
||||
Ok(Self {
|
||||
volt_lamp_flags: Prs10PowerLampFlags::from_bits(volt_lamp_flags?)
|
||||
.ok_or("Invalid bits set ({volt_lamp_flags}) for power/lamp flags")?,
|
||||
rf_flags: Prs10RfFlags::from_bits(rf_flags?)
|
||||
.ok_or("Invalid bits set ({rf_flags}) for RF flags")?,
|
||||
temp_flags: Prs10TempFlags::from_bits(temp_flags?)
|
||||
.ok_or("Invalid bits set ({temp_flags}) for temp flags")?,
|
||||
fll_flags: Prs10FllFlags::from_bits(fll_flags?)
|
||||
.ok_or("Invalid bits set ({fll_flags}) for FLL flags")?,
|
||||
pps_flags: Prs10PpsFlags::from_bits(pps_flags?)
|
||||
.ok_or("Invalid bits set ({pps_flags}) for PPS flags")?,
|
||||
system_flags: Prs10SystemFlags::from_bits(system_flags?)
|
||||
.ok_or("Invalid bits set ({system_flags}) for system flags")?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Prs10Stats {
|
||||
pub ocxo_efc: u32,
|
||||
pub error_signal_volts: f64,
|
||||
pub detect_signal_volts: f64,
|
||||
pub freq_offset_ppt: u16,
|
||||
pub mag_efc: u16,
|
||||
pub heat_volts: f64,
|
||||
pub elec_volts: f64,
|
||||
pub lamp_fet_drain_volts: f64,
|
||||
pub lamp_fet_gate_volts: f64,
|
||||
pub ocxo_heat_volts: f64,
|
||||
pub cell_heat_volts: f64,
|
||||
pub lamp_heat_volts: f64,
|
||||
pub rb_photo: f64,
|
||||
pub rb_photo_iv: f64,
|
||||
pub case_temp: f64,
|
||||
pub ocxo_therm: f64,
|
||||
pub cell_therm: f64,
|
||||
pub lamp_therm: f64,
|
||||
pub ext_cal_volts: f64,
|
||||
pub analog_gnd_volts: f64,
|
||||
}
|
||||
|
||||
impl SourceReportDetails for Prs10Stats {
|
||||
fn is_healthy(&self) -> bool {
|
||||
true
|
||||
}
|
||||
fn to_metrics(&self) -> Vec<SourceMetric> {
|
||||
let no_tags = Arc::new(vec![]);
|
||||
vec![
|
||||
// Integer Metrics
|
||||
SourceMetric::new_int("ocxo_efc", self.ocxo_efc as i64, no_tags.clone()),
|
||||
// Float Metrics
|
||||
SourceMetric::new_float(
|
||||
"error_signal_volts",
|
||||
self.error_signal_volts,
|
||||
no_tags.clone(),
|
||||
),
|
||||
SourceMetric::new_float(
|
||||
"detect_signal_volts",
|
||||
self.detect_signal_volts,
|
||||
no_tags.clone(),
|
||||
),
|
||||
SourceMetric::new_float("heat_volts", self.heat_volts, no_tags.clone()),
|
||||
SourceMetric::new_float("elec_volts", self.elec_volts, no_tags.clone()),
|
||||
SourceMetric::new_float(
|
||||
"lamp_fet_drain_volts",
|
||||
self.lamp_fet_drain_volts,
|
||||
no_tags.clone(),
|
||||
),
|
||||
SourceMetric::new_float(
|
||||
"lamp_fet_gate_volts",
|
||||
self.lamp_fet_gate_volts,
|
||||
no_tags.clone(),
|
||||
),
|
||||
SourceMetric::new_float("ocxo_heat_volts", self.ocxo_heat_volts, no_tags.clone()),
|
||||
SourceMetric::new_float("cell_heat_volts", self.cell_heat_volts, no_tags.clone()),
|
||||
SourceMetric::new_float("lamp_heat_volts", self.lamp_heat_volts, no_tags.clone()),
|
||||
SourceMetric::new_float("rb_photo", self.rb_photo, no_tags.clone()),
|
||||
SourceMetric::new_float("rb_photo_iv", self.rb_photo_iv, no_tags.clone()),
|
||||
SourceMetric::new_float("case_temp", self.case_temp, no_tags.clone()),
|
||||
SourceMetric::new_float("ocxo_therm", self.ocxo_therm, no_tags.clone()),
|
||||
SourceMetric::new_float("cell_therm", self.cell_therm, no_tags.clone()),
|
||||
SourceMetric::new_float("lamp_therm", self.lamp_therm, no_tags.clone()),
|
||||
SourceMetric::new_float("ext_cal_volts", self.ext_cal_volts, no_tags.clone()),
|
||||
SourceMetric::new_float("analog_gnd_volts", self.analog_gnd_volts, no_tags.clone()),
|
||||
// U16 Metrics (optional, but can be treated as integers)
|
||||
SourceMetric::new_int(
|
||||
"freq_offset_ppt",
|
||||
self.freq_offset_ppt as i64,
|
||||
no_tags.clone(),
|
||||
),
|
||||
SourceMetric::new_int("mag_efc", self.mag_efc as i64, no_tags.clone()),
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Prs10Monitor {
|
||||
rx: ReadHalf<SerialStream>,
|
||||
tx: WriteHalf<SerialStream>,
|
||||
info: OnceCell<Prs10Info>,
|
||||
config: Prs10Config,
|
||||
}
|
||||
|
||||
impl Prs10Monitor {
|
||||
pub fn new(config: Prs10Config) -> Self {
|
||||
let builder = tokio_serial::new(&config.port, config.baud)
|
||||
.timeout(config.timeout)
|
||||
.data_bits(tokio_serial::DataBits::Eight)
|
||||
.parity(tokio_serial::Parity::None)
|
||||
.stop_bits(tokio_serial::StopBits::One)
|
||||
.flow_control(tokio_serial::FlowControl::None);
|
||||
let mut port = SerialStream::open(&builder).expect("Must be able to open serial port");
|
||||
port.set_exclusive(true).expect("Can't lock serial port");
|
||||
info!(
|
||||
"Opened serial port {}@{}",
|
||||
port.name().unwrap(),
|
||||
port.baud_rate().unwrap()
|
||||
);
|
||||
let (rx, tx) = tokio::io::split(port);
|
||||
|
||||
Self {
|
||||
rx,
|
||||
tx,
|
||||
config,
|
||||
info: OnceCell::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn info(&self) -> &Prs10Info {
|
||||
self.info.get().expect("info() used before run()")
|
||||
}
|
||||
|
||||
pub async fn cmd_response(&mut self, cmd: &[u8]) -> Result<Vec<u8>, std::io::Error> {
|
||||
debug!("cmd: `{:?}`", String::from_utf8_lossy(cmd));
|
||||
self.tx.write_all(cmd).await.unwrap();
|
||||
self.tx.write_u8(b'\r').await.unwrap();
|
||||
let mut reader = BufReader::new(&mut self.rx);
|
||||
let mut buf = Vec::new();
|
||||
let read = timeout(self.config.timeout, reader.read_until(b'\r', &mut buf)).await??;
|
||||
buf.truncate(buf.len() - 1); // strip "\r"
|
||||
debug!("cmd response: ({read}) `{buf:?}`");
|
||||
Ok(buf)
|
||||
}
|
||||
|
||||
async fn set_info(&mut self) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let id = self.get_id().await?;
|
||||
self.info.set(id);
|
||||
debug!("Set info to {:?}", self.info);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_status(&mut self) -> Result<Prs10Status, Box<dyn std::error::Error>> {
|
||||
debug!("Getting status");
|
||||
let resp = self.cmd_response(b"ST?").await?;
|
||||
let status = resp.as_slice().try_into();
|
||||
debug!("Got: {status:?}");
|
||||
status
|
||||
}
|
||||
|
||||
pub async fn get_id(&mut self) -> Result<Prs10Info, Box<dyn std::error::Error>> {
|
||||
debug!("Getting identity");
|
||||
let resp = self.cmd_response(b"ID?").await?;
|
||||
let id = resp.as_slice().try_into();
|
||||
debug!("Got: {id:?}");
|
||||
id
|
||||
}
|
||||
|
||||
pub async fn get_analog(&mut self, id: u4) -> Result<f64, Box<dyn std::error::Error>> {
|
||||
debug!("Getting analog value {id}");
|
||||
let mut cmd = b"AD".to_vec();
|
||||
cmd.extend_from_slice(id.to_string().as_bytes());
|
||||
cmd.push(b'?');
|
||||
let resp = self.cmd_response(&cmd).await?;
|
||||
let value = str::from_utf8(&resp)?.parse::<f64>()?;
|
||||
|
||||
Ok(value)
|
||||
}
|
||||
|
||||
pub async fn get_ocxo_efc(&mut self) -> Result<u32, Box<dyn std::error::Error>> {
|
||||
debug!("Getting u16,u16 -> u32 for OCXO EFC value");
|
||||
let resp = self.cmd_response(b"FC?").await?;
|
||||
let values = resp
|
||||
.splitn(2, |c| *c == b',')
|
||||
.map(|s| str::from_utf8(s).unwrap().parse::<u16>())
|
||||
.collect_tuple()
|
||||
.ok_or("Not enough values in response to FC?")?;
|
||||
if let (Ok(high), Ok(low)) = values {
|
||||
Ok((high as u32) << 8 | low as u32)
|
||||
} else {
|
||||
Err("Unparseable numbers in response to FC?".into())
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_float(&mut self, cmd: &[u8]) -> Result<f64, Box<dyn std::error::Error>> {
|
||||
debug!("Getting float value for command {cmd:?}");
|
||||
let resp = self.cmd_response(cmd).await?;
|
||||
let val = str::from_utf8(&resp)?.parse::<f64>()?;
|
||||
Ok(val)
|
||||
}
|
||||
|
||||
async fn status_poll(&mut self) -> Option<ChimemonMessage> {
|
||||
debug!("polling status");
|
||||
let status = self.get_status().await;
|
||||
if let Ok(status) = status {
|
||||
Some(ChimemonMessage::SourceReport(SourceReport {
|
||||
name: "prs10".into(),
|
||||
status: if status.is_healthy() {
|
||||
SourceStatus::Healthy
|
||||
} else {
|
||||
SourceStatus::Unknown
|
||||
},
|
||||
details: Arc::new(status),
|
||||
}))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
async fn stats_poll(&mut self) -> Option<ChimemonMessage> {
|
||||
debug!("polling stats");
|
||||
|
||||
let ocxo_efc = self.get_ocxo_efc().await;
|
||||
|
||||
Some(ChimemonMessage::SourceReport(SourceReport {
|
||||
name: "prs10".into(),
|
||||
status: SourceStatus::Unknown,
|
||||
details: Arc::new(Prs10Stats {}),
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ChimemonSource for Prs10Monitor {
|
||||
async fn run(mut self, chan: ChimemonSourceChannel) {
|
||||
info!("PRS10 task starting");
|
||||
if let Err(e) = self.set_info().await {
|
||||
warn!("Error starting PRS10: {e:?}");
|
||||
return;
|
||||
}
|
||||
info!(
|
||||
"Connected to PRS10 model: {} version: {} serial: {}",
|
||||
self.info().model,
|
||||
self.info().version,
|
||||
self.info().serial
|
||||
);
|
||||
|
||||
let mut status_timer = interval(self.config.status_interval);
|
||||
let mut pps_timer = interval(self.config.stats_interval);
|
||||
|
||||
loop {
|
||||
let msg = select! {
|
||||
_ = status_timer.tick() => {
|
||||
self.status_poll().await
|
||||
},
|
||||
_ = pps_timer.tick() => {
|
||||
self.stats_poll().await
|
||||
}
|
||||
};
|
||||
if let Some(msg) = msg {
|
||||
chan.send(msg).expect("Unable to send to channel");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
mod tests {
|
||||
use crate::prs10::{Prs10Info, Prs10PowerLampFlags, Prs10PpsFlags, Prs10Status};
|
||||
#[test]
|
||||
fn test_info_parse() -> Result<(), Box<dyn std::error::Error>> {
|
||||
const INFO_VECTOR: &[u8] = b"PRS10_3.15_SN_12345";
|
||||
let info: Prs10Info = INFO_VECTOR.try_into()?;
|
||||
assert_eq!(info.model, "PRS10");
|
||||
assert_eq!(info.version, "3.15");
|
||||
assert_eq!(info.serial, "SN_12345");
|
||||
Ok(())
|
||||
}
|
||||
#[test]
|
||||
fn test_status_parse() -> Result<(), Box<dyn std::error::Error>> {
|
||||
//TODO: Add vectors for some more complicated state
|
||||
const STATUS_VECTOR1: &[u8] = b"0,0,0,0,4,0";
|
||||
let status: Prs10Status = STATUS_VECTOR1.try_into()?;
|
||||
let mut expect = Prs10Status::default();
|
||||
expect.pps_flags.set(Prs10PpsFlags::PLL_ACTIVE, true);
|
||||
assert_eq!(status, expect);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
116
src/uccm.rs
116
src/uccm.rs
@@ -1,14 +1,16 @@
|
||||
use crate::{
|
||||
ChimemonMessage, ChimemonSource, ChimemonSourceChannel, Config, SourceMetric,
|
||||
SourceReportDetails, TimeReport,
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use bitflags::bitflags;
|
||||
use byteorder::{BigEndian, ReadBytesExt};
|
||||
use bytes::{Buf, BytesMut};
|
||||
use chimemon::{ChimemonMessage, ChimemonSource, ChimemonSourceChannel, Config, TimeReport};
|
||||
use chrono::{DateTime, Duration, NaiveDateTime, Utc};
|
||||
use figment::value::Map;
|
||||
use influxdb2::models::data_point::DataPointBuilder;
|
||||
use influxdb2::models::DataPoint;
|
||||
use influxdb2::models::data_point::DataPointBuilder;
|
||||
use itertools::Itertools;
|
||||
use log::{debug, info, warn};
|
||||
use std::io::{BufRead, Cursor};
|
||||
use std::str;
|
||||
use std::sync::Arc;
|
||||
@@ -17,6 +19,7 @@ use tokio::join;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::time::sleep;
|
||||
use tokio_serial::{SerialPort, SerialStream};
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
pub const GPS_EPOCH: i64 = 315964800; // Doesn't seem possible to have a const DateTime object
|
||||
pub type UccmEndian = BigEndian;
|
||||
@@ -43,6 +46,77 @@ pub struct UCCMTODReport {
|
||||
pub flags: UCCMFlags,
|
||||
}
|
||||
|
||||
impl SourceReportDetails for UCCMTODReport {
|
||||
fn is_healthy(&self) -> bool {
|
||||
self.flags.contains(UCCMFlags::OSC_LOCK)
|
||||
&& self.flags.contains(UCCMFlags::HAVE_GPS_TIME)
|
||||
&& !self.flags.contains(UCCMFlags::INIT_UNLOCK)
|
||||
&& !self.flags.contains(UCCMFlags::INIT_NO_SATS)
|
||||
&& !self.flags.contains(UCCMFlags::POWER_FAIL)
|
||||
&& !self.flags.contains(UCCMFlags::NO_GPS_SYNC)
|
||||
&& !self.flags.contains(UCCMFlags::NO_GPS_SYNC2)
|
||||
&& !self.flags.contains(UCCMFlags::NO_ANT)
|
||||
&& !self.flags.contains(UCCMFlags::GPS_LOS)
|
||||
}
|
||||
|
||||
fn to_metrics(&self) -> Vec<SourceMetric> {
|
||||
let no_tags = Arc::new(vec![]);
|
||||
vec![
|
||||
SourceMetric::new_int("leaps", self.leaps as i64, no_tags.clone()),
|
||||
SourceMetric::new_bool(
|
||||
"osc_lock",
|
||||
self.flags.contains(UCCMFlags::OSC_LOCK),
|
||||
no_tags.clone(),
|
||||
),
|
||||
SourceMetric::new_bool(
|
||||
"leap_flag",
|
||||
self.flags.contains(UCCMFlags::LEAP_FLAG),
|
||||
no_tags.clone(),
|
||||
),
|
||||
SourceMetric::new_bool(
|
||||
"init_unlock",
|
||||
self.flags.contains(UCCMFlags::INIT_UNLOCK),
|
||||
no_tags.clone(),
|
||||
),
|
||||
SourceMetric::new_bool(
|
||||
"init_no_sats",
|
||||
self.flags.contains(UCCMFlags::INIT_NO_SATS),
|
||||
no_tags.clone(),
|
||||
),
|
||||
SourceMetric::new_bool(
|
||||
"have_gps_time",
|
||||
self.flags.contains(UCCMFlags::HAVE_GPS_TIME),
|
||||
no_tags.clone(),
|
||||
),
|
||||
SourceMetric::new_bool(
|
||||
"power_fail",
|
||||
self.flags.contains(UCCMFlags::POWER_FAIL),
|
||||
no_tags.clone(),
|
||||
),
|
||||
SourceMetric::new_bool(
|
||||
"no_gps_sync",
|
||||
self.flags.contains(UCCMFlags::NO_GPS_SYNC),
|
||||
no_tags.clone(),
|
||||
),
|
||||
SourceMetric::new_bool(
|
||||
"no_gps_sync2",
|
||||
self.flags.contains(UCCMFlags::NO_GPS_SYNC2),
|
||||
no_tags.clone(),
|
||||
),
|
||||
SourceMetric::new_bool(
|
||||
"ant_fault",
|
||||
self.flags.contains(UCCMFlags::NO_ANT),
|
||||
no_tags.clone(),
|
||||
),
|
||||
SourceMetric::new_bool(
|
||||
"gps_los",
|
||||
self.flags.contains(UCCMFlags::GPS_LOS),
|
||||
no_tags.clone(),
|
||||
),
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
impl UCCMTODReport {
|
||||
pub fn as_builder(&self, measurement: &String, tags: &Map<String, String>) -> DataPointBuilder {
|
||||
let mut builder =
|
||||
@@ -194,6 +268,27 @@ pub struct UCCMStatusReport {
|
||||
pub freq_error: f32,
|
||||
}
|
||||
|
||||
impl SourceReportDetails for UCCMStatusReport {
|
||||
fn is_healthy(&self) -> bool {
|
||||
self.gps_pps_valid
|
||||
}
|
||||
fn to_metrics(&self) -> Vec<SourceMetric> {
|
||||
let no_tags = Arc::new(vec![]);
|
||||
vec![
|
||||
SourceMetric::new_int("tfom", self.tfom as i64, no_tags.clone()),
|
||||
SourceMetric::new_int("ffom", self.ffom as i64, no_tags.clone()),
|
||||
SourceMetric::new_float("gps_phase", self.gps_phase as f64, no_tags.clone()),
|
||||
// TODO: sv info
|
||||
// TOOD: timestamp
|
||||
SourceMetric::new_float("ant_voltage", self.ant_voltage as f64, no_tags.clone()),
|
||||
SourceMetric::new_float("ant_current", self.ant_current as f64, no_tags.clone()),
|
||||
SourceMetric::new_float("temp", self.temp as f64, no_tags.clone()),
|
||||
SourceMetric::new_int("efc_dac", self.efc_dac as i64, no_tags.clone()),
|
||||
SourceMetric::new_float("freq_error", self.freq_error as f64, no_tags.clone()),
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
pub struct UCCMInfo {
|
||||
pub vendor: String,
|
||||
pub model: String,
|
||||
@@ -257,7 +352,8 @@ impl TryFrom<&str> for UCCMLoopDiagReport {
|
||||
"No lines!",
|
||||
))??;
|
||||
let ocxo_val = ocxo_line
|
||||
.split(':').nth(1)
|
||||
.split(':')
|
||||
.nth(1)
|
||||
.ok_or(std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidData,
|
||||
"no colon!",
|
||||
@@ -280,7 +376,10 @@ impl TryFrom<&str> for UCCMGPSSatsReport {
|
||||
"Invalid response (expected `NSATS CNOS`)",
|
||||
))?;
|
||||
let nsats = nsats.parse::<u8>().map_err(|e| {
|
||||
std::io::Error::new(std::io::ErrorKind::InvalidData, format!("Invalid number of sats ({e})"))
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidData,
|
||||
format!("Invalid number of sats ({e})"),
|
||||
)
|
||||
})?;
|
||||
let tracked_svs = cnos
|
||||
.split(',')
|
||||
@@ -428,13 +527,14 @@ impl UCCMMonitor {
|
||||
>= Duration::from_std(self.config.sources.uccm.status_interval)
|
||||
.unwrap()
|
||||
{
|
||||
let mut points = vec![tod
|
||||
.as_builder(
|
||||
let mut points = vec![
|
||||
tod.as_builder(
|
||||
&self.config.sources.uccm.measurement,
|
||||
&self.config.influxdb.tags,
|
||||
)
|
||||
.build()
|
||||
.unwrap()];
|
||||
.unwrap(),
|
||||
];
|
||||
if let Some(loop_diag) = &last_loop_diag {
|
||||
points.push(
|
||||
loop_diag
|
||||
|
||||
Reference in New Issue
Block a user