Compare commits

..

3 Commits

Author SHA1 Message Date
c22ad764e6 improvements 2026-02-04 18:53:05 -08:00
50894fd0e1 refactor config 2026-02-04 14:25:17 -08:00
d31bf8d39e start unpicking influxdb 2026-02-04 13:50:24 -08:00
14 changed files with 773 additions and 917 deletions

232
Cargo.lock generated
View File

@@ -20,15 +20,6 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "ansi_term"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2"
dependencies = [
"winapi",
]
[[package]] [[package]]
name = "anstream" name = "anstream"
version = "0.6.21" version = "0.6.21"
@@ -264,7 +255,7 @@ dependencies = [
"tokio-stream", "tokio-stream",
"tokio-util", "tokio-util",
"tracing", "tracing",
"tracing-subscriber 0.3.22", "tracing-subscriber",
] ]
[[package]] [[package]]
@@ -296,7 +287,7 @@ dependencies = [
"num_enum", "num_enum",
"rand", "rand",
"siphasher", "siphasher",
"thiserror 2.0.18", "thiserror",
"tokio", "tokio",
] ]
@@ -449,16 +440,6 @@ dependencies = [
"syn 2.0.114", "syn 2.0.114",
] ]
[[package]]
name = "dashmap"
version = "4.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e77a43b28d0668df09411cb0bc9a8c2adc40f9a048afe863e05fd43251e8e39c"
dependencies = [
"cfg-if",
"num_cpus",
]
[[package]] [[package]]
name = "deranged" name = "deranged"
version = "0.5.5" version = "0.5.5"
@@ -498,12 +479,6 @@ version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "780955b8b195a21ab8e4ac6b60dd1dbdcec1dc6c51c0617964b08c81785e12c9" checksum = "780955b8b195a21ab8e4ac6b60dd1dbdcec1dc6c51c0617964b08c81785e12c9"
[[package]]
name = "dotenv"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f"
[[package]] [[package]]
name = "dyn-clone" name = "dyn-clone"
version = "1.0.20" version = "1.0.20"
@@ -779,12 +754,6 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
[[package]]
name = "hermit-abi"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c"
[[package]] [[package]]
name = "hex" name = "hex"
version = "0.4.3" version = "0.4.3"
@@ -1019,33 +988,26 @@ dependencies = [
[[package]] [[package]]
name = "influxdb2" name = "influxdb2"
version = "0.3.9" version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a152a72a5d8d387580a1c6df5dc078891ba914d8462da51f7286321680f998b4" checksum = "24cc9f9d9fee9ebda1a77b61769cde513e03ad09607b347602f4ea887657689e"
dependencies = [ dependencies = [
"base64 0.13.1", "base64 0.13.1",
"bytes", "bytes",
"chrono", "chrono",
"csv", "csv",
"dotenv",
"fallible-iterator", "fallible-iterator",
"futures", "futures",
"go-parse-duration", "go-parse-duration",
"influxdb2-derive", "influxdb2-derive",
"influxdb2-structmap", "influxdb2-structmap",
"nom",
"opentelemetry",
"ordered-float", "ordered-float",
"parking_lot", "parking_lot",
"reqwest", "reqwest",
"secrecy",
"serde", "serde",
"serde_json", "serde_json",
"serde_qs",
"smallvec",
"snafu", "snafu",
"tempfile",
"tracing",
"tracing-subscriber 0.2.25",
"url", "url",
] ]
@@ -1206,22 +1168,13 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "matchers"
version = "0.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f099785f7595cc4b4553a174ce30dd7589ef93391ff414dbb67f62392b9e0ce1"
dependencies = [
"regex-automata 0.1.10",
]
[[package]] [[package]]
name = "matchers" name = "matchers"
version = "0.2.0" version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9" checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9"
dependencies = [ dependencies = [
"regex-automata 0.4.13", "regex-automata",
] ]
[[package]] [[package]]
@@ -1236,12 +1189,6 @@ version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
[[package]]
name = "minimal-lexical"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
[[package]] [[package]]
name = "mio" name = "mio"
version = "1.1.1" version = "1.1.1"
@@ -1319,16 +1266,6 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "nom"
version = "7.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a"
dependencies = [
"memchr",
"minimal-lexical",
]
[[package]] [[package]]
name = "nu-ansi-term" name = "nu-ansi-term"
version = "0.50.3" version = "0.50.3"
@@ -1353,16 +1290,6 @@ dependencies = [
"autocfg", "autocfg",
] ]
[[package]]
name = "num_cpus"
version = "1.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b"
dependencies = [
"hermit-abi",
"libc",
]
[[package]] [[package]]
name = "num_enum" name = "num_enum"
version = "0.5.11" version = "0.5.11"
@@ -1455,26 +1382,6 @@ dependencies = [
"vcpkg", "vcpkg",
] ]
[[package]]
name = "opentelemetry"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b91cea1dfd50064e52db033179952d18c770cbc5dfefc8eba45d619357ba3914"
dependencies = [
"async-trait",
"dashmap",
"fnv",
"futures",
"js-sys",
"lazy_static",
"percent-encoding",
"pin-project",
"rand",
"thiserror 1.0.69",
"tokio",
"tokio-stream",
]
[[package]] [[package]]
name = "ordered-float" name = "ordered-float"
version = "3.9.2" version = "3.9.2"
@@ -1515,26 +1422,6 @@ version = "2.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220"
[[package]]
name = "pin-project"
version = "1.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.114",
]
[[package]] [[package]]
name = "pin-project-lite" name = "pin-project-lite"
version = "0.2.16" version = "0.2.16"
@@ -1678,17 +1565,8 @@ checksum = "843bc0191f75f3e22651ae5f1e72939ab2f72a4bc30fa80a066bd66edefc24d4"
dependencies = [ dependencies = [
"aho-corasick", "aho-corasick",
"memchr", "memchr",
"regex-automata 0.4.13", "regex-automata",
"regex-syntax 0.8.8", "regex-syntax",
]
[[package]]
name = "regex-automata"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
dependencies = [
"regex-syntax 0.6.29",
] ]
[[package]] [[package]]
@@ -1699,15 +1577,9 @@ checksum = "5276caf25ac86c8d810222b3dbb938e512c55c6831a10f3e6ed1c93b84041f1c"
dependencies = [ dependencies = [
"aho-corasick", "aho-corasick",
"memchr", "memchr",
"regex-syntax 0.8.8", "regex-syntax",
] ]
[[package]]
name = "regex-syntax"
version = "0.6.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
[[package]] [[package]]
name = "regex-syntax" name = "regex-syntax"
version = "0.8.8" version = "0.8.8"
@@ -1829,6 +1701,15 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "secrecy"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9bd1c54ea06cfd2f6b63219704de0b9b4f72dcc2b8fdef820be6cd799780e91e"
dependencies = [
"zeroize",
]
[[package]] [[package]]
name = "security-framework" name = "security-framework"
version = "2.11.1" version = "2.11.1"
@@ -1895,17 +1776,6 @@ dependencies = [
"zmij", "zmij",
] ]
[[package]]
name = "serde_qs"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8cac3f1e2ca2fe333923a1ae72caca910b98ed0630bb35ef6f8c8517d6e81afa"
dependencies = [
"percent-encoding",
"serde",
"thiserror 1.0.69",
]
[[package]] [[package]]
name = "serde_repr" name = "serde_repr"
version = "0.1.20" version = "0.1.20"
@@ -2148,33 +2018,13 @@ dependencies = [
"windows-sys 0.61.2", "windows-sys 0.61.2",
] ]
[[package]]
name = "thiserror"
version = "1.0.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52"
dependencies = [
"thiserror-impl 1.0.69",
]
[[package]] [[package]]
name = "thiserror" name = "thiserror"
version = "2.0.18" version = "2.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4288b5bcbc7920c07a1149a35cf9590a2aa808e0bc1eafaade0b80947865fbc4" checksum = "4288b5bcbc7920c07a1149a35cf9590a2aa808e0bc1eafaade0b80947865fbc4"
dependencies = [ dependencies = [
"thiserror-impl 2.0.18", "thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.114",
] ]
[[package]] [[package]]
@@ -2414,48 +2264,16 @@ dependencies = [
"tracing-core", "tracing-core",
] ]
[[package]]
name = "tracing-serde"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc6b213177105856957181934e4920de57730fc69bf42c37ee5bb664d406d9e1"
dependencies = [
"serde",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.2.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e0d2eaa99c3c2e41547cfa109e910a68ea03823cccad4a0525dcbc9b01e8c71"
dependencies = [
"ansi_term",
"chrono",
"lazy_static",
"matchers 0.0.1",
"parking_lot",
"regex",
"serde",
"serde_json",
"sharded-slab",
"smallvec",
"thread_local",
"tracing",
"tracing-core",
"tracing-serde",
]
[[package]] [[package]]
name = "tracing-subscriber" name = "tracing-subscriber"
version = "0.3.22" version = "0.3.22"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f30143827ddab0d256fd843b7a66d164e9f271cfa0dde49142c5ca0ca291f1e" checksum = "2f30143827ddab0d256fd843b7a66d164e9f271cfa0dde49142c5ca0ca291f1e"
dependencies = [ dependencies = [
"matchers 0.2.0", "matchers",
"nu-ansi-term", "nu-ansi-term",
"once_cell", "once_cell",
"regex-automata 0.4.13", "regex-automata",
"sharded-slab", "sharded-slab",
"smallvec", "smallvec",
"thread_local", "thread_local",
@@ -2486,7 +2304,7 @@ version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4064ed685c487dbc25bd3f0e9548f2e34bab9d18cefc700f9ec2dba74ba1138e" checksum = "4064ed685c487dbc25bd3f0e9548f2e34bab9d18cefc700f9ec2dba74ba1138e"
dependencies = [ dependencies = [
"thiserror 2.0.18", "thiserror",
] ]
[[package]] [[package]]
@@ -3050,6 +2868,12 @@ dependencies = [
"synstructure", "synstructure",
] ]
[[package]]
name = "zeroize"
version = "1.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b97154e67e32c85465826e8bcc1c59429aaaf107c1e4a9e53c8d8ccd5eff88d0"
[[package]] [[package]]
name = "zerotrie" name = "zerotrie"
version = "0.2.3" version = "0.2.3"

View File

@@ -10,7 +10,7 @@ release-logs = ["tracing/max_level_info"]
[dependencies] [dependencies]
serde = "1.0" serde = "1.0"
serde_derive = "1.0" serde_derive = "1.0"
tokio = { version = "1", features = ["rt", "io-util"] } tokio = { version = "1", features = ["rt-multi-thread", "io-util"] }
clap = { version = "4.0", features = ["derive"] } clap = { version = "4.0", features = ["derive"] }
figment = { version = "0.10", features = ["toml"] } figment = { version = "0.10", features = ["toml"] }
futures = "0.3.24" futures = "0.3.24"
@@ -32,7 +32,7 @@ tracing-subscriber = { version = "0.3.22", features = ["fmt", "ansi", "time", "e
serialport = "4.8.1" serialport = "4.8.1"
gethostname = "1.1.0" gethostname = "1.1.0"
bitflags = "2.10.0" bitflags = "2.10.0"
influxdb2 = "0.3.9" influxdb2 = { version = "0.5.2" }
chrono = "0.4.43" chrono = "0.4.43"
serde_with = "3.16.1" serde_with = "3.16.1"
ctrlc = "3.5.1" ctrlc = "3.5.1"

View File

@@ -1,6 +1,6 @@
FROM rust:slim as builder FROM rust:slim AS builder
RUN apt-get update && apt-get install -y libssl-dev pkg-config RUN apt-get update && apt-get install -y libudev-dev pkg-config
# build deps only first for build cache # build deps only first for build cache
WORKDIR /usr/src WORKDIR /usr/src
RUN USER=root cargo new chimemon RUN USER=root cargo new chimemon
@@ -11,6 +11,7 @@ COPY . .
RUN cargo build --release RUN cargo build --release
FROM debian:bullseye-slim FROM debian:bullseye-slim
RUN apt-get update && apt-get install -y libudev
WORKDIR /app WORKDIR /app
COPY --from=builder /usr/src/chimemon/target/release/chimemon chimemon COPY --from=builder /usr/src/chimemon/target/release/chimemon chimemon
CMD ["/app/chimemon"] CMD ["/app/chimemon"]

214
src/config.rs Normal file
View File

@@ -0,0 +1,214 @@
use figment::{Provider, providers::Serialized, util::map, value::Map};
use gethostname::gethostname;
use serde_derive::{Deserialize, Serialize};
use serde_with::{DurationSeconds, serde_as};
#[derive(Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct InfluxConfig {
pub enabled: bool,
pub timeout: std::time::Duration,
pub url: String,
pub org: String,
pub bucket: String,
pub token: String,
pub tags: Map<String, String>,
}
impl Default for InfluxConfig {
fn default() -> Self {
let host = gethostname().into_string().unwrap();
InfluxConfig {
enabled: false,
timeout: std::time::Duration::from_secs(10),
url: "http://localhost:8086".into(),
org: "default".into(),
bucket: "default".into(),
token: "".into(),
tags: map! { "host".into() => host },
}
}
}
#[serde_as]
#[derive(Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct ChronyConfig {
pub enabled: bool,
#[serde_as(as = "DurationSeconds<u64>")]
pub timeout: std::time::Duration,
#[serde_as(as = "DurationSeconds<u64>")]
pub tracking_interval: std::time::Duration,
#[serde_as(as = "DurationSeconds<u64>")]
pub sources_interval: std::time::Duration,
pub measurement_prefix: String,
pub tracking_measurement: String,
pub sources_measurement: String,
pub host: String,
}
impl Default for ChronyConfig {
fn default() -> Self {
ChronyConfig {
enabled: false,
timeout: std::time::Duration::from_secs(5),
tracking_interval: std::time::Duration::from_secs(60),
sources_interval: std::time::Duration::from_secs(300),
measurement_prefix: "chrony.".into(),
tracking_measurement: "tracking".into(),
sources_measurement: "sources".into(),
host: "127.0.0.1:323".into(),
}
}
}
#[derive(Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct ChronySockConfig {
pub enabled: bool,
pub sock: String,
}
impl Default for ChronySockConfig {
fn default() -> Self {
ChronySockConfig {
enabled: false,
sock: "".into(),
}
}
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct HwmonSensorConfig {
pub device: String,
pub sensor: String,
}
#[serde_as]
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(default)]
pub struct HwmonConfig {
pub enabled: bool,
#[serde_as(as = "DurationSeconds<u64>")]
pub interval: std::time::Duration,
pub measurement: String,
pub sensors: Map<String, HwmonSensorConfig>,
}
impl Default for HwmonConfig {
fn default() -> Self {
HwmonConfig {
enabled: false,
interval: std::time::Duration::from_secs(60),
measurement: "hwmon".into(),
sensors: map! {},
}
}
}
#[serde_as]
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(default)]
pub struct GpsdConfig {
pub enabled: bool,
#[serde_as(as = "DurationSeconds<u64>")]
pub interval: std::time::Duration,
pub host: String,
}
impl Default for GpsdConfig {
fn default() -> Self {
GpsdConfig {
enabled: false,
interval: std::time::Duration::from_secs(60),
host: "localhost:2947".into(),
}
}
}
#[serde_as]
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(default)]
pub struct Prs10Config {
pub enabled: bool,
pub port: String,
pub baud: u32,
#[serde_as(as = "DurationSeconds<u64>")]
pub timeout: std::time::Duration,
#[serde_as(as = "DurationSeconds<u64>")]
pub status_interval: std::time::Duration,
#[serde_as(as = "DurationSeconds<u64>")]
pub stats_interval: std::time::Duration,
}
impl Default for Prs10Config {
fn default() -> Self {
Prs10Config {
enabled: false,
port: "/dev/ttyS0".into(),
baud: 9600,
timeout: std::time::Duration::from_secs(1),
status_interval: std::time::Duration::from_secs(10),
stats_interval: std::time::Duration::from_secs(30),
}
}
}
#[serde_as]
#[derive(Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct UCCMConfig {
pub enabled: bool,
pub port: String,
pub baud: u32,
#[serde_as(as = "DurationSeconds<u64>")]
pub status_interval: std::time::Duration,
#[serde_as(as = "DurationSeconds<u64>")]
pub timeout: std::time::Duration,
pub measurement: String,
}
impl Default for UCCMConfig {
fn default() -> Self {
UCCMConfig {
enabled: false,
port: "/dev/ttyS0".into(),
baud: 57600,
status_interval: std::time::Duration::from_secs(10),
timeout: std::time::Duration::from_secs(1),
measurement: "uccm_gpsdo".into(),
}
}
}
#[derive(Serialize, Deserialize, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum SourceConfig {
Chrony(ChronyConfig),
Hwmon(HwmonConfig),
Uccm(UCCMConfig),
Gpsd(GpsdConfig),
Prs10(Prs10Config),
}
#[derive(Serialize, Deserialize, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum TargetConfig {
ChronySock(ChronySockConfig),
Influxdb(InfluxConfig),
}
#[derive(Serialize, Deserialize, Clone, Default)]
pub struct Config {
pub sources: Map<String, SourceConfig>,
pub targets: Map<String, TargetConfig>,
}
impl Provider for Config {
fn metadata(&self) -> figment::Metadata {
figment::Metadata::named("Default config")
}
fn data(&self) -> Result<Map<figment::Profile, figment::value::Dict>, figment::Error> {
Serialized::defaults(Config::default()).data()
}
}

View File

@@ -1,3 +1,4 @@
pub mod config;
pub mod sources; pub mod sources;
pub mod targets; pub mod targets;
@@ -11,168 +12,11 @@ macro_rules! fatal {
use async_trait::async_trait; use async_trait::async_trait;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use figment::{
Figment, Provider, use tokio::sync::broadcast;
providers::{Format, Serialized, Toml},
util::map,
value::Map,
};
use gethostname::gethostname;
use influxdb2::models::DataPoint;
use serde_derive::{Deserialize, Serialize};
use serde_with::{DurationSeconds, serde_as};
use tokio::sync::broadcast::*;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use std::{fmt::Debug, path::Path, sync::Arc}; use std::{fmt::Debug, sync::Arc};
#[derive(Serialize, Deserialize, Clone)]
pub struct InfluxConfig {
pub enabled: bool,
pub url: String,
pub org: String,
pub bucket: String,
pub token: String,
pub tags: Map<String, String>,
}
impl Default for InfluxConfig {
fn default() -> Self {
let host = gethostname().into_string().unwrap();
InfluxConfig {
enabled: false,
url: "http://localhost:8086".into(),
org: "default".into(),
bucket: "default".into(),
token: "".into(),
tags: map! { "host".into() => host },
}
}
}
#[serde_as]
#[derive(Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct ChronyConfig {
pub enabled: bool,
#[serde_as(as = "DurationSeconds<u64>")]
pub timeout: std::time::Duration,
#[serde_as(as = "DurationSeconds<u64>")]
pub tracking_interval: std::time::Duration,
#[serde_as(as = "DurationSeconds<u64>")]
pub sources_interval: std::time::Duration,
pub measurement_prefix: String,
pub tracking_measurement: String,
pub sources_measurement: String,
pub host: String,
}
impl Default for ChronyConfig {
fn default() -> Self {
ChronyConfig {
enabled: false,
timeout: std::time::Duration::from_secs(5),
tracking_interval: std::time::Duration::from_secs(60),
sources_interval: std::time::Duration::from_secs(300),
measurement_prefix: "chrony.".into(),
tracking_measurement: "tracking".into(),
sources_measurement: "sources".into(),
host: "127.0.0.1:323".into(),
}
}
}
#[derive(Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct ChronySockConfig {
pub enabled: bool,
pub sock: String,
}
impl Default for ChronySockConfig {
fn default() -> Self {
ChronySockConfig {
enabled: false,
sock: "".into(),
}
}
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct HwmonSensorConfig {
pub device: String,
pub sensor: String,
}
#[serde_as]
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(default)]
pub struct HwmonConfig {
pub enabled: bool,
#[serde_as(as = "DurationSeconds<u64>")]
pub interval: std::time::Duration,
pub measurement: String,
pub sensors: Map<String, HwmonSensorConfig>,
}
impl Default for HwmonConfig {
fn default() -> Self {
HwmonConfig {
enabled: false,
interval: std::time::Duration::from_secs(60),
measurement: "hwmon".into(),
sensors: map! {},
}
}
}
#[serde_as]
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(default)]
pub struct GpsdConfig {
pub enabled: bool,
#[serde_as(as = "DurationSeconds<u64>")]
pub interval: std::time::Duration,
pub host: String,
}
impl Default for GpsdConfig {
fn default() -> Self {
GpsdConfig {
enabled: false,
interval: std::time::Duration::from_secs(60),
host: "localhost:2947".into(),
}
}
}
#[serde_as]
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(default)]
pub struct Prs10Config {
pub enabled: bool,
pub port: String,
pub baud: u32,
#[serde_as(as = "DurationSeconds<u64>")]
pub timeout: std::time::Duration,
#[serde_as(as = "DurationSeconds<u64>")]
pub status_interval: std::time::Duration,
#[serde_as(as = "DurationSeconds<u64>")]
pub stats_interval: std::time::Duration,
}
impl Default for Prs10Config {
fn default() -> Self {
Prs10Config {
enabled: false,
port: "/dev/ttyS0".into(),
baud: 9600,
timeout: std::time::Duration::from_secs(1),
status_interval: std::time::Duration::from_secs(10),
stats_interval: std::time::Duration::from_secs(30),
}
}
}
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct TimeReport { pub struct TimeReport {
@@ -206,37 +50,39 @@ type MetricTags = Vec<MetricTag>;
pub struct SourceMetric { pub struct SourceMetric {
name: &'static str, name: &'static str,
value: MetricValue, value: MetricValue,
tags: Arc<MetricTags>,
} }
impl SourceMetric { impl SourceMetric {
pub fn new_int(name: &'static str, value: i64, tags: Arc<MetricTags>) -> Self { pub fn new_int(name: &'static str, value: i64) -> Self {
Self { Self {
name: name, name: name,
value: MetricValue::Int(value), value: MetricValue::Int(value),
tags,
} }
} }
pub fn new_float(name: &'static str, value: f64, tags: Arc<MetricTags>) -> Self { pub fn new_float(name: &'static str, value: f64) -> Self {
Self { Self {
name: name, name: name,
value: MetricValue::Float(value), value: MetricValue::Float(value),
tags,
} }
} }
pub fn new_bool(name: &'static str, value: bool, tags: Arc<MetricTags>) -> Self { pub fn new_bool(name: &'static str, value: bool) -> Self {
Self { Self {
name: name, name: name,
value: MetricValue::Bool(value), value: MetricValue::Bool(value),
tags,
} }
} }
} }
#[derive(Debug)]
pub struct SourceMetricSet {
metrics: Vec<SourceMetric>,
tags: Arc<MetricTags>,
}
pub trait SourceReportDetails: Debug + Send + Sync { pub trait SourceReportDetails: Debug + Send + Sync {
fn to_metrics(&self) -> Vec<SourceMetric>; fn to_metrics(&self) -> Vec<SourceMetricSet>;
fn is_healthy(&self) -> bool; fn is_healthy(&self) -> bool;
} }
@@ -244,106 +90,15 @@ pub trait SourceReportDetails: Debug + Send + Sync {
pub struct SourceReport { pub struct SourceReport {
pub name: String, pub name: String,
pub status: SourceStatus, pub status: SourceStatus,
pub details: Arc<dyn SourceReportDetails>, pub details: Arc<dyn SourceReportDetails + Send + Sync>,
}
#[serde_as]
#[derive(Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct UCCMConfig {
pub enabled: bool,
pub port: String,
pub baud: u32,
#[serde_as(as = "DurationSeconds<u64>")]
pub status_interval: std::time::Duration,
#[serde_as(as = "DurationSeconds<u64>")]
pub timeout: std::time::Duration,
pub measurement: String,
}
impl Default for UCCMConfig {
fn default() -> Self {
UCCMConfig {
enabled: false,
port: "/dev/ttyS0".into(),
baud: 57600,
status_interval: std::time::Duration::from_secs(10),
timeout: std::time::Duration::from_secs(1),
measurement: "uccm_gpsdo".into(),
}
}
}
#[derive(Serialize, Deserialize, Clone, Default)]
pub struct SourcesConfig {
pub chrony: ChronyConfig,
pub hwmon: HwmonConfig,
pub uccm: UCCMConfig,
pub gpsd: GpsdConfig,
pub prs10: Prs10Config,
}
#[derive(Serialize, Deserialize, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum SourceConfig {
Chrony(ChronyConfig),
Hwmon(HwmonConfig),
Uccm(UCCMConfig),
Gpsd(GpsdConfig),
Prs10(Prs10Config),
}
#[derive(Serialize, Deserialize, Clone)]
pub struct NamedSourceConfig {
pub name: String,
#[serde(flatten)]
pub source: SourceConfig,
}
#[derive(Serialize, Deserialize, Clone, Default)]
pub struct TargetsConfig {
pub chrony: ChronySockConfig,
}
#[derive(Serialize, Deserialize, Clone, Default)]
pub struct Config {
pub influxdb: InfluxConfig,
pub sources: Vec<NamedSourceConfig>,
pub targets: TargetsConfig,
}
impl Provider for Config {
fn metadata(&self) -> figment::Metadata {
figment::Metadata::named("Default config")
}
fn data(&self) -> Result<Map<figment::Profile, figment::value::Dict>, figment::Error> {
Serialized::defaults(Config::default()).data()
}
}
pub fn load_config(filename: &Path) -> Figment {
Figment::from(Serialized::defaults(Config::default())).merge(Toml::file(filename))
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum ChimemonMessage { pub enum ChimemonMessage {
DataPoint(DataPoint),
DataPoints(Vec<DataPoint>),
TimeReport(TimeReport), TimeReport(TimeReport),
SourceReport(SourceReport), SourceReport(SourceReport),
} }
impl From<DataPoint> for ChimemonMessage {
fn from(dp: DataPoint) -> Self {
ChimemonMessage::DataPoint(dp)
}
}
impl From<Vec<DataPoint>> for ChimemonMessage {
fn from(dps: Vec<DataPoint>) -> Self {
ChimemonMessage::DataPoints(dps)
}
}
impl From<TimeReport> for ChimemonMessage { impl From<TimeReport> for ChimemonMessage {
fn from(tr: TimeReport) -> Self { fn from(tr: TimeReport) -> Self {
ChimemonMessage::TimeReport(tr) ChimemonMessage::TimeReport(tr)
@@ -356,17 +111,21 @@ impl From<SourceReport> for ChimemonMessage {
} }
} }
pub type ChimemonSourceChannel = Sender<ChimemonMessage>; pub type ChimemonSourceChannel = broadcast::Sender<ChimemonMessage>;
pub type ChimemonTargetChannel = Receiver<ChimemonMessage>; pub type ChimemonTargetChannel = broadcast::Receiver<ChimemonMessage>;
#[async_trait] #[async_trait]
pub trait ChimemonSource { pub trait ChimemonSource {
type Config; type Config;
const TASK_NAME: &'static str;
fn new(name: &str, config: Self::Config) -> Self; fn new(name: &str, config: Self::Config) -> Self;
async fn run(self, chan: ChimemonSourceChannel, cancel: CancellationToken); async fn run(self, chan: ChimemonSourceChannel, cancel: CancellationToken);
} }
#[async_trait] #[async_trait]
pub trait ChimemonTarget { pub trait ChimemonTarget {
async fn run(self, chan: ChimemonTargetChannel, cancel: CancellationToken); type Config;
const TASK_NAME: &'static str;
fn new(name: &str, config: Self::Config) -> Self;
async fn run(self, mut chan: ChimemonTargetChannel, cancel: CancellationToken);
} }

View File

@@ -1,16 +1,23 @@
use std::sync::Arc;
use async_trait::async_trait;
use clap::{Parser, ValueEnum}; use clap::{Parser, ValueEnum};
use figment::{ use figment::{
Figment, Figment,
providers::{Format, Toml}, providers::{Format, Toml},
}; };
use futures::future::join_all; use futures::future::join_all;
use std::path::Path;
use tokio::{select, sync::broadcast, task::JoinHandle}; use tokio::{select, sync::broadcast, task::JoinHandle};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::{Instrument, debug, error, info, info_span, warn}; use tracing::{Instrument, debug, error, info, info_span, warn};
use tracing_subscriber::{self, EnvFilter, fmt::format::FmtSpan}; use tracing_subscriber::{self, EnvFilter, fmt::format::FmtSpan};
use chimemon::*; use chimemon::{
config::{SourceConfig, TargetConfig},
targets::influx::InfluxTarget,
*,
};
use config::Config;
use sources::{ use sources::{
chrony::ChronyClient, gpsd::GpsdSource, hwmon::HwmonSource, prs10::Prs10Monitor, chrony::ChronyClient, gpsd::GpsdSource, hwmon::HwmonSource, prs10::Prs10Monitor,
uccm::UCCMMonitor, uccm::UCCMMonitor,
@@ -52,72 +59,119 @@ struct Args {
config_file: String, config_file: String,
#[arg(value_enum, default_value_t = Level::Info)] #[arg(value_enum, default_value_t = Level::Info)]
log_level: Level, log_level: Level,
#[arg(short, long, default_value_t = false)]
echo_task: bool,
} }
fn run_source( fn run_source(
config: NamedSourceConfig, name: &str,
source: SourceConfig,
chan: ChimemonSourceChannel, chan: ChimemonSourceChannel,
shutdown: CancellationToken, cancel: CancellationToken,
) -> Option<JoinHandle<()>> { ) -> Option<JoinHandle<()>> {
let NamedSourceConfig { name, source } = config;
match source { match source {
SourceConfig::Chrony(source_config) if source_config.enabled => { SourceConfig::Chrony(cfg) if cfg.enabled => {
let c = ChronyClient::new(&name, source_config); spawn_source::<ChronyClient>(name, cfg, chan, cancel)
Some(tokio::spawn(
c.run(chan, shutdown).instrument(info_span!("chrony-task")),
))
} }
SourceConfig::Gpsd(source_config) if source_config.enabled => { SourceConfig::Gpsd(cfg) if cfg.enabled => {
let c = GpsdSource::new(&name, source_config); spawn_source::<GpsdSource>(name, cfg, chan, cancel)
Some(tokio::spawn(
c.run(chan, shutdown).instrument(info_span!("gpsd-task")),
))
} }
SourceConfig::Hwmon(source_config) if source_config.enabled => { SourceConfig::Hwmon(cfg) if cfg.enabled => {
let c = HwmonSource::new(&name, source_config); spawn_source::<HwmonSource>(name, cfg, chan, cancel)
Some(tokio::spawn(
c.run(chan, shutdown).instrument(info_span!("hwmon-task")),
))
} }
SourceConfig::Prs10(source_config) if source_config.enabled => { SourceConfig::Prs10(cfg) if cfg.enabled => {
let c = Prs10Monitor::new(&name, source_config); spawn_source::<Prs10Monitor>(name, cfg, chan, cancel)
Some(tokio::spawn(
c.run(chan, shutdown).instrument(info_span!("prs10-task")),
))
} }
SourceConfig::Uccm(source_config) if source_config.enabled => { SourceConfig::Uccm(cfg) if cfg.enabled => {
let c = UCCMMonitor::new(&name, source_config); spawn_source::<UCCMMonitor>(name, cfg, chan, cancel)
Some(tokio::spawn( }
c.run(chan, shutdown).instrument(info_span!("uccm-task")), _ => {
)) debug!("Disabled source {name} skipped");
None
} }
_ => None,
} }
} }
async fn dummy_consumer(mut chan: ChimemonTargetChannel, cancel: CancellationToken) { fn run_target(
name: &str,
target: TargetConfig,
chan: ChimemonTargetChannel,
cancel: CancellationToken,
) -> Option<JoinHandle<()>> {
match target {
TargetConfig::ChronySock(cfg) if cfg.enabled => {
spawn_target::<ChronySockServer>(name, cfg, chan, cancel)
}
TargetConfig::Influxdb(cfg) if cfg.enabled => {
spawn_target::<InfluxTarget>(name, cfg, chan, cancel)
}
_ => {
debug!("Disabled target {name} skipped");
None
}
}
}
fn spawn_source<T: ChimemonSource + Send + Sync + 'static>(
name: &str,
config: T::Config,
chan: ChimemonSourceChannel,
cancel: CancellationToken,
) -> Option<JoinHandle<()>> {
let span = info_span!("source", task = name);
let s = T::new(name, config);
Some(tokio::spawn(s.run(chan, cancel).instrument(span)))
}
fn spawn_target<T: ChimemonTarget + Send + Sync + 'static>(
name: &str,
config: T::Config,
chan: ChimemonTargetChannel,
cancel: CancellationToken,
) -> Option<JoinHandle<()>> {
let span = info_span!("target", task = name);
let t = T::new(name, config);
Some(tokio::spawn(async move {
t.run(chan, cancel).instrument(span).await
}))
}
struct EchoTarget {}
struct EchoTargetConfig {}
#[async_trait]
impl ChimemonTarget for EchoTarget {
type Config = EchoTargetConfig;
const TASK_NAME: &'static str = "echo-task";
fn new(_name: &str, _config: Self::Config) -> Self {
EchoTarget {}
}
async fn run(self, mut chan: ChimemonTargetChannel, cancel: CancellationToken) {
info!("Dummy receiver task started"); info!("Dummy receiver task started");
loop { loop {
select! { let msg = select! {
_ = cancel.cancelled() => { _ = cancel.cancelled() => {
return return
}, },
Ok(m) = chan.recv() => { msg = chan.recv() => msg
match m { };
ChimemonMessage::SourceReport(report) => { match msg {
Ok(ChimemonMessage::SourceReport(report)) => {
let metrics = report.details.to_metrics(); let metrics = report.details.to_metrics();
info!("instance: {} metrics: {metrics:?}", report.name); info!("instance: {} metrics: {metrics:?}", report.name);
} }
msg => { Ok(msg) => {
info!("message: {msg:?}"); info!("message: {msg:?}");
} }
Err(e) => {
warn!(error = ?e, "Error receiving message");
} }
} }
} }
} }
} }
#[tokio::main(flavor = "current_thread")] #[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Args::parse(); let args = Args::parse();
tracing_subscriber::fmt() tracing_subscriber::fmt()
@@ -137,93 +191,46 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config: Config = fig.extract()?; let config: Config = fig.extract()?;
let mut tasks = Vec::new(); let mut tasks = Vec::new();
let (tx, _) = broadcast::channel(16); let (sourcechan, _) = broadcast::channel(16);
let sourcechan: ChimemonSourceChannel = tx;
let shutdown_token = CancellationToken::new(); let shutdown_token = CancellationToken::new();
if config.influxdb.enabled { for (name, target) in config.targets {
info!( if let Some(task) = run_target(
"Connecting to influxdb {} org: {} using token", &name,
&config.influxdb.url, &config.influxdb.org target,
); sourcechan.subscribe(),
let config = config.clone(); shutdown_token.clone(),
let influx = influxdb2::Client::new( ) {
&config.influxdb.url,
&config.influxdb.org,
&config.influxdb.token,
);
let mut influx_rx = sourcechan.subscribe();
let influx_cancel = shutdown_token.clone();
tasks.push(tokio::spawn(
async move {
let stream = async_stream::stream! {
while let Ok(msg) = influx_rx.recv().await {
match msg {
ChimemonMessage::DataPoint(dp) => {
yield dp
},
ChimemonMessage::DataPoints(dps) => {
for p in dps {
yield p
}
},
_ => {}
}
}
};
select! {
_ = influx_cancel.cancelled() => {
return
},
res = influx.write(&config.influxdb.bucket, stream) => {
match res {
Err(e) => error!("Error writing to influx: {}", e.to_string()),
_ => warn!("Unexpectedly shutting down influx task"),
}
},
}
}
.instrument(info_span!("influx-task")),
));
}
for source in config.sources {
if let Some(task) = run_source(source, sourcechan.clone(), shutdown_token.clone()) {
tasks.push(task) tasks.push(task)
} }
} }
let chrony_refclock = if config.targets.chrony.enabled { for (name, source) in config.sources {
Some(ChronySockServer::new(config.targets.chrony.to_owned())) if let Some(task) = run_source(&name, source, sourcechan.clone(), shutdown_token.clone()) {
} else { tasks.push(task)
None }
}; }
if let Some(chrony_refclock) = chrony_refclock {
tasks.push(tokio::spawn(
chrony_refclock
.run(sourcechan.subscribe(), shutdown_token.clone())
.instrument(info_span!("chrony-refclock-task")),
));
};
if tasks.len() == 0 { if tasks.len() == 0 {
error!("No tasks configured, exiting."); error!("No tasks configured, exiting.");
return Ok(()); // not an error, but exit before starting a dummy task return Ok(()); // not an error, but exit before starting a dummy task
} }
if sourcechan.strong_count() == 0 {
warn!("No sources configured, no events will be generated");
}
if sourcechan.receiver_count() == 0 { if sourcechan.receiver_count() == 0 {
warn!("No consumers configured, events will be discarded"); warn!("No targets configured, events will be discarded");
tasks.push(tokio::spawn( }
dummy_consumer(sourcechan.subscribe(), shutdown_token.clone()) if args.echo_task || sourcechan.receiver_count() == 0 {
.instrument(info_span!("dummy-consumer-task")), let c = EchoTargetConfig {};
)); tasks.push(
spawn_target::<EchoTarget>("echo", c, sourcechan.subscribe(), shutdown_token.clone())
.unwrap(),
)
} }
debug!("Task setup complete, tasks: {}", tasks.len()); debug!("Task setup complete, tasks: {}", tasks.len());
ctrlc::set_handler(move || { ctrlc::set_handler(move || {
if shutdown_token.is_cancelled() { if shutdown_token.is_cancelled() {
info!("Forced shutdown"); info!("Forced shutdown");

View File

@@ -5,14 +5,14 @@ use async_trait::async_trait;
use chrony_candm::reply::{self, ReplyBody, SourceMode}; use chrony_candm::reply::{self, ReplyBody, SourceMode};
use chrony_candm::request::{self, RequestBody}; use chrony_candm::request::{self, RequestBody};
use chrony_candm::{ClientOptions, blocking_query}; use chrony_candm::{ClientOptions, blocking_query};
use futures::future::join; use tokio::select;
use tokio::{join, select};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::{info, warn}; use tracing::{info, warn};
use crate::SourceMetricSet;
use crate::{ use crate::{
ChimemonSource, ChimemonSourceChannel, ChronyConfig, MetricTags, SourceMetric, SourceReport, ChimemonSource, ChimemonSourceChannel, MetricTags, SourceMetric, SourceReport,
SourceReportDetails, SourceStatus, SourceReportDetails, SourceStatus, config::ChronyConfig,
}; };
pub struct ChronyClient { pub struct ChronyClient {
@@ -44,26 +44,24 @@ impl SourceReportDetails for ChronyTrackingReport {
fn is_healthy(&self) -> bool { fn is_healthy(&self) -> bool {
true true
} }
fn to_metrics(&self) -> Vec<SourceMetric> { fn to_metrics(&self) -> Vec<SourceMetricSet> {
let tags = &self.tags; vec![SourceMetricSet {
vec![ tags: self.tags.clone(),
SourceMetric::new_int("ref_id", self.ref_id, tags.clone()), metrics: vec![
SourceMetric::new_int("stratum", self.stratum, tags.clone()), SourceMetric::new_int("ref_id", self.ref_id),
SourceMetric::new_int("leap_status", self.leap_status, tags.clone()), SourceMetric::new_int("stratum", self.stratum),
SourceMetric::new_float("current_correction", self.current_correction, tags.clone()), SourceMetric::new_int("leap_status", self.leap_status),
SourceMetric::new_float("last_offset", self.last_offset, tags.clone()), SourceMetric::new_float("current_correction", self.current_correction),
SourceMetric::new_float("rms_offset", self.rms_offset, tags.clone()), SourceMetric::new_float("last_offset", self.last_offset),
SourceMetric::new_float("freq_ppm", self.freq_ppm, tags.clone()), SourceMetric::new_float("rms_offset", self.rms_offset),
SourceMetric::new_float("resid_freq_ppm", self.resid_freq_ppm, tags.clone()), SourceMetric::new_float("freq_ppm", self.freq_ppm),
SourceMetric::new_float("skew_ppm", self.skew_ppm, tags.clone()), SourceMetric::new_float("resid_freq_ppm", self.resid_freq_ppm),
SourceMetric::new_float("root_delay", self.root_delay, tags.clone()), SourceMetric::new_float("skew_ppm", self.skew_ppm),
SourceMetric::new_float("root_dispersion", self.root_dispersion, tags.clone()), SourceMetric::new_float("root_delay", self.root_delay),
SourceMetric::new_float( SourceMetric::new_float("root_dispersion", self.root_dispersion),
"last_update_interval", SourceMetric::new_float("last_update_interval", self.last_update_interval),
self.last_update_interval, ],
tags.clone(), }]
),
]
} }
} }
@@ -77,8 +75,8 @@ impl SourceReportDetails for ChronySourcesReport {
//TODO: think about whether there is an idea of unhealthy sources //TODO: think about whether there is an idea of unhealthy sources
true true
} }
fn to_metrics(&self) -> Vec<SourceMetric> { fn to_metrics(&self) -> Vec<SourceMetricSet> {
let mut metrics = Vec::with_capacity(8 * self.sources.len()); let mut metrics = Vec::with_capacity(self.sources.len());
for source in &self.sources { for source in &self.sources {
let tags = Arc::new(vec![ let tags = Arc::new(vec![
@@ -103,28 +101,19 @@ impl SourceReportDetails for ChronySourcesReport {
}, },
), ),
]); ]);
metrics.extend([ metrics.push(SourceMetricSet {
SourceMetric::new_int("poll", source.poll as i64, tags.clone()), tags: tags,
SourceMetric::new_int("stratum", source.stratum as i64, tags.clone()), metrics: vec![
SourceMetric::new_int("flags", source.flags.bits() as i64, tags.clone()), SourceMetric::new_int("poll", source.poll as i64),
SourceMetric::new_int( SourceMetric::new_int("stratum", source.stratum as i64),
"reachability", SourceMetric::new_int("flags", source.flags.bits() as i64),
source.reachability.count_ones() as i64, SourceMetric::new_int("reachability", source.reachability.count_ones() as i64),
tags.clone(), SourceMetric::new_int("since_sample", source.since_sample as i64),
), SourceMetric::new_float("orig_latest_meas", source.orig_latest_meas.into()),
SourceMetric::new_int("since_sample", source.since_sample as i64, tags.clone()), SourceMetric::new_float("latest_meas", source.latest_meas.into()),
SourceMetric::new_float( SourceMetric::new_float("latest_meas_err", source.latest_meas_err.into()),
"orig_latest_meas", ],
source.orig_latest_meas.into(), });
tags.clone(),
),
SourceMetric::new_float("latest_meas", source.latest_meas.into(), tags.clone()),
SourceMetric::new_float(
"latest_meas_err",
source.latest_meas_err.into(),
tags.clone(),
),
]);
} }
metrics metrics
@@ -281,6 +270,7 @@ impl ChronyClient {
#[async_trait] #[async_trait]
impl ChimemonSource for ChronyClient { impl ChimemonSource for ChronyClient {
type Config = ChronyConfig; type Config = ChronyConfig;
const TASK_NAME: &'static str = "chrony-task";
fn new(name: &str, config: Self::Config) -> Self { fn new(name: &str, config: Self::Config) -> Self {
let server = config let server = config
.host .host
@@ -313,7 +303,7 @@ impl ChimemonSource for ChronyClient {
match self.tracking_poll(&chan).await { match self.tracking_poll(&chan).await {
Ok(_) => (), Ok(_) => (),
Err(e) => { Err(e) => {
warn!("Error in chrony task: {}", e.to_string()); warn!(error = ?e, "Error in chrony tracking task");
} }
} }
}, },
@@ -321,7 +311,7 @@ impl ChimemonSource for ChronyClient {
match self.sources_poll(&chan).await { match self.sources_poll(&chan).await {
Ok(_) => (), Ok(_) => (),
Err(e) => { Err(e) => {
warn!("Error in chrony task: {}", e.to_string()); warn!(error = ?e, "Error in chrony sources task");
} }
} }
} }

View File

@@ -15,11 +15,12 @@ use tokio::net::{TcpStream, ToSocketAddrs, lookup_host};
use tokio::time::{interval, timeout}; use tokio::time::{interval, timeout};
use tokio_util::codec::{Framed, LinesCodec}; use tokio_util::codec::{Framed, LinesCodec};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::{debug, debug_span, info, instrument, warn}; use tracing::{debug, debug_span, error, info, instrument, warn};
use crate::SourceMetricSet;
use crate::{ use crate::{
ChimemonMessage, ChimemonSource, ChimemonSourceChannel, GpsdConfig, SourceMetric, SourceReport, ChimemonSource, ChimemonSourceChannel, SourceMetric, SourceReport, SourceReportDetails,
SourceReportDetails, SourceStatus, SourceStatus, config::GpsdConfig,
}; };
pub struct GpsdSource { pub struct GpsdSource {
@@ -76,13 +77,16 @@ impl SourceReportDetails for GpsdSourceReport {
fn is_healthy(&self) -> bool { fn is_healthy(&self) -> bool {
self.fix_type != GpsdFixType::Unknown && self.fix_type != GpsdFixType::NoFix self.fix_type != GpsdFixType::Unknown && self.fix_type != GpsdFixType::NoFix
} }
fn to_metrics(&self) -> Vec<SourceMetric> { fn to_metrics(&self) -> Vec<SourceMetricSet> {
let tags = Arc::new(vec![]); let tags = Arc::new(vec![]);
vec![ vec![SourceMetricSet {
SourceMetric::new_int("sats_visible", self.sats_visible as i64, tags.clone()), tags,
SourceMetric::new_int("sats_tracked", self.sats_tracked as i64, tags.clone()), metrics: vec![
SourceMetric::new_float("tdop", self.tdop, tags.clone()), SourceMetric::new_int("sats_visible", self.sats_visible as i64),
] SourceMetric::new_int("sats_tracked", self.sats_tracked as i64),
SourceMetric::new_float("tdop", self.tdop),
],
}]
} }
} }
@@ -116,7 +120,8 @@ impl GpsdSource {
.and_then(|sky| sky.tdop) .and_then(|sky| sky.tdop)
.map_or(f64::INFINITY, |tdop| tdop as f64); .map_or(f64::INFINITY, |tdop| tdop as f64);
chan.send(ChimemonMessage::SourceReport(SourceReport { if let Err(e) = chan.send(
SourceReport {
name: self.name.clone(), name: self.name.clone(),
status: SourceStatus::Unknown, status: SourceStatus::Unknown,
details: Arc::new(GpsdSourceReport { details: Arc::new(GpsdSourceReport {
@@ -125,8 +130,11 @@ impl GpsdSource {
sats_visible, sats_visible,
tdop, tdop,
}), }),
})) }
.unwrap(); .into(),
) {
error!(error = ?e, "Unable to send to channel")
}
} }
fn handle_msg(&mut self, msg: String) -> Result<(), Box<dyn std::error::Error>> { fn handle_msg(&mut self, msg: String) -> Result<(), Box<dyn std::error::Error>> {
@@ -163,6 +171,7 @@ impl GpsdSource {
#[async_trait] #[async_trait]
impl ChimemonSource for GpsdSource { impl ChimemonSource for GpsdSource {
type Config = GpsdConfig; type Config = GpsdConfig;
const TASK_NAME: &'static str = "gpsd-task";
fn new(name: &str, config: Self::Config) -> Self { fn new(name: &str, config: Self::Config) -> Self {
// TODO: refactor so this mess isn't necessary // TODO: refactor so this mess isn't necessary
// Should do async setup at the start of run(), not here // Should do async setup at the start of run(), not here

View File

@@ -6,9 +6,10 @@ use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn}; use tracing::{debug, error, info, warn};
use crate::{ use crate::{
ChimemonSource, ChimemonSourceChannel, HwmonConfig, MetricTags, SourceMetric, SourceReport, ChimemonSource, ChimemonSourceChannel, MetricTags, SourceMetric, SourceMetricSet, SourceReport,
SourceReportDetails, SourceStatus, SourceReportDetails, SourceStatus, config::HwmonConfig,
}; };
pub struct HwmonSource { pub struct HwmonSource {
name: String, name: String,
config: HwmonConfig, config: HwmonConfig,
@@ -72,14 +73,13 @@ impl SourceReportDetails for HwmonReport {
//self.alarms.iter().any(|(_sensor, alarm)| *alarm) //self.alarms.iter().any(|(_sensor, alarm)| *alarm)
true true
} }
fn to_metrics(&self) -> Vec<SourceMetric> { fn to_metrics(&self) -> Vec<SourceMetricSet> {
let mut metrics = Vec::new(); let mut metrics = Vec::new();
for (sensor, value) in &self.values { for (sensor, value) in &self.values {
metrics.push(SourceMetric::new_float( metrics.push(SourceMetricSet {
"hwmon_value", tags: sensor.tags.clone(),
*value, metrics: vec![SourceMetric::new_float("hwmon_value", *value)],
sensor.tags.clone(), })
))
} }
// for (sensor, alarm) in &self.alarms { // for (sensor, alarm) in &self.alarms {
// metrics.push(SourceMetric::new_bool( // metrics.push(SourceMetric::new_bool(
@@ -104,6 +104,7 @@ impl HwmonSource {
#[async_trait] #[async_trait]
impl ChimemonSource for HwmonSource { impl ChimemonSource for HwmonSource {
type Config = HwmonConfig; type Config = HwmonConfig;
const TASK_NAME: &'static str = "hwmon-task";
fn new(name: &str, config: Self::Config) -> Self { fn new(name: &str, config: Self::Config) -> Self {
let sensors = config let sensors = config
.sensors .sensors

View File

@@ -16,9 +16,10 @@ use tokio_serial::{SerialPort, SerialStream};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::{debug, debug_span, error, info, instrument, warn}; use tracing::{debug, debug_span, error, info, instrument, warn};
use crate::SourceMetricSet;
use crate::{ use crate::{
ChimemonMessage, ChimemonSource, ChimemonSourceChannel, MetricTags, Prs10Config, SourceMetric, ChimemonMessage, ChimemonSource, ChimemonSourceChannel, MetricTags, SourceMetric, SourceReport,
SourceReport, SourceReportDetails, SourceStatus, fatal, SourceReportDetails, SourceStatus, config::Prs10Config, fatal,
}; };
#[derive(Debug)] #[derive(Debug)]
@@ -76,7 +77,7 @@ impl Prs10PowerLampFlags {
.iter() .iter()
.map(|(flag, label)| { .map(|(flag, label)| {
// We track whether each flag is set (true) or not (false) // We track whether each flag is set (true) or not (false)
SourceMetric::new_bool(*label, self.contains(**flag), tags.clone()) SourceMetric::new_bool(*label, self.contains(**flag))
}) })
.collect() .collect()
} }
@@ -184,20 +185,19 @@ impl SourceReportDetails for Prs10Status {
&& self.pps_flags == HEALTHY_PPS && self.pps_flags == HEALTHY_PPS
} }
fn to_metrics(&self) -> Vec<SourceMetric> { fn to_metrics(&self) -> Vec<SourceMetricSet> {
let tags = Arc::new(vec![]); let tags = Arc::new(vec![]);
vec![ vec![SourceMetricSet {
SourceMetric::new_int( tags,
"volt_lamp_flags", metrics: vec![
self.volt_lamp_flags.bits() as i64, SourceMetric::new_int("volt_lamp_flags", self.volt_lamp_flags.bits() as i64),
tags.clone(), SourceMetric::new_int("rf_flags", self.rf_flags.bits() as i64),
), SourceMetric::new_int("temp_flags", self.temp_flags.bits() as i64),
SourceMetric::new_int("rf_flags", self.rf_flags.bits() as i64, tags.clone()), SourceMetric::new_int("fll_flags", self.fll_flags.bits() as i64),
SourceMetric::new_int("temp_flags", self.temp_flags.bits() as i64, tags.clone()), SourceMetric::new_int("pps_flags", self.pps_flags.bits() as i64),
SourceMetric::new_int("fll_flags", self.fll_flags.bits() as i64, tags.clone()),
SourceMetric::new_int("pps_flags", self.pps_flags.bits() as i64, tags.clone()),
// system flags are kind of useless because we can't guarantee they get upstreamed and will only appear once since they are 'event flags' // system flags are kind of useless because we can't guarantee they get upstreamed and will only appear once since they are 'event flags'
] ],
}]
} }
} }
@@ -267,57 +267,40 @@ impl SourceReportDetails for Prs10Stats {
fn is_healthy(&self) -> bool { fn is_healthy(&self) -> bool {
true true
} }
fn to_metrics(&self) -> Vec<SourceMetric> { fn to_metrics(&self) -> Vec<SourceMetricSet> {
let tags = Arc::new(vec![]); let tags = Arc::new(vec![]);
vec![ vec![SourceMetricSet {
tags,
metrics: vec![
// Integer Metrics // Integer Metrics
SourceMetric::new_int("ocxo_efc", self.ocxo_efc as i64, tags.clone()), SourceMetric::new_int("ocxo_efc", self.ocxo_efc as i64),
// Float Metrics // Float Metrics
SourceMetric::new_float("error_signal_volts", self.error_signal_volts, tags.clone()), SourceMetric::new_float("error_signal_volts", self.error_signal_volts),
SourceMetric::new_float( SourceMetric::new_float("detect_signal_volts", self.detect_signal_volts),
"detect_signal_volts", SourceMetric::new_float("heat_volts", self.heat_volts),
self.detect_signal_volts, SourceMetric::new_float("elec_volts", self.elec_volts),
tags.clone(), SourceMetric::new_float("lamp_fet_drain_volts", self.lamp_fet_drain_volts),
), SourceMetric::new_float("lamp_fet_gate_volts", self.lamp_fet_gate_volts),
SourceMetric::new_float("heat_volts", self.heat_volts, tags.clone()), SourceMetric::new_float("ocxo_heat_volts", self.ocxo_heat_volts),
SourceMetric::new_float("elec_volts", self.elec_volts, tags.clone()), SourceMetric::new_float("cell_heat_volts", self.cell_heat_volts),
SourceMetric::new_float( SourceMetric::new_float("lamp_heat_volts", self.lamp_heat_volts),
"lamp_fet_drain_volts", SourceMetric::new_float("rb_photo", self.rb_photo),
self.lamp_fet_drain_volts, SourceMetric::new_float("rb_photo_iv", self.rb_photo_iv),
tags.clone(), SourceMetric::new_float("case_temp", self.case_temp),
), SourceMetric::new_float("ocxo_therm", self.ocxo_therm),
SourceMetric::new_float( SourceMetric::new_float("cell_therm", self.cell_therm),
"lamp_fet_gate_volts", SourceMetric::new_float("lamp_therm", self.lamp_therm),
self.lamp_fet_gate_volts, SourceMetric::new_float("ext_cal_volts", self.ext_cal_volts),
tags.clone(), SourceMetric::new_float("analog_gnd_volts", self.analog_gnd_volts),
), SourceMetric::new_float("if_vco_varactor_volts", self.if_vco_varactor_volts),
SourceMetric::new_float("ocxo_heat_volts", self.ocxo_heat_volts, tags.clone()), SourceMetric::new_float("op_vco_varactor_volts", self.op_vco_varactor_volts),
SourceMetric::new_float("cell_heat_volts", self.cell_heat_volts, tags.clone()), SourceMetric::new_float("mul_amp_gain_volts", self.mul_amp_gain_volts),
SourceMetric::new_float("lamp_heat_volts", self.lamp_heat_volts, tags.clone()), SourceMetric::new_float("rf_lock_volts", self.rf_lock_volts),
SourceMetric::new_float("rb_photo", self.rb_photo, tags.clone()),
SourceMetric::new_float("rb_photo_iv", self.rb_photo_iv, tags.clone()),
SourceMetric::new_float("case_temp", self.case_temp, tags.clone()),
SourceMetric::new_float("ocxo_therm", self.ocxo_therm, tags.clone()),
SourceMetric::new_float("cell_therm", self.cell_therm, tags.clone()),
SourceMetric::new_float("lamp_therm", self.lamp_therm, tags.clone()),
SourceMetric::new_float("ext_cal_volts", self.ext_cal_volts, tags.clone()),
SourceMetric::new_float("analog_gnd_volts", self.analog_gnd_volts, tags.clone()),
SourceMetric::new_float(
"if_vco_varactor_volts",
self.if_vco_varactor_volts,
tags.clone(),
),
SourceMetric::new_float(
"op_vco_varactor_volts",
self.op_vco_varactor_volts,
tags.clone(),
),
SourceMetric::new_float("mul_amp_gain_volts", self.mul_amp_gain_volts, tags.clone()),
SourceMetric::new_float("rf_lock_volts", self.rf_lock_volts, tags.clone()),
// U16 Metrics (optional, but can be treated as integers) // U16 Metrics (optional, but can be treated as integers)
SourceMetric::new_int("freq_offset_ppt", self.freq_offset_ppt as i64, tags.clone()), SourceMetric::new_int("freq_offset_ppt", self.freq_offset_ppt as i64),
SourceMetric::new_int("mag_efc", self.mag_efc as i64, tags.clone()), SourceMetric::new_int("mag_efc", self.mag_efc as i64),
] ],
}]
} }
} }
@@ -429,7 +412,7 @@ impl Prs10Monitor {
#[instrument(skip_all)] #[instrument(skip_all)]
async fn status_poll(&mut self) -> Result<ChimemonMessage, Box<dyn std::error::Error>> { async fn status_poll(&mut self) -> Result<ChimemonMessage, Box<dyn std::error::Error>> {
let status = self.get_status().await?; let status = self.get_status().await?;
Ok(ChimemonMessage::SourceReport(SourceReport { Ok(SourceReport {
name: self.name.clone(), name: self.name.clone(),
status: if status.is_healthy() { status: if status.is_healthy() {
SourceStatus::Healthy SourceStatus::Healthy
@@ -437,7 +420,8 @@ impl Prs10Monitor {
SourceStatus::Unknown SourceStatus::Unknown
}, },
details: Arc::new(status), details: Arc::new(status),
})) }
.into())
} }
#[instrument(skip_all)] #[instrument(skip_all)]
@@ -459,7 +443,7 @@ impl Prs10Monitor {
} }
drop(stats_guard); drop(stats_guard);
Ok(ChimemonMessage::SourceReport(SourceReport { Ok(SourceReport {
name: self.name.clone(), name: self.name.clone(),
status: SourceStatus::Unknown, status: SourceStatus::Unknown,
details: Arc::new(Prs10Stats { details: Arc::new(Prs10Stats {
@@ -488,7 +472,8 @@ impl Prs10Monitor {
mul_amp_gain_volts: analog_values[18], mul_amp_gain_volts: analog_values[18],
rf_lock_volts: analog_values[19], rf_lock_volts: analog_values[19],
}), }),
})) }
.into())
} }
async fn reset_rx_state(&mut self) -> Result<(), Box<dyn std::error::Error>> { async fn reset_rx_state(&mut self) -> Result<(), Box<dyn std::error::Error>> {
@@ -509,6 +494,7 @@ impl Prs10Monitor {
#[async_trait] #[async_trait]
impl ChimemonSource for Prs10Monitor { impl ChimemonSource for Prs10Monitor {
type Config = Prs10Config; type Config = Prs10Config;
const TASK_NAME: &'static str = "prs10-task";
fn new(name: &str, config: Self::Config) -> Self { fn new(name: &str, config: Self::Config) -> Self {
let builder = tokio_serial::new(&config.port, config.baud) let builder = tokio_serial::new(&config.port, config.baud)
.timeout(config.timeout) .timeout(config.timeout)

View File

@@ -7,23 +7,20 @@ use bitflags::bitflags;
use byteorder::{BigEndian, ReadBytesExt}; use byteorder::{BigEndian, ReadBytesExt};
use bytes::{Buf, BytesMut}; use bytes::{Buf, BytesMut};
use chrono::{DateTime, Duration, NaiveDateTime, Utc}; use chrono::{DateTime, Duration, NaiveDateTime, Utc};
use figment::value::Map;
use futures::future::join;
use influxdb2::models::DataPoint;
use influxdb2::models::data_point::DataPointBuilder;
use itertools::Itertools; use itertools::Itertools;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf};
use tokio::select;
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tokio::time::sleep; use tokio::time::sleep;
use tokio::{join, select};
use tokio_serial::{SerialPort, SerialStream}; use tokio_serial::{SerialPort, SerialStream};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn}; use tracing::{debug, error, info, warn};
use crate::{ use crate::{
ChimemonMessage, ChimemonSource, ChimemonSourceChannel, SourceMetric, SourceReportDetails, ChimemonSource, ChimemonSourceChannel, SourceMetric, SourceReport, SourceReportDetails,
TimeReport, UCCMConfig, SourceStatus, TimeReport, config::UCCMConfig,
}; };
use crate::{SourceMetricSet, fatal};
pub const GPS_EPOCH: i64 = 315964800; // Doesn't seem possible to have a const DateTime object pub const GPS_EPOCH: i64 = 315964800; // Doesn't seem possible to have a const DateTime object
pub type UccmEndian = BigEndian; pub type UccmEndian = BigEndian;
@@ -63,88 +60,33 @@ impl SourceReportDetails for UCCMTODReport {
&& !self.flags.contains(UCCMFlags::GPS_LOS) && !self.flags.contains(UCCMFlags::GPS_LOS)
} }
fn to_metrics(&self) -> Vec<SourceMetric> { fn to_metrics(&self) -> Vec<SourceMetricSet> {
let no_tags = Arc::new(vec![]); let tags = Arc::new(vec![]);
vec![ vec![SourceMetricSet {
SourceMetric::new_int("leaps", self.leaps as i64, no_tags.clone()), tags,
SourceMetric::new_bool( metrics: vec![
"osc_lock", SourceMetric::new_int("leaps", self.leaps as i64),
self.flags.contains(UCCMFlags::OSC_LOCK), SourceMetric::new_bool("osc_lock", self.flags.contains(UCCMFlags::OSC_LOCK)),
no_tags.clone(), SourceMetric::new_bool("leap_flag", self.flags.contains(UCCMFlags::LEAP_FLAG)),
), SourceMetric::new_bool("init_unlock", self.flags.contains(UCCMFlags::INIT_UNLOCK)),
SourceMetric::new_bool(
"leap_flag",
self.flags.contains(UCCMFlags::LEAP_FLAG),
no_tags.clone(),
),
SourceMetric::new_bool(
"init_unlock",
self.flags.contains(UCCMFlags::INIT_UNLOCK),
no_tags.clone(),
),
SourceMetric::new_bool( SourceMetric::new_bool(
"init_no_sats", "init_no_sats",
self.flags.contains(UCCMFlags::INIT_NO_SATS), self.flags.contains(UCCMFlags::INIT_NO_SATS),
no_tags.clone(),
), ),
SourceMetric::new_bool( SourceMetric::new_bool(
"have_gps_time", "have_gps_time",
self.flags.contains(UCCMFlags::HAVE_GPS_TIME), self.flags.contains(UCCMFlags::HAVE_GPS_TIME),
no_tags.clone(),
),
SourceMetric::new_bool(
"power_fail",
self.flags.contains(UCCMFlags::POWER_FAIL),
no_tags.clone(),
),
SourceMetric::new_bool(
"no_gps_sync",
self.flags.contains(UCCMFlags::NO_GPS_SYNC),
no_tags.clone(),
), ),
SourceMetric::new_bool("power_fail", self.flags.contains(UCCMFlags::POWER_FAIL)),
SourceMetric::new_bool("no_gps_sync", self.flags.contains(UCCMFlags::NO_GPS_SYNC)),
SourceMetric::new_bool( SourceMetric::new_bool(
"no_gps_sync2", "no_gps_sync2",
self.flags.contains(UCCMFlags::NO_GPS_SYNC2), self.flags.contains(UCCMFlags::NO_GPS_SYNC2),
no_tags.clone(),
), ),
SourceMetric::new_bool( SourceMetric::new_bool("ant_fault", self.flags.contains(UCCMFlags::NO_ANT)),
"ant_fault", SourceMetric::new_bool("gps_los", self.flags.contains(UCCMFlags::GPS_LOS)),
self.flags.contains(UCCMFlags::NO_ANT), ],
no_tags.clone(), }]
),
SourceMetric::new_bool(
"gps_los",
self.flags.contains(UCCMFlags::GPS_LOS),
no_tags.clone(),
),
]
}
}
impl UCCMTODReport {
pub fn as_builder(&self, measurement: &String, tags: &Map<String, String>) -> DataPointBuilder {
let mut builder =
DataPoint::builder(measurement).timestamp(self.time.timestamp_nanos_opt().unwrap());
builder = builder.field("leaps", self.leaps as i64);
builder = builder.field("osc_lock", self.flags.contains(UCCMFlags::OSC_LOCK));
builder = builder.field("leap_flag", self.flags.contains(UCCMFlags::LEAP_FLAG));
builder = builder.field("init_unlock", self.flags.contains(UCCMFlags::INIT_UNLOCK));
builder = builder.field("init_no_sats", self.flags.contains(UCCMFlags::INIT_NO_SATS));
builder = builder.field(
"have_gps_time",
self.flags.contains(UCCMFlags::HAVE_GPS_TIME),
);
builder = builder.field("power_fail", self.flags.contains(UCCMFlags::POWER_FAIL));
builder = builder.field("no_gps_sync", self.flags.contains(UCCMFlags::NO_GPS_SYNC));
builder = builder.field("no_gps_sync2", self.flags.contains(UCCMFlags::NO_GPS_SYNC2));
builder = builder.field("ant_fault", self.flags.contains(UCCMFlags::NO_ANT));
builder = builder.field("gps_los", self.flags.contains(UCCMFlags::GPS_LOS));
builder = tags
.iter()
.fold(builder, |builder, (k, v)| builder.tag(k, v));
builder
} }
} }
@@ -153,14 +95,16 @@ pub struct UCCMLoopDiagReport {
pub ocxo: f32, pub ocxo: f32,
} }
impl UCCMLoopDiagReport { impl SourceReportDetails for UCCMLoopDiagReport {
pub fn as_builder(&self, measurement: &String, tags: &Map<String, String>) -> DataPointBuilder { fn is_healthy(&self) -> bool {
let mut builder = DataPoint::builder(measurement); true
builder = builder.field("ocxo_offset", self.ocxo as f64); }
builder = tags fn to_metrics(&self) -> Vec<SourceMetricSet> {
.iter() let tags = Arc::new(vec![]);
.fold(builder, |builder, (k, v)| builder.tag(k, v)); vec![SourceMetricSet {
builder tags,
metrics: vec![SourceMetric::new_float("ocxo_offset", self.ocxo as f64)],
}]
} }
} }
@@ -170,16 +114,9 @@ pub struct UCCMGpsSvTracking {
pub cno: u8, pub cno: u8,
} }
impl UCCMGpsSvTracking { impl From<&UCCMGpsSvTracking> for SourceMetric {
fn as_builder(&self, measurement: &String, tags: &Map<String, String>) -> DataPointBuilder { fn from(value: &UCCMGpsSvTracking) -> Self {
let mut builder = DataPoint::builder(measurement) SourceMetric::new_int("sv_cno", value.cno as i64)
.field("sv_cno", self.cno as i64)
.tag("sv_id", self.sv.to_string());
builder = tags
.iter()
.fold(builder, |builder, (k, v)| builder.tag(k, v));
builder
} }
} }
@@ -188,13 +125,15 @@ pub struct UCCMGPSSatsReport {
pub tracked_svs: Vec<UCCMGpsSvTracking>, pub tracked_svs: Vec<UCCMGpsSvTracking>,
} }
impl UCCMGPSSatsReport { impl SourceReportDetails for UCCMGPSSatsReport {
pub fn build(&self, measurement: &String, tags: &Map<String, String>) -> Vec<DataPoint> { fn is_healthy(&self) -> bool {
self.tracked_svs self.tracked_svs.len() >= 4
.iter() }
.map(|sv| sv.as_builder(measurement, tags)) fn to_metrics(&self) -> Vec<SourceMetricSet> {
.map(|b| b.build().unwrap()) vec![SourceMetricSet {
.collect() tags: Arc::new(vec![]),
metrics: self.tracked_svs.iter().map(|sv| sv.into()).collect(),
}]
} }
} }
@@ -277,20 +216,23 @@ impl SourceReportDetails for UCCMStatusReport {
fn is_healthy(&self) -> bool { fn is_healthy(&self) -> bool {
self.gps_pps_valid self.gps_pps_valid
} }
fn to_metrics(&self) -> Vec<SourceMetric> { fn to_metrics(&self) -> Vec<SourceMetricSet> {
let no_tags = Arc::new(vec![]); let tags = Arc::new(vec![]);
vec![ vec![SourceMetricSet {
SourceMetric::new_int("tfom", self.tfom as i64, no_tags.clone()), tags,
SourceMetric::new_int("ffom", self.ffom as i64, no_tags.clone()), metrics: vec![
SourceMetric::new_float("gps_phase", self.gps_phase as f64, no_tags.clone()), SourceMetric::new_int("tfom", self.tfom as i64),
SourceMetric::new_int("ffom", self.ffom as i64),
SourceMetric::new_float("gps_phase", self.gps_phase as f64),
// TODO: sv info // TODO: sv info
// TOOD: timestamp // TOOD: timestamp
SourceMetric::new_float("ant_voltage", self.ant_voltage as f64, no_tags.clone()), SourceMetric::new_float("ant_voltage", self.ant_voltage as f64),
SourceMetric::new_float("ant_current", self.ant_current as f64, no_tags.clone()), SourceMetric::new_float("ant_current", self.ant_current as f64),
SourceMetric::new_float("temp", self.temp as f64, no_tags.clone()), SourceMetric::new_float("temp", self.temp as f64),
SourceMetric::new_int("efc_dac", self.efc_dac as i64, no_tags.clone()), SourceMetric::new_int("efc_dac", self.efc_dac as i64),
SourceMetric::new_float("freq_error", self.freq_error as f64, no_tags.clone()), SourceMetric::new_float("freq_error", self.freq_error as f64),
] ],
}]
} }
} }
@@ -458,8 +400,8 @@ impl UCCMMonitor {
state: Arc<Mutex<UCCMMonitorParseState>>, state: Arc<Mutex<UCCMMonitorParseState>>,
) { ) {
let mut rdbuf = BytesMut::with_capacity(1024); let mut rdbuf = BytesMut::with_capacity(1024);
let mut last_loop_diag: Option<UCCMLoopDiagReport> = None; let mut last_loop_diag: Option<Arc<UCCMLoopDiagReport>> = None;
let mut last_gps_sats: Option<UCCMGPSSatsReport> = None; let mut last_gps_sats: Option<Arc<UCCMGPSSatsReport>> = None;
let mut last_sent_report = Utc::now() - self.config.status_interval; let mut last_sent_report = Utc::now() - self.config.status_interval;
@@ -496,42 +438,45 @@ impl UCCMMonitor {
&& tod && tod
.flags .flags
.contains(UCCMFlags::OSC_LOCK | UCCMFlags::HAVE_GPS_TIME); .contains(UCCMFlags::OSC_LOCK | UCCMFlags::HAVE_GPS_TIME);
chan.send(ChimemonMessage::TimeReport(TimeReport { chan.send(
TimeReport {
system_time: sysnow, system_time: sysnow,
offset, offset,
leaps: tod.leaps as isize, leaps: tod.leaps as isize,
leap_flag: tod.flags.contains(UCCMFlags::LEAP_FLAG), leap_flag: tod.flags.contains(UCCMFlags::LEAP_FLAG),
valid, valid,
})) }
.into(),
)
.expect("Unable to send to channel"); .expect("Unable to send to channel");
if sysnow - last_sent_report if sysnow - last_sent_report
>= Duration::from_std(self.config.status_interval).unwrap() >= Duration::from_std(self.config.status_interval).unwrap()
{ {
// let mut points = vec![ if let Some(loop_diag) = &last_loop_diag {
// tod.as_builder(&self.config.measurement, &self.config.tags) if let Err(e) = chan.send(
// .build() SourceReport {
// .unwrap(), name: "uccm".to_owned(),
// ]; status: SourceStatus::Unknown,
// if let Some(loop_diag) = &last_loop_diag { details: loop_diag.clone(),
// points.push( }
// loop_diag .into(),
// .as_builder( ) {
// &self.config.measurement, error!(error = ?e, "Unable to send message to channel");
// &self.config.influxdb.tags, }
// ) }
// .build() if let Some(gps_sats) = &last_gps_sats {
// .unwrap(), if let Err(e) = chan.send(
// ) SourceReport {
// } name: "uccm".to_owned(),
// if let Some(gps_sats) = &last_gps_sats { status: SourceStatus::Unknown,
// points.extend(gps_sats.build( details: gps_sats.clone(),
// &self.config.sources.uccm.measurement, }
// &self.config.influxdb.tags, .into(),
// )); ) {
// } error!(error = ?e, "Unable to send message to channel");
}
}
// chan.send(ChimemonMessage::DataPoints(points))
// .expect("Unable to send to channel");
last_sent_report = sysnow; last_sent_report = sysnow;
} }
} }
@@ -545,7 +490,7 @@ impl UCCMMonitor {
let loop_report = UCCMLoopDiagReport::try_from(loop_diag_resp.as_str()); let loop_report = UCCMLoopDiagReport::try_from(loop_diag_resp.as_str());
let gps_report = UCCMGPSSatsReport::try_from(gps_sats_resp.as_str()); let gps_report = UCCMGPSSatsReport::try_from(gps_sats_resp.as_str());
if let Ok(loop_report) = loop_report { if let Ok(loop_report) = loop_report {
last_loop_diag = Some(loop_report) last_loop_diag = Some(Arc::new(loop_report))
} else { } else {
warn!( warn!(
"Unable to parse loop diag report `{}`: {}", "Unable to parse loop diag report `{}`: {}",
@@ -554,7 +499,7 @@ impl UCCMMonitor {
); );
} }
if let Ok(gps_report) = gps_report { if let Ok(gps_report) = gps_report {
last_gps_sats = Some(gps_report) last_gps_sats = Some(Arc::new(gps_report))
} else { } else {
warn!( warn!(
"Unable to parse GPS sats report `{}`: {}", "Unable to parse GPS sats report `{}`: {}",
@@ -574,6 +519,7 @@ impl UCCMMonitor {
#[async_trait] #[async_trait]
impl ChimemonSource for UCCMMonitor { impl ChimemonSource for UCCMMonitor {
type Config = UCCMConfig; type Config = UCCMConfig;
const TASK_NAME: &'static str = "uccm-task";
fn new(name: &str, config: Self::Config) -> Self { fn new(name: &str, config: Self::Config) -> Self {
let builder = tokio_serial::new(&config.port, config.baud) let builder = tokio_serial::new(&config.port, config.baud)
.timeout(config.timeout) .timeout(config.timeout)
@@ -581,8 +527,13 @@ impl ChimemonSource for UCCMMonitor {
.parity(tokio_serial::Parity::None) .parity(tokio_serial::Parity::None)
.stop_bits(tokio_serial::StopBits::One) .stop_bits(tokio_serial::StopBits::One)
.flow_control(tokio_serial::FlowControl::None); .flow_control(tokio_serial::FlowControl::None);
let mut port = SerialStream::open(&builder).expect("Must be able to open serial port"); let mut port = match SerialStream::open(&builder) {
port.set_exclusive(true).expect("Can't lock serial port"); Ok(port) => port,
Err(e) => fatal!(error = ?e, "Error opening port {}", &config.port),
};
if let Err(e) = port.set_exclusive(true) {
fatal!(error= ?e, "Can't lock serial port");
};
info!( info!(
"Opened serial port {}@{}", "Opened serial port {}@{}",
port.name().unwrap(), port.name().unwrap(),
@@ -629,7 +580,5 @@ impl ChimemonSource for UCCMMonitor {
_ = cancel.cancelled() => { return }, _ = cancel.cancelled() => { return },
_ = rx_handle => { return } _ = rx_handle => { return }
}; };
join!(rx_handle).0.unwrap();
} }
} }

View File

@@ -6,13 +6,14 @@ use async_trait::async_trait;
use libc::{c_double, c_int, timeval}; use libc::{c_double, c_int, timeval};
use tokio::select; use tokio::select;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::{debug, warn}; use tracing::{debug, info, warn};
use crate::{ChimemonMessage, ChimemonTarget, ChimemonTargetChannel, ChronySockConfig}; use crate::{ChimemonMessage, ChimemonTarget, ChimemonTargetChannel, config::ChronySockConfig};
const CHRONY_MAGIC: c_int = 0x534f434b; const CHRONY_MAGIC: c_int = 0x534f434b;
pub struct ChronySockServer { pub struct ChronySockServer {
name: String,
sock_path: PathBuf, sock_path: PathBuf,
} }
@@ -27,23 +28,27 @@ pub struct ChronyTimeReport {
magic: c_int, magic: c_int,
} }
impl ChronySockServer { impl ChronySockServer {}
pub fn new(config: ChronySockConfig) -> Self {
ChronySockServer {
sock_path: config.sock.into(),
}
}
}
#[async_trait] #[async_trait]
impl ChimemonTarget for ChronySockServer { impl ChimemonTarget for ChronySockServer {
type Config = ChronySockConfig;
const TASK_NAME: &'static str = "chrony-refclock-task";
fn new(name: &str, config: ChronySockConfig) -> Self {
ChronySockServer {
name: name.to_owned(),
sock_path: config.sock.into(),
}
}
async fn run(mut self, mut chan: ChimemonTargetChannel, cancel: CancellationToken) { async fn run(mut self, mut chan: ChimemonTargetChannel, cancel: CancellationToken) {
info!("Chrony refclock task started");
loop { loop {
select! { select! {
_ = cancel.cancelled() => { return } _ = cancel.cancelled() => { return }
msg = chan.recv() => { msg = chan.recv() => {
match msg { match msg {
Ok(ChimemonMessage::TimeReport(tr)) => { Ok(ChimemonMessage::TimeReport(tr)) => {
debug!(tr = ?tr, "Got timereport");
if tr.valid { if tr.valid {
{ {
let frame = ChronyTimeReport { let frame = ChronyTimeReport {

110
src/targets/influx.rs Normal file
View File

@@ -0,0 +1,110 @@
use async_trait::async_trait;
use futures::stream;
use influxdb2::{
Client,
models::{DataPoint, FieldValue},
};
use tokio::{select, sync::broadcast, time::timeout};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument};
use crate::{
ChimemonMessage, ChimemonTarget, ChimemonTargetChannel, MetricValue, SourceReport,
config::InfluxConfig, fatal,
};
pub struct InfluxTarget {
name: String,
config: InfluxConfig,
influx: Client,
}
impl From<MetricValue> for FieldValue {
fn from(value: MetricValue) -> Self {
match value {
MetricValue::Bool(b) => FieldValue::Bool(b),
MetricValue::Float(f) => FieldValue::F64(f),
MetricValue::Int(i) => FieldValue::I64(i),
}
}
}
#[async_trait]
impl ChimemonTarget for InfluxTarget {
type Config = InfluxConfig;
const TASK_NAME: &'static str = "influx-task";
fn new(name: &str, config: Self::Config) -> Self {
let influx = Client::new(&config.url, &config.org, &config.token);
Self {
name: name.to_owned(),
config: config,
influx,
}
}
async fn run(self, mut chan: ChimemonTargetChannel, cancel: CancellationToken) {
info!("Influx task started");
loop {
let msg = select! {
_ = cancel.cancelled() => { return },
msg = chan.recv() => msg
};
debug!(msg = ?msg, "Got msg");
let msg = match msg {
Ok(msg) => msg,
Err(broadcast::error::RecvError::Closed) => {
fatal!("Permanent channel closed, terminating")
}
Err(broadcast::error::RecvError::Lagged(_)) => {
error!("Channel lagged");
continue;
}
};
if let Err(e) = self.handle_msg(&msg).await {
error!(error = ?e, msg=?&msg, "Error handling message");
}
}
}
}
impl InfluxTarget {
#[instrument(skip_all)]
async fn handle_source_report(
&self,
sr: &SourceReport,
) -> Result<(), Box<dyn std::error::Error>> {
debug!("Handling source report {}", sr.name);
let mut dps = Vec::new();
for metric_set in &sr.details.to_metrics() {
let mut builder = DataPoint::builder(&sr.name);
builder = self
.config
.tags
.iter()
.fold(builder, |builder, (k, v)| builder.tag(k, v));
builder = metric_set
.tags
.iter()
.fold(builder, |builder, (k, v)| builder.tag(*k, v));
builder = metric_set.metrics.iter().fold(builder, |builder, metric| {
builder.field(metric.name, metric.value)
});
dps.push(builder.build()?);
}
debug!("Sending {} datapoints to influx", dps.len());
timeout(
self.config.timeout,
self.influx.write(&self.config.bucket, stream::iter(dps)),
)
.await??;
debug!("All datapoints sent");
Ok(())
}
async fn handle_msg(&self, msg: &ChimemonMessage) -> Result<(), Box<dyn std::error::Error>> {
debug!(msg = ?msg, "Handling msg");
match msg {
ChimemonMessage::TimeReport(_tr) => Ok(()),
ChimemonMessage::SourceReport(sr) => self.handle_source_report(sr).await,
}
}
}

View File

@@ -1 +1,2 @@
pub mod chrony_refclock; pub mod chrony_refclock;
pub mod influx;