improvements
This commit is contained in:
@@ -10,7 +10,7 @@ release-logs = ["tracing/max_level_info"]
|
|||||||
[dependencies]
|
[dependencies]
|
||||||
serde = "1.0"
|
serde = "1.0"
|
||||||
serde_derive = "1.0"
|
serde_derive = "1.0"
|
||||||
tokio = { version = "1", features = ["rt", "io-util"] }
|
tokio = { version = "1", features = ["rt-multi-thread", "io-util"] }
|
||||||
clap = { version = "4.0", features = ["derive"] }
|
clap = { version = "4.0", features = ["derive"] }
|
||||||
figment = { version = "0.10", features = ["toml"] }
|
figment = { version = "0.10", features = ["toml"] }
|
||||||
futures = "0.3.24"
|
futures = "0.3.24"
|
||||||
|
|||||||
@@ -1,18 +1,13 @@
|
|||||||
use std::{collections::BTreeMap, path::Path};
|
use figment::{Provider, providers::Serialized, util::map, value::Map};
|
||||||
|
|
||||||
use figment::{
|
|
||||||
Figment, Provider,
|
|
||||||
providers::{Format, Serialized, Toml},
|
|
||||||
util::map,
|
|
||||||
value::Map,
|
|
||||||
};
|
|
||||||
use gethostname::gethostname;
|
use gethostname::gethostname;
|
||||||
use serde_derive::{Deserialize, Serialize};
|
use serde_derive::{Deserialize, Serialize};
|
||||||
use serde_with::{DurationSeconds, serde_as};
|
use serde_with::{DurationSeconds, serde_as};
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Clone)]
|
#[derive(Serialize, Deserialize, Clone)]
|
||||||
|
#[serde(default)]
|
||||||
pub struct InfluxConfig {
|
pub struct InfluxConfig {
|
||||||
pub enabled: bool,
|
pub enabled: bool,
|
||||||
|
pub timeout: std::time::Duration,
|
||||||
pub url: String,
|
pub url: String,
|
||||||
pub org: String,
|
pub org: String,
|
||||||
pub bucket: String,
|
pub bucket: String,
|
||||||
@@ -25,6 +20,7 @@ impl Default for InfluxConfig {
|
|||||||
let host = gethostname().into_string().unwrap();
|
let host = gethostname().into_string().unwrap();
|
||||||
InfluxConfig {
|
InfluxConfig {
|
||||||
enabled: false,
|
enabled: false,
|
||||||
|
timeout: std::time::Duration::from_secs(10),
|
||||||
url: "http://localhost:8086".into(),
|
url: "http://localhost:8086".into(),
|
||||||
org: "default".into(),
|
org: "default".into(),
|
||||||
bucket: "default".into(),
|
bucket: "default".into(),
|
||||||
@@ -199,14 +195,13 @@ pub enum SourceConfig {
|
|||||||
#[serde(tag = "type", rename_all = "snake_case")]
|
#[serde(tag = "type", rename_all = "snake_case")]
|
||||||
pub enum TargetConfig {
|
pub enum TargetConfig {
|
||||||
ChronySock(ChronySockConfig),
|
ChronySock(ChronySockConfig),
|
||||||
InfluxDb(InfluxConfig),
|
Influxdb(InfluxConfig),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Clone, Default)]
|
#[derive(Serialize, Deserialize, Clone, Default)]
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
pub influxdb: InfluxConfig,
|
pub sources: Map<String, SourceConfig>,
|
||||||
pub sources: BTreeMap<String, SourceConfig>,
|
pub targets: Map<String, TargetConfig>,
|
||||||
pub targets: BTreeMap<String, TargetConfig>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Provider for Config {
|
impl Provider for Config {
|
||||||
|
|||||||
30
src/lib.rs
30
src/lib.rs
@@ -13,7 +13,7 @@ macro_rules! fatal {
|
|||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
|
|
||||||
use tokio::sync::broadcast::*;
|
use tokio::sync::broadcast;
|
||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
|
|
||||||
use std::{fmt::Debug, sync::Arc};
|
use std::{fmt::Debug, sync::Arc};
|
||||||
@@ -50,37 +50,39 @@ type MetricTags = Vec<MetricTag>;
|
|||||||
pub struct SourceMetric {
|
pub struct SourceMetric {
|
||||||
name: &'static str,
|
name: &'static str,
|
||||||
value: MetricValue,
|
value: MetricValue,
|
||||||
tags: Arc<MetricTags>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SourceMetric {
|
impl SourceMetric {
|
||||||
pub fn new_int(name: &'static str, value: i64, tags: Arc<MetricTags>) -> Self {
|
pub fn new_int(name: &'static str, value: i64) -> Self {
|
||||||
Self {
|
Self {
|
||||||
name: name,
|
name: name,
|
||||||
value: MetricValue::Int(value),
|
value: MetricValue::Int(value),
|
||||||
tags,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn new_float(name: &'static str, value: f64, tags: Arc<MetricTags>) -> Self {
|
pub fn new_float(name: &'static str, value: f64) -> Self {
|
||||||
Self {
|
Self {
|
||||||
name: name,
|
name: name,
|
||||||
value: MetricValue::Float(value),
|
value: MetricValue::Float(value),
|
||||||
tags,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn new_bool(name: &'static str, value: bool, tags: Arc<MetricTags>) -> Self {
|
pub fn new_bool(name: &'static str, value: bool) -> Self {
|
||||||
Self {
|
Self {
|
||||||
name: name,
|
name: name,
|
||||||
value: MetricValue::Bool(value),
|
value: MetricValue::Bool(value),
|
||||||
tags,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct SourceMetricSet {
|
||||||
|
metrics: Vec<SourceMetric>,
|
||||||
|
tags: Arc<MetricTags>,
|
||||||
|
}
|
||||||
|
|
||||||
pub trait SourceReportDetails: Debug + Send + Sync {
|
pub trait SourceReportDetails: Debug + Send + Sync {
|
||||||
fn to_metrics(&self) -> Vec<SourceMetric>;
|
fn to_metrics(&self) -> Vec<SourceMetricSet>;
|
||||||
fn is_healthy(&self) -> bool;
|
fn is_healthy(&self) -> bool;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -88,7 +90,7 @@ pub trait SourceReportDetails: Debug + Send + Sync {
|
|||||||
pub struct SourceReport {
|
pub struct SourceReport {
|
||||||
pub name: String,
|
pub name: String,
|
||||||
pub status: SourceStatus,
|
pub status: SourceStatus,
|
||||||
pub details: Arc<dyn SourceReportDetails>,
|
pub details: Arc<dyn SourceReportDetails + Send + Sync>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
@@ -109,12 +111,13 @@ impl From<SourceReport> for ChimemonMessage {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type ChimemonSourceChannel = Sender<ChimemonMessage>;
|
pub type ChimemonSourceChannel = broadcast::Sender<ChimemonMessage>;
|
||||||
pub type ChimemonTargetChannel = Receiver<ChimemonMessage>;
|
pub type ChimemonTargetChannel = broadcast::Receiver<ChimemonMessage>;
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
pub trait ChimemonSource {
|
pub trait ChimemonSource {
|
||||||
type Config;
|
type Config;
|
||||||
|
const TASK_NAME: &'static str;
|
||||||
fn new(name: &str, config: Self::Config) -> Self;
|
fn new(name: &str, config: Self::Config) -> Self;
|
||||||
async fn run(self, chan: ChimemonSourceChannel, cancel: CancellationToken);
|
async fn run(self, chan: ChimemonSourceChannel, cancel: CancellationToken);
|
||||||
}
|
}
|
||||||
@@ -122,6 +125,7 @@ pub trait ChimemonSource {
|
|||||||
#[async_trait]
|
#[async_trait]
|
||||||
pub trait ChimemonTarget {
|
pub trait ChimemonTarget {
|
||||||
type Config;
|
type Config;
|
||||||
|
const TASK_NAME: &'static str;
|
||||||
fn new(name: &str, config: Self::Config) -> Self;
|
fn new(name: &str, config: Self::Config) -> Self;
|
||||||
async fn run(self, chan: ChimemonTargetChannel, cancel: CancellationToken);
|
async fn run(self, mut chan: ChimemonTargetChannel, cancel: CancellationToken);
|
||||||
}
|
}
|
||||||
|
|||||||
148
src/main.rs
148
src/main.rs
@@ -1,3 +1,6 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
use clap::{Parser, ValueEnum};
|
use clap::{Parser, ValueEnum};
|
||||||
use figment::{
|
use figment::{
|
||||||
Figment,
|
Figment,
|
||||||
@@ -11,6 +14,7 @@ use tracing_subscriber::{self, EnvFilter, fmt::format::FmtSpan};
|
|||||||
|
|
||||||
use chimemon::{
|
use chimemon::{
|
||||||
config::{SourceConfig, TargetConfig},
|
config::{SourceConfig, TargetConfig},
|
||||||
|
targets::influx::InfluxTarget,
|
||||||
*,
|
*,
|
||||||
};
|
};
|
||||||
use config::Config;
|
use config::Config;
|
||||||
@@ -55,46 +59,36 @@ struct Args {
|
|||||||
config_file: String,
|
config_file: String,
|
||||||
#[arg(value_enum, default_value_t = Level::Info)]
|
#[arg(value_enum, default_value_t = Level::Info)]
|
||||||
log_level: Level,
|
log_level: Level,
|
||||||
|
#[arg(short, long, default_value_t = false)]
|
||||||
|
echo_task: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
fn run_source(
|
fn run_source(
|
||||||
name: &str,
|
name: &str,
|
||||||
source: SourceConfig,
|
source: SourceConfig,
|
||||||
chan: ChimemonSourceChannel,
|
chan: ChimemonSourceChannel,
|
||||||
shutdown: CancellationToken,
|
cancel: CancellationToken,
|
||||||
) -> Option<JoinHandle<()>> {
|
) -> Option<JoinHandle<()>> {
|
||||||
match source {
|
match source {
|
||||||
SourceConfig::Chrony(source_config) if source_config.enabled => {
|
SourceConfig::Chrony(cfg) if cfg.enabled => {
|
||||||
let c = ChronyClient::new(&name, source_config);
|
spawn_source::<ChronyClient>(name, cfg, chan, cancel)
|
||||||
Some(tokio::spawn(
|
|
||||||
c.run(chan, shutdown).instrument(info_span!("chrony-task")),
|
|
||||||
))
|
|
||||||
}
|
}
|
||||||
SourceConfig::Gpsd(source_config) if source_config.enabled => {
|
SourceConfig::Gpsd(cfg) if cfg.enabled => {
|
||||||
let c = GpsdSource::new(&name, source_config);
|
spawn_source::<GpsdSource>(name, cfg, chan, cancel)
|
||||||
Some(tokio::spawn(
|
|
||||||
c.run(chan, shutdown).instrument(info_span!("gpsd-task")),
|
|
||||||
))
|
|
||||||
}
|
}
|
||||||
SourceConfig::Hwmon(source_config) if source_config.enabled => {
|
SourceConfig::Hwmon(cfg) if cfg.enabled => {
|
||||||
let c = HwmonSource::new(&name, source_config);
|
spawn_source::<HwmonSource>(name, cfg, chan, cancel)
|
||||||
Some(tokio::spawn(
|
|
||||||
c.run(chan, shutdown).instrument(info_span!("hwmon-task")),
|
|
||||||
))
|
|
||||||
}
|
}
|
||||||
SourceConfig::Prs10(source_config) if source_config.enabled => {
|
SourceConfig::Prs10(cfg) if cfg.enabled => {
|
||||||
let c = Prs10Monitor::new(&name, source_config);
|
spawn_source::<Prs10Monitor>(name, cfg, chan, cancel)
|
||||||
Some(tokio::spawn(
|
|
||||||
c.run(chan, shutdown).instrument(info_span!("prs10-task")),
|
|
||||||
))
|
|
||||||
}
|
}
|
||||||
SourceConfig::Uccm(source_config) if source_config.enabled => {
|
SourceConfig::Uccm(cfg) if cfg.enabled => {
|
||||||
let c = UCCMMonitor::new(&name, source_config);
|
spawn_source::<UCCMMonitor>(name, cfg, chan, cancel)
|
||||||
Some(tokio::spawn(
|
}
|
||||||
c.run(chan, shutdown).instrument(info_span!("uccm-task")),
|
_ => {
|
||||||
))
|
debug!("Disabled source {name} skipped");
|
||||||
|
None
|
||||||
}
|
}
|
||||||
_ => None,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -105,44 +99,79 @@ fn run_target(
|
|||||||
cancel: CancellationToken,
|
cancel: CancellationToken,
|
||||||
) -> Option<JoinHandle<()>> {
|
) -> Option<JoinHandle<()>> {
|
||||||
match target {
|
match target {
|
||||||
TargetConfig::ChronySock(source_config) if source_config.enabled => {
|
TargetConfig::ChronySock(cfg) if cfg.enabled => {
|
||||||
let c = ChronySockServer::new(name, source_config);
|
spawn_target::<ChronySockServer>(name, cfg, chan, cancel)
|
||||||
Some(tokio::spawn(
|
|
||||||
c.run(chan, cancel)
|
|
||||||
.instrument(info_span!("chronysock-task")),
|
|
||||||
))
|
|
||||||
}
|
}
|
||||||
TargetConfig::InfluxDb(source_config) if source_config.enabled => {
|
TargetConfig::Influxdb(cfg) if cfg.enabled => {
|
||||||
warn!("influx not implemented");
|
spawn_target::<InfluxTarget>(name, cfg, chan, cancel)
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
debug!("Disabled target {name} skipped");
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
_ => None,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn dummy_consumer(mut chan: ChimemonTargetChannel, cancel: CancellationToken) {
|
fn spawn_source<T: ChimemonSource + Send + Sync + 'static>(
|
||||||
|
name: &str,
|
||||||
|
config: T::Config,
|
||||||
|
chan: ChimemonSourceChannel,
|
||||||
|
cancel: CancellationToken,
|
||||||
|
) -> Option<JoinHandle<()>> {
|
||||||
|
let span = info_span!("source", task = name);
|
||||||
|
let s = T::new(name, config);
|
||||||
|
Some(tokio::spawn(s.run(chan, cancel).instrument(span)))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn spawn_target<T: ChimemonTarget + Send + Sync + 'static>(
|
||||||
|
name: &str,
|
||||||
|
config: T::Config,
|
||||||
|
chan: ChimemonTargetChannel,
|
||||||
|
cancel: CancellationToken,
|
||||||
|
) -> Option<JoinHandle<()>> {
|
||||||
|
let span = info_span!("target", task = name);
|
||||||
|
let t = T::new(name, config);
|
||||||
|
Some(tokio::spawn(async move {
|
||||||
|
t.run(chan, cancel).instrument(span).await
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
struct EchoTarget {}
|
||||||
|
struct EchoTargetConfig {}
|
||||||
|
#[async_trait]
|
||||||
|
impl ChimemonTarget for EchoTarget {
|
||||||
|
type Config = EchoTargetConfig;
|
||||||
|
const TASK_NAME: &'static str = "echo-task";
|
||||||
|
|
||||||
|
fn new(_name: &str, _config: Self::Config) -> Self {
|
||||||
|
EchoTarget {}
|
||||||
|
}
|
||||||
|
async fn run(self, mut chan: ChimemonTargetChannel, cancel: CancellationToken) {
|
||||||
info!("Dummy receiver task started");
|
info!("Dummy receiver task started");
|
||||||
loop {
|
loop {
|
||||||
select! {
|
let msg = select! {
|
||||||
_ = cancel.cancelled() => {
|
_ = cancel.cancelled() => {
|
||||||
return
|
return
|
||||||
},
|
},
|
||||||
Ok(m) = chan.recv() => {
|
msg = chan.recv() => msg
|
||||||
match m {
|
};
|
||||||
ChimemonMessage::SourceReport(report) => {
|
match msg {
|
||||||
|
Ok(ChimemonMessage::SourceReport(report)) => {
|
||||||
let metrics = report.details.to_metrics();
|
let metrics = report.details.to_metrics();
|
||||||
info!("instance: {} metrics: {metrics:?}", report.name);
|
info!("instance: {} metrics: {metrics:?}", report.name);
|
||||||
}
|
}
|
||||||
msg => {
|
Ok(msg) => {
|
||||||
info!("message: {msg:?}");
|
info!("message: {msg:?}");
|
||||||
}
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!(error = ?e, "Error receiving message");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::main(flavor = "current_thread")]
|
#[tokio::main(flavor = "multi_thread")]
|
||||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
let args = Args::parse();
|
let args = Args::parse();
|
||||||
tracing_subscriber::fmt()
|
tracing_subscriber::fmt()
|
||||||
@@ -162,17 +191,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
let config: Config = fig.extract()?;
|
let config: Config = fig.extract()?;
|
||||||
|
|
||||||
let mut tasks = Vec::new();
|
let mut tasks = Vec::new();
|
||||||
let (tx, _) = broadcast::channel(16);
|
let (sourcechan, _) = broadcast::channel(16);
|
||||||
let sourcechan: ChimemonSourceChannel = tx;
|
|
||||||
|
|
||||||
let shutdown_token = CancellationToken::new();
|
let shutdown_token = CancellationToken::new();
|
||||||
|
|
||||||
for (name, source) in config.sources {
|
|
||||||
if let Some(task) = run_source(&name, source, sourcechan.clone(), shutdown_token.clone()) {
|
|
||||||
tasks.push(task)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for (name, target) in config.targets {
|
for (name, target) in config.targets {
|
||||||
if let Some(task) = run_target(
|
if let Some(task) = run_target(
|
||||||
&name,
|
&name,
|
||||||
@@ -184,21 +206,31 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (name, source) in config.sources {
|
||||||
|
if let Some(task) = run_source(&name, source, sourcechan.clone(), shutdown_token.clone()) {
|
||||||
|
tasks.push(task)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if tasks.len() == 0 {
|
if tasks.len() == 0 {
|
||||||
error!("No tasks configured, exiting.");
|
error!("No tasks configured, exiting.");
|
||||||
return Ok(()); // not an error, but exit before starting a dummy task
|
return Ok(()); // not an error, but exit before starting a dummy task
|
||||||
}
|
}
|
||||||
|
if sourcechan.strong_count() == 0 {
|
||||||
|
warn!("No sources configured, no events will be generated");
|
||||||
|
}
|
||||||
if sourcechan.receiver_count() == 0 {
|
if sourcechan.receiver_count() == 0 {
|
||||||
warn!("No consumers configured, events will be discarded");
|
warn!("No targets configured, events will be discarded");
|
||||||
tasks.push(tokio::spawn(
|
}
|
||||||
dummy_consumer(sourcechan.subscribe(), shutdown_token.clone())
|
if args.echo_task || sourcechan.receiver_count() == 0 {
|
||||||
.instrument(info_span!("dummy-consumer-task")),
|
let c = EchoTargetConfig {};
|
||||||
));
|
tasks.push(
|
||||||
|
spawn_target::<EchoTarget>("echo", c, sourcechan.subscribe(), shutdown_token.clone())
|
||||||
|
.unwrap(),
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!("Task setup complete, tasks: {}", tasks.len());
|
debug!("Task setup complete, tasks: {}", tasks.len());
|
||||||
|
|
||||||
ctrlc::set_handler(move || {
|
ctrlc::set_handler(move || {
|
||||||
if shutdown_token.is_cancelled() {
|
if shutdown_token.is_cancelled() {
|
||||||
info!("Forced shutdown");
|
info!("Forced shutdown");
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ use tokio::select;
|
|||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
use tracing::{info, warn};
|
use tracing::{info, warn};
|
||||||
|
|
||||||
|
use crate::SourceMetricSet;
|
||||||
use crate::{
|
use crate::{
|
||||||
ChimemonSource, ChimemonSourceChannel, MetricTags, SourceMetric, SourceReport,
|
ChimemonSource, ChimemonSourceChannel, MetricTags, SourceMetric, SourceReport,
|
||||||
SourceReportDetails, SourceStatus, config::ChronyConfig,
|
SourceReportDetails, SourceStatus, config::ChronyConfig,
|
||||||
@@ -43,26 +44,24 @@ impl SourceReportDetails for ChronyTrackingReport {
|
|||||||
fn is_healthy(&self) -> bool {
|
fn is_healthy(&self) -> bool {
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
fn to_metrics(&self) -> Vec<SourceMetric> {
|
fn to_metrics(&self) -> Vec<SourceMetricSet> {
|
||||||
let tags = &self.tags;
|
vec![SourceMetricSet {
|
||||||
vec![
|
tags: self.tags.clone(),
|
||||||
SourceMetric::new_int("ref_id", self.ref_id, tags.clone()),
|
metrics: vec![
|
||||||
SourceMetric::new_int("stratum", self.stratum, tags.clone()),
|
SourceMetric::new_int("ref_id", self.ref_id),
|
||||||
SourceMetric::new_int("leap_status", self.leap_status, tags.clone()),
|
SourceMetric::new_int("stratum", self.stratum),
|
||||||
SourceMetric::new_float("current_correction", self.current_correction, tags.clone()),
|
SourceMetric::new_int("leap_status", self.leap_status),
|
||||||
SourceMetric::new_float("last_offset", self.last_offset, tags.clone()),
|
SourceMetric::new_float("current_correction", self.current_correction),
|
||||||
SourceMetric::new_float("rms_offset", self.rms_offset, tags.clone()),
|
SourceMetric::new_float("last_offset", self.last_offset),
|
||||||
SourceMetric::new_float("freq_ppm", self.freq_ppm, tags.clone()),
|
SourceMetric::new_float("rms_offset", self.rms_offset),
|
||||||
SourceMetric::new_float("resid_freq_ppm", self.resid_freq_ppm, tags.clone()),
|
SourceMetric::new_float("freq_ppm", self.freq_ppm),
|
||||||
SourceMetric::new_float("skew_ppm", self.skew_ppm, tags.clone()),
|
SourceMetric::new_float("resid_freq_ppm", self.resid_freq_ppm),
|
||||||
SourceMetric::new_float("root_delay", self.root_delay, tags.clone()),
|
SourceMetric::new_float("skew_ppm", self.skew_ppm),
|
||||||
SourceMetric::new_float("root_dispersion", self.root_dispersion, tags.clone()),
|
SourceMetric::new_float("root_delay", self.root_delay),
|
||||||
SourceMetric::new_float(
|
SourceMetric::new_float("root_dispersion", self.root_dispersion),
|
||||||
"last_update_interval",
|
SourceMetric::new_float("last_update_interval", self.last_update_interval),
|
||||||
self.last_update_interval,
|
],
|
||||||
tags.clone(),
|
}]
|
||||||
),
|
|
||||||
]
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -76,8 +75,8 @@ impl SourceReportDetails for ChronySourcesReport {
|
|||||||
//TODO: think about whether there is an idea of unhealthy sources
|
//TODO: think about whether there is an idea of unhealthy sources
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
fn to_metrics(&self) -> Vec<SourceMetric> {
|
fn to_metrics(&self) -> Vec<SourceMetricSet> {
|
||||||
let mut metrics = Vec::with_capacity(8 * self.sources.len());
|
let mut metrics = Vec::with_capacity(self.sources.len());
|
||||||
|
|
||||||
for source in &self.sources {
|
for source in &self.sources {
|
||||||
let tags = Arc::new(vec![
|
let tags = Arc::new(vec![
|
||||||
@@ -102,28 +101,19 @@ impl SourceReportDetails for ChronySourcesReport {
|
|||||||
},
|
},
|
||||||
),
|
),
|
||||||
]);
|
]);
|
||||||
metrics.extend([
|
metrics.push(SourceMetricSet {
|
||||||
SourceMetric::new_int("poll", source.poll as i64, tags.clone()),
|
tags: tags,
|
||||||
SourceMetric::new_int("stratum", source.stratum as i64, tags.clone()),
|
metrics: vec![
|
||||||
SourceMetric::new_int("flags", source.flags.bits() as i64, tags.clone()),
|
SourceMetric::new_int("poll", source.poll as i64),
|
||||||
SourceMetric::new_int(
|
SourceMetric::new_int("stratum", source.stratum as i64),
|
||||||
"reachability",
|
SourceMetric::new_int("flags", source.flags.bits() as i64),
|
||||||
source.reachability.count_ones() as i64,
|
SourceMetric::new_int("reachability", source.reachability.count_ones() as i64),
|
||||||
tags.clone(),
|
SourceMetric::new_int("since_sample", source.since_sample as i64),
|
||||||
),
|
SourceMetric::new_float("orig_latest_meas", source.orig_latest_meas.into()),
|
||||||
SourceMetric::new_int("since_sample", source.since_sample as i64, tags.clone()),
|
SourceMetric::new_float("latest_meas", source.latest_meas.into()),
|
||||||
SourceMetric::new_float(
|
SourceMetric::new_float("latest_meas_err", source.latest_meas_err.into()),
|
||||||
"orig_latest_meas",
|
],
|
||||||
source.orig_latest_meas.into(),
|
});
|
||||||
tags.clone(),
|
|
||||||
),
|
|
||||||
SourceMetric::new_float("latest_meas", source.latest_meas.into(), tags.clone()),
|
|
||||||
SourceMetric::new_float(
|
|
||||||
"latest_meas_err",
|
|
||||||
source.latest_meas_err.into(),
|
|
||||||
tags.clone(),
|
|
||||||
),
|
|
||||||
]);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
metrics
|
metrics
|
||||||
@@ -280,6 +270,7 @@ impl ChronyClient {
|
|||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl ChimemonSource for ChronyClient {
|
impl ChimemonSource for ChronyClient {
|
||||||
type Config = ChronyConfig;
|
type Config = ChronyConfig;
|
||||||
|
const TASK_NAME: &'static str = "chrony-task";
|
||||||
fn new(name: &str, config: Self::Config) -> Self {
|
fn new(name: &str, config: Self::Config) -> Self {
|
||||||
let server = config
|
let server = config
|
||||||
.host
|
.host
|
||||||
|
|||||||
@@ -17,9 +17,10 @@ use tokio_util::codec::{Framed, LinesCodec};
|
|||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
use tracing::{debug, debug_span, error, info, instrument, warn};
|
use tracing::{debug, debug_span, error, info, instrument, warn};
|
||||||
|
|
||||||
|
use crate::SourceMetricSet;
|
||||||
use crate::{
|
use crate::{
|
||||||
ChimemonMessage, ChimemonSource, ChimemonSourceChannel, SourceMetric, SourceReport,
|
ChimemonSource, ChimemonSourceChannel, SourceMetric, SourceReport, SourceReportDetails,
|
||||||
SourceReportDetails, SourceStatus, config::GpsdConfig,
|
SourceStatus, config::GpsdConfig,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub struct GpsdSource {
|
pub struct GpsdSource {
|
||||||
@@ -76,13 +77,16 @@ impl SourceReportDetails for GpsdSourceReport {
|
|||||||
fn is_healthy(&self) -> bool {
|
fn is_healthy(&self) -> bool {
|
||||||
self.fix_type != GpsdFixType::Unknown && self.fix_type != GpsdFixType::NoFix
|
self.fix_type != GpsdFixType::Unknown && self.fix_type != GpsdFixType::NoFix
|
||||||
}
|
}
|
||||||
fn to_metrics(&self) -> Vec<SourceMetric> {
|
fn to_metrics(&self) -> Vec<SourceMetricSet> {
|
||||||
let tags = Arc::new(vec![]);
|
let tags = Arc::new(vec![]);
|
||||||
vec![
|
vec![SourceMetricSet {
|
||||||
SourceMetric::new_int("sats_visible", self.sats_visible as i64, tags.clone()),
|
tags,
|
||||||
SourceMetric::new_int("sats_tracked", self.sats_tracked as i64, tags.clone()),
|
metrics: vec![
|
||||||
SourceMetric::new_float("tdop", self.tdop, tags.clone()),
|
SourceMetric::new_int("sats_visible", self.sats_visible as i64),
|
||||||
]
|
SourceMetric::new_int("sats_tracked", self.sats_tracked as i64),
|
||||||
|
SourceMetric::new_float("tdop", self.tdop),
|
||||||
|
],
|
||||||
|
}]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -167,6 +171,7 @@ impl GpsdSource {
|
|||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl ChimemonSource for GpsdSource {
|
impl ChimemonSource for GpsdSource {
|
||||||
type Config = GpsdConfig;
|
type Config = GpsdConfig;
|
||||||
|
const TASK_NAME: &'static str = "gpsd-task";
|
||||||
fn new(name: &str, config: Self::Config) -> Self {
|
fn new(name: &str, config: Self::Config) -> Self {
|
||||||
// TODO: refactor so this mess isn't necessary
|
// TODO: refactor so this mess isn't necessary
|
||||||
// Should do async setup at the start of run(), not here
|
// Should do async setup at the start of run(), not here
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ use tokio_util::sync::CancellationToken;
|
|||||||
use tracing::{debug, error, info, warn};
|
use tracing::{debug, error, info, warn};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
ChimemonSource, ChimemonSourceChannel, MetricTags, SourceMetric, SourceReport,
|
ChimemonSource, ChimemonSourceChannel, MetricTags, SourceMetric, SourceMetricSet, SourceReport,
|
||||||
SourceReportDetails, SourceStatus, config::HwmonConfig,
|
SourceReportDetails, SourceStatus, config::HwmonConfig,
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -73,14 +73,13 @@ impl SourceReportDetails for HwmonReport {
|
|||||||
//self.alarms.iter().any(|(_sensor, alarm)| *alarm)
|
//self.alarms.iter().any(|(_sensor, alarm)| *alarm)
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
fn to_metrics(&self) -> Vec<SourceMetric> {
|
fn to_metrics(&self) -> Vec<SourceMetricSet> {
|
||||||
let mut metrics = Vec::new();
|
let mut metrics = Vec::new();
|
||||||
for (sensor, value) in &self.values {
|
for (sensor, value) in &self.values {
|
||||||
metrics.push(SourceMetric::new_float(
|
metrics.push(SourceMetricSet {
|
||||||
"hwmon_value",
|
tags: sensor.tags.clone(),
|
||||||
*value,
|
metrics: vec![SourceMetric::new_float("hwmon_value", *value)],
|
||||||
sensor.tags.clone(),
|
})
|
||||||
))
|
|
||||||
}
|
}
|
||||||
// for (sensor, alarm) in &self.alarms {
|
// for (sensor, alarm) in &self.alarms {
|
||||||
// metrics.push(SourceMetric::new_bool(
|
// metrics.push(SourceMetric::new_bool(
|
||||||
@@ -105,6 +104,7 @@ impl HwmonSource {
|
|||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl ChimemonSource for HwmonSource {
|
impl ChimemonSource for HwmonSource {
|
||||||
type Config = HwmonConfig;
|
type Config = HwmonConfig;
|
||||||
|
const TASK_NAME: &'static str = "hwmon-task";
|
||||||
fn new(name: &str, config: Self::Config) -> Self {
|
fn new(name: &str, config: Self::Config) -> Self {
|
||||||
let sensors = config
|
let sensors = config
|
||||||
.sensors
|
.sensors
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ use tokio_serial::{SerialPort, SerialStream};
|
|||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
use tracing::{debug, debug_span, error, info, instrument, warn};
|
use tracing::{debug, debug_span, error, info, instrument, warn};
|
||||||
|
|
||||||
|
use crate::SourceMetricSet;
|
||||||
use crate::{
|
use crate::{
|
||||||
ChimemonMessage, ChimemonSource, ChimemonSourceChannel, MetricTags, SourceMetric, SourceReport,
|
ChimemonMessage, ChimemonSource, ChimemonSourceChannel, MetricTags, SourceMetric, SourceReport,
|
||||||
SourceReportDetails, SourceStatus, config::Prs10Config, fatal,
|
SourceReportDetails, SourceStatus, config::Prs10Config, fatal,
|
||||||
@@ -76,7 +77,7 @@ impl Prs10PowerLampFlags {
|
|||||||
.iter()
|
.iter()
|
||||||
.map(|(flag, label)| {
|
.map(|(flag, label)| {
|
||||||
// We track whether each flag is set (true) or not (false)
|
// We track whether each flag is set (true) or not (false)
|
||||||
SourceMetric::new_bool(*label, self.contains(**flag), tags.clone())
|
SourceMetric::new_bool(*label, self.contains(**flag))
|
||||||
})
|
})
|
||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
@@ -184,20 +185,19 @@ impl SourceReportDetails for Prs10Status {
|
|||||||
&& self.pps_flags == HEALTHY_PPS
|
&& self.pps_flags == HEALTHY_PPS
|
||||||
}
|
}
|
||||||
|
|
||||||
fn to_metrics(&self) -> Vec<SourceMetric> {
|
fn to_metrics(&self) -> Vec<SourceMetricSet> {
|
||||||
let tags = Arc::new(vec![]);
|
let tags = Arc::new(vec![]);
|
||||||
vec![
|
vec![SourceMetricSet {
|
||||||
SourceMetric::new_int(
|
tags,
|
||||||
"volt_lamp_flags",
|
metrics: vec![
|
||||||
self.volt_lamp_flags.bits() as i64,
|
SourceMetric::new_int("volt_lamp_flags", self.volt_lamp_flags.bits() as i64),
|
||||||
tags.clone(),
|
SourceMetric::new_int("rf_flags", self.rf_flags.bits() as i64),
|
||||||
),
|
SourceMetric::new_int("temp_flags", self.temp_flags.bits() as i64),
|
||||||
SourceMetric::new_int("rf_flags", self.rf_flags.bits() as i64, tags.clone()),
|
SourceMetric::new_int("fll_flags", self.fll_flags.bits() as i64),
|
||||||
SourceMetric::new_int("temp_flags", self.temp_flags.bits() as i64, tags.clone()),
|
SourceMetric::new_int("pps_flags", self.pps_flags.bits() as i64),
|
||||||
SourceMetric::new_int("fll_flags", self.fll_flags.bits() as i64, tags.clone()),
|
|
||||||
SourceMetric::new_int("pps_flags", self.pps_flags.bits() as i64, tags.clone()),
|
|
||||||
// system flags are kind of useless because we can't guarantee they get upstreamed and will only appear once since they are 'event flags'
|
// system flags are kind of useless because we can't guarantee they get upstreamed and will only appear once since they are 'event flags'
|
||||||
]
|
],
|
||||||
|
}]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -267,57 +267,40 @@ impl SourceReportDetails for Prs10Stats {
|
|||||||
fn is_healthy(&self) -> bool {
|
fn is_healthy(&self) -> bool {
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
fn to_metrics(&self) -> Vec<SourceMetric> {
|
fn to_metrics(&self) -> Vec<SourceMetricSet> {
|
||||||
let tags = Arc::new(vec![]);
|
let tags = Arc::new(vec![]);
|
||||||
vec![
|
vec![SourceMetricSet {
|
||||||
|
tags,
|
||||||
|
metrics: vec![
|
||||||
// Integer Metrics
|
// Integer Metrics
|
||||||
SourceMetric::new_int("ocxo_efc", self.ocxo_efc as i64, tags.clone()),
|
SourceMetric::new_int("ocxo_efc", self.ocxo_efc as i64),
|
||||||
// Float Metrics
|
// Float Metrics
|
||||||
SourceMetric::new_float("error_signal_volts", self.error_signal_volts, tags.clone()),
|
SourceMetric::new_float("error_signal_volts", self.error_signal_volts),
|
||||||
SourceMetric::new_float(
|
SourceMetric::new_float("detect_signal_volts", self.detect_signal_volts),
|
||||||
"detect_signal_volts",
|
SourceMetric::new_float("heat_volts", self.heat_volts),
|
||||||
self.detect_signal_volts,
|
SourceMetric::new_float("elec_volts", self.elec_volts),
|
||||||
tags.clone(),
|
SourceMetric::new_float("lamp_fet_drain_volts", self.lamp_fet_drain_volts),
|
||||||
),
|
SourceMetric::new_float("lamp_fet_gate_volts", self.lamp_fet_gate_volts),
|
||||||
SourceMetric::new_float("heat_volts", self.heat_volts, tags.clone()),
|
SourceMetric::new_float("ocxo_heat_volts", self.ocxo_heat_volts),
|
||||||
SourceMetric::new_float("elec_volts", self.elec_volts, tags.clone()),
|
SourceMetric::new_float("cell_heat_volts", self.cell_heat_volts),
|
||||||
SourceMetric::new_float(
|
SourceMetric::new_float("lamp_heat_volts", self.lamp_heat_volts),
|
||||||
"lamp_fet_drain_volts",
|
SourceMetric::new_float("rb_photo", self.rb_photo),
|
||||||
self.lamp_fet_drain_volts,
|
SourceMetric::new_float("rb_photo_iv", self.rb_photo_iv),
|
||||||
tags.clone(),
|
SourceMetric::new_float("case_temp", self.case_temp),
|
||||||
),
|
SourceMetric::new_float("ocxo_therm", self.ocxo_therm),
|
||||||
SourceMetric::new_float(
|
SourceMetric::new_float("cell_therm", self.cell_therm),
|
||||||
"lamp_fet_gate_volts",
|
SourceMetric::new_float("lamp_therm", self.lamp_therm),
|
||||||
self.lamp_fet_gate_volts,
|
SourceMetric::new_float("ext_cal_volts", self.ext_cal_volts),
|
||||||
tags.clone(),
|
SourceMetric::new_float("analog_gnd_volts", self.analog_gnd_volts),
|
||||||
),
|
SourceMetric::new_float("if_vco_varactor_volts", self.if_vco_varactor_volts),
|
||||||
SourceMetric::new_float("ocxo_heat_volts", self.ocxo_heat_volts, tags.clone()),
|
SourceMetric::new_float("op_vco_varactor_volts", self.op_vco_varactor_volts),
|
||||||
SourceMetric::new_float("cell_heat_volts", self.cell_heat_volts, tags.clone()),
|
SourceMetric::new_float("mul_amp_gain_volts", self.mul_amp_gain_volts),
|
||||||
SourceMetric::new_float("lamp_heat_volts", self.lamp_heat_volts, tags.clone()),
|
SourceMetric::new_float("rf_lock_volts", self.rf_lock_volts),
|
||||||
SourceMetric::new_float("rb_photo", self.rb_photo, tags.clone()),
|
|
||||||
SourceMetric::new_float("rb_photo_iv", self.rb_photo_iv, tags.clone()),
|
|
||||||
SourceMetric::new_float("case_temp", self.case_temp, tags.clone()),
|
|
||||||
SourceMetric::new_float("ocxo_therm", self.ocxo_therm, tags.clone()),
|
|
||||||
SourceMetric::new_float("cell_therm", self.cell_therm, tags.clone()),
|
|
||||||
SourceMetric::new_float("lamp_therm", self.lamp_therm, tags.clone()),
|
|
||||||
SourceMetric::new_float("ext_cal_volts", self.ext_cal_volts, tags.clone()),
|
|
||||||
SourceMetric::new_float("analog_gnd_volts", self.analog_gnd_volts, tags.clone()),
|
|
||||||
SourceMetric::new_float(
|
|
||||||
"if_vco_varactor_volts",
|
|
||||||
self.if_vco_varactor_volts,
|
|
||||||
tags.clone(),
|
|
||||||
),
|
|
||||||
SourceMetric::new_float(
|
|
||||||
"op_vco_varactor_volts",
|
|
||||||
self.op_vco_varactor_volts,
|
|
||||||
tags.clone(),
|
|
||||||
),
|
|
||||||
SourceMetric::new_float("mul_amp_gain_volts", self.mul_amp_gain_volts, tags.clone()),
|
|
||||||
SourceMetric::new_float("rf_lock_volts", self.rf_lock_volts, tags.clone()),
|
|
||||||
// U16 Metrics (optional, but can be treated as integers)
|
// U16 Metrics (optional, but can be treated as integers)
|
||||||
SourceMetric::new_int("freq_offset_ppt", self.freq_offset_ppt as i64, tags.clone()),
|
SourceMetric::new_int("freq_offset_ppt", self.freq_offset_ppt as i64),
|
||||||
SourceMetric::new_int("mag_efc", self.mag_efc as i64, tags.clone()),
|
SourceMetric::new_int("mag_efc", self.mag_efc as i64),
|
||||||
]
|
],
|
||||||
|
}]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -511,6 +494,7 @@ impl Prs10Monitor {
|
|||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl ChimemonSource for Prs10Monitor {
|
impl ChimemonSource for Prs10Monitor {
|
||||||
type Config = Prs10Config;
|
type Config = Prs10Config;
|
||||||
|
const TASK_NAME: &'static str = "prs10-task";
|
||||||
fn new(name: &str, config: Self::Config) -> Self {
|
fn new(name: &str, config: Self::Config) -> Self {
|
||||||
let builder = tokio_serial::new(&config.port, config.baud)
|
let builder = tokio_serial::new(&config.port, config.baud)
|
||||||
.timeout(config.timeout)
|
.timeout(config.timeout)
|
||||||
|
|||||||
@@ -17,9 +17,10 @@ use tokio_util::sync::CancellationToken;
|
|||||||
use tracing::{debug, error, info, warn};
|
use tracing::{debug, error, info, warn};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
ChimemonMessage, ChimemonSource, ChimemonSourceChannel, SourceMetric, SourceReport,
|
ChimemonSource, ChimemonSourceChannel, SourceMetric, SourceReport, SourceReportDetails,
|
||||||
SourceReportDetails, SourceStatus, TimeReport, config::UCCMConfig,
|
SourceStatus, TimeReport, config::UCCMConfig,
|
||||||
};
|
};
|
||||||
|
use crate::{SourceMetricSet, fatal};
|
||||||
|
|
||||||
pub const GPS_EPOCH: i64 = 315964800; // Doesn't seem possible to have a const DateTime object
|
pub const GPS_EPOCH: i64 = 315964800; // Doesn't seem possible to have a const DateTime object
|
||||||
pub type UccmEndian = BigEndian;
|
pub type UccmEndian = BigEndian;
|
||||||
@@ -59,61 +60,33 @@ impl SourceReportDetails for UCCMTODReport {
|
|||||||
&& !self.flags.contains(UCCMFlags::GPS_LOS)
|
&& !self.flags.contains(UCCMFlags::GPS_LOS)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn to_metrics(&self) -> Vec<SourceMetric> {
|
fn to_metrics(&self) -> Vec<SourceMetricSet> {
|
||||||
let no_tags = Arc::new(vec![]);
|
let tags = Arc::new(vec![]);
|
||||||
vec![
|
vec![SourceMetricSet {
|
||||||
SourceMetric::new_int("leaps", self.leaps as i64, no_tags.clone()),
|
tags,
|
||||||
SourceMetric::new_bool(
|
metrics: vec![
|
||||||
"osc_lock",
|
SourceMetric::new_int("leaps", self.leaps as i64),
|
||||||
self.flags.contains(UCCMFlags::OSC_LOCK),
|
SourceMetric::new_bool("osc_lock", self.flags.contains(UCCMFlags::OSC_LOCK)),
|
||||||
no_tags.clone(),
|
SourceMetric::new_bool("leap_flag", self.flags.contains(UCCMFlags::LEAP_FLAG)),
|
||||||
),
|
SourceMetric::new_bool("init_unlock", self.flags.contains(UCCMFlags::INIT_UNLOCK)),
|
||||||
SourceMetric::new_bool(
|
|
||||||
"leap_flag",
|
|
||||||
self.flags.contains(UCCMFlags::LEAP_FLAG),
|
|
||||||
no_tags.clone(),
|
|
||||||
),
|
|
||||||
SourceMetric::new_bool(
|
|
||||||
"init_unlock",
|
|
||||||
self.flags.contains(UCCMFlags::INIT_UNLOCK),
|
|
||||||
no_tags.clone(),
|
|
||||||
),
|
|
||||||
SourceMetric::new_bool(
|
SourceMetric::new_bool(
|
||||||
"init_no_sats",
|
"init_no_sats",
|
||||||
self.flags.contains(UCCMFlags::INIT_NO_SATS),
|
self.flags.contains(UCCMFlags::INIT_NO_SATS),
|
||||||
no_tags.clone(),
|
|
||||||
),
|
),
|
||||||
SourceMetric::new_bool(
|
SourceMetric::new_bool(
|
||||||
"have_gps_time",
|
"have_gps_time",
|
||||||
self.flags.contains(UCCMFlags::HAVE_GPS_TIME),
|
self.flags.contains(UCCMFlags::HAVE_GPS_TIME),
|
||||||
no_tags.clone(),
|
|
||||||
),
|
|
||||||
SourceMetric::new_bool(
|
|
||||||
"power_fail",
|
|
||||||
self.flags.contains(UCCMFlags::POWER_FAIL),
|
|
||||||
no_tags.clone(),
|
|
||||||
),
|
|
||||||
SourceMetric::new_bool(
|
|
||||||
"no_gps_sync",
|
|
||||||
self.flags.contains(UCCMFlags::NO_GPS_SYNC),
|
|
||||||
no_tags.clone(),
|
|
||||||
),
|
),
|
||||||
|
SourceMetric::new_bool("power_fail", self.flags.contains(UCCMFlags::POWER_FAIL)),
|
||||||
|
SourceMetric::new_bool("no_gps_sync", self.flags.contains(UCCMFlags::NO_GPS_SYNC)),
|
||||||
SourceMetric::new_bool(
|
SourceMetric::new_bool(
|
||||||
"no_gps_sync2",
|
"no_gps_sync2",
|
||||||
self.flags.contains(UCCMFlags::NO_GPS_SYNC2),
|
self.flags.contains(UCCMFlags::NO_GPS_SYNC2),
|
||||||
no_tags.clone(),
|
|
||||||
),
|
),
|
||||||
SourceMetric::new_bool(
|
SourceMetric::new_bool("ant_fault", self.flags.contains(UCCMFlags::NO_ANT)),
|
||||||
"ant_fault",
|
SourceMetric::new_bool("gps_los", self.flags.contains(UCCMFlags::GPS_LOS)),
|
||||||
self.flags.contains(UCCMFlags::NO_ANT),
|
],
|
||||||
no_tags.clone(),
|
}]
|
||||||
),
|
|
||||||
SourceMetric::new_bool(
|
|
||||||
"gps_los",
|
|
||||||
self.flags.contains(UCCMFlags::GPS_LOS),
|
|
||||||
no_tags.clone(),
|
|
||||||
),
|
|
||||||
]
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -126,13 +99,12 @@ impl SourceReportDetails for UCCMLoopDiagReport {
|
|||||||
fn is_healthy(&self) -> bool {
|
fn is_healthy(&self) -> bool {
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
fn to_metrics(&self) -> Vec<SourceMetric> {
|
fn to_metrics(&self) -> Vec<SourceMetricSet> {
|
||||||
let tags = Arc::new(vec![]);
|
let tags = Arc::new(vec![]);
|
||||||
vec![SourceMetric::new_float(
|
vec![SourceMetricSet {
|
||||||
"ocxo_offset",
|
|
||||||
self.ocxo as f64,
|
|
||||||
tags,
|
tags,
|
||||||
)]
|
metrics: vec![SourceMetric::new_float("ocxo_offset", self.ocxo as f64)],
|
||||||
|
}]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -144,11 +116,7 @@ pub struct UCCMGpsSvTracking {
|
|||||||
|
|
||||||
impl From<&UCCMGpsSvTracking> for SourceMetric {
|
impl From<&UCCMGpsSvTracking> for SourceMetric {
|
||||||
fn from(value: &UCCMGpsSvTracking) -> Self {
|
fn from(value: &UCCMGpsSvTracking) -> Self {
|
||||||
SourceMetric::new_int(
|
SourceMetric::new_int("sv_cno", value.cno as i64)
|
||||||
"sv_cno",
|
|
||||||
value.cno as i64,
|
|
||||||
Arc::new(vec![("sv_id", value.sv.to_string())]),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -161,8 +129,11 @@ impl SourceReportDetails for UCCMGPSSatsReport {
|
|||||||
fn is_healthy(&self) -> bool {
|
fn is_healthy(&self) -> bool {
|
||||||
self.tracked_svs.len() >= 4
|
self.tracked_svs.len() >= 4
|
||||||
}
|
}
|
||||||
fn to_metrics(&self) -> Vec<SourceMetric> {
|
fn to_metrics(&self) -> Vec<SourceMetricSet> {
|
||||||
self.tracked_svs.iter().map(|sv| sv.into()).collect()
|
vec![SourceMetricSet {
|
||||||
|
tags: Arc::new(vec![]),
|
||||||
|
metrics: self.tracked_svs.iter().map(|sv| sv.into()).collect(),
|
||||||
|
}]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -245,20 +216,23 @@ impl SourceReportDetails for UCCMStatusReport {
|
|||||||
fn is_healthy(&self) -> bool {
|
fn is_healthy(&self) -> bool {
|
||||||
self.gps_pps_valid
|
self.gps_pps_valid
|
||||||
}
|
}
|
||||||
fn to_metrics(&self) -> Vec<SourceMetric> {
|
fn to_metrics(&self) -> Vec<SourceMetricSet> {
|
||||||
let no_tags = Arc::new(vec![]);
|
let tags = Arc::new(vec![]);
|
||||||
vec![
|
vec![SourceMetricSet {
|
||||||
SourceMetric::new_int("tfom", self.tfom as i64, no_tags.clone()),
|
tags,
|
||||||
SourceMetric::new_int("ffom", self.ffom as i64, no_tags.clone()),
|
metrics: vec![
|
||||||
SourceMetric::new_float("gps_phase", self.gps_phase as f64, no_tags.clone()),
|
SourceMetric::new_int("tfom", self.tfom as i64),
|
||||||
|
SourceMetric::new_int("ffom", self.ffom as i64),
|
||||||
|
SourceMetric::new_float("gps_phase", self.gps_phase as f64),
|
||||||
// TODO: sv info
|
// TODO: sv info
|
||||||
// TOOD: timestamp
|
// TOOD: timestamp
|
||||||
SourceMetric::new_float("ant_voltage", self.ant_voltage as f64, no_tags.clone()),
|
SourceMetric::new_float("ant_voltage", self.ant_voltage as f64),
|
||||||
SourceMetric::new_float("ant_current", self.ant_current as f64, no_tags.clone()),
|
SourceMetric::new_float("ant_current", self.ant_current as f64),
|
||||||
SourceMetric::new_float("temp", self.temp as f64, no_tags.clone()),
|
SourceMetric::new_float("temp", self.temp as f64),
|
||||||
SourceMetric::new_int("efc_dac", self.efc_dac as i64, no_tags.clone()),
|
SourceMetric::new_int("efc_dac", self.efc_dac as i64),
|
||||||
SourceMetric::new_float("freq_error", self.freq_error as f64, no_tags.clone()),
|
SourceMetric::new_float("freq_error", self.freq_error as f64),
|
||||||
]
|
],
|
||||||
|
}]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -545,6 +519,7 @@ impl UCCMMonitor {
|
|||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl ChimemonSource for UCCMMonitor {
|
impl ChimemonSource for UCCMMonitor {
|
||||||
type Config = UCCMConfig;
|
type Config = UCCMConfig;
|
||||||
|
const TASK_NAME: &'static str = "uccm-task";
|
||||||
fn new(name: &str, config: Self::Config) -> Self {
|
fn new(name: &str, config: Self::Config) -> Self {
|
||||||
let builder = tokio_serial::new(&config.port, config.baud)
|
let builder = tokio_serial::new(&config.port, config.baud)
|
||||||
.timeout(config.timeout)
|
.timeout(config.timeout)
|
||||||
@@ -552,8 +527,13 @@ impl ChimemonSource for UCCMMonitor {
|
|||||||
.parity(tokio_serial::Parity::None)
|
.parity(tokio_serial::Parity::None)
|
||||||
.stop_bits(tokio_serial::StopBits::One)
|
.stop_bits(tokio_serial::StopBits::One)
|
||||||
.flow_control(tokio_serial::FlowControl::None);
|
.flow_control(tokio_serial::FlowControl::None);
|
||||||
let mut port = SerialStream::open(&builder).expect("Must be able to open serial port");
|
let mut port = match SerialStream::open(&builder) {
|
||||||
port.set_exclusive(true).expect("Can't lock serial port");
|
Ok(port) => port,
|
||||||
|
Err(e) => fatal!(error = ?e, "Error opening port {}", &config.port),
|
||||||
|
};
|
||||||
|
if let Err(e) = port.set_exclusive(true) {
|
||||||
|
fatal!(error= ?e, "Can't lock serial port");
|
||||||
|
};
|
||||||
info!(
|
info!(
|
||||||
"Opened serial port {}@{}",
|
"Opened serial port {}@{}",
|
||||||
port.name().unwrap(),
|
port.name().unwrap(),
|
||||||
|
|||||||
@@ -33,6 +33,7 @@ impl ChronySockServer {}
|
|||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl ChimemonTarget for ChronySockServer {
|
impl ChimemonTarget for ChronySockServer {
|
||||||
type Config = ChronySockConfig;
|
type Config = ChronySockConfig;
|
||||||
|
const TASK_NAME: &'static str = "chrony-refclock-task";
|
||||||
fn new(name: &str, config: ChronySockConfig) -> Self {
|
fn new(name: &str, config: ChronySockConfig) -> Self {
|
||||||
ChronySockServer {
|
ChronySockServer {
|
||||||
name: name.to_owned(),
|
name: name.to_owned(),
|
||||||
@@ -47,6 +48,7 @@ impl ChimemonTarget for ChronySockServer {
|
|||||||
msg = chan.recv() => {
|
msg = chan.recv() => {
|
||||||
match msg {
|
match msg {
|
||||||
Ok(ChimemonMessage::TimeReport(tr)) => {
|
Ok(ChimemonMessage::TimeReport(tr)) => {
|
||||||
|
debug!(tr = ?tr, "Got timereport");
|
||||||
if tr.valid {
|
if tr.valid {
|
||||||
{
|
{
|
||||||
let frame = ChronyTimeReport {
|
let frame = ChronyTimeReport {
|
||||||
|
|||||||
110
src/targets/influx.rs
Normal file
110
src/targets/influx.rs
Normal file
@@ -0,0 +1,110 @@
|
|||||||
|
use async_trait::async_trait;
|
||||||
|
use futures::stream;
|
||||||
|
use influxdb2::{
|
||||||
|
Client,
|
||||||
|
models::{DataPoint, FieldValue},
|
||||||
|
};
|
||||||
|
use tokio::{select, sync::broadcast, time::timeout};
|
||||||
|
use tokio_util::sync::CancellationToken;
|
||||||
|
use tracing::{debug, error, info, instrument};
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
ChimemonMessage, ChimemonTarget, ChimemonTargetChannel, MetricValue, SourceReport,
|
||||||
|
config::InfluxConfig, fatal,
|
||||||
|
};
|
||||||
|
|
||||||
|
pub struct InfluxTarget {
|
||||||
|
name: String,
|
||||||
|
config: InfluxConfig,
|
||||||
|
influx: Client,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<MetricValue> for FieldValue {
|
||||||
|
fn from(value: MetricValue) -> Self {
|
||||||
|
match value {
|
||||||
|
MetricValue::Bool(b) => FieldValue::Bool(b),
|
||||||
|
MetricValue::Float(f) => FieldValue::F64(f),
|
||||||
|
MetricValue::Int(i) => FieldValue::I64(i),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl ChimemonTarget for InfluxTarget {
|
||||||
|
type Config = InfluxConfig;
|
||||||
|
const TASK_NAME: &'static str = "influx-task";
|
||||||
|
fn new(name: &str, config: Self::Config) -> Self {
|
||||||
|
let influx = Client::new(&config.url, &config.org, &config.token);
|
||||||
|
Self {
|
||||||
|
name: name.to_owned(),
|
||||||
|
config: config,
|
||||||
|
influx,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
async fn run(self, mut chan: ChimemonTargetChannel, cancel: CancellationToken) {
|
||||||
|
info!("Influx task started");
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let msg = select! {
|
||||||
|
_ = cancel.cancelled() => { return },
|
||||||
|
msg = chan.recv() => msg
|
||||||
|
};
|
||||||
|
debug!(msg = ?msg, "Got msg");
|
||||||
|
let msg = match msg {
|
||||||
|
Ok(msg) => msg,
|
||||||
|
Err(broadcast::error::RecvError::Closed) => {
|
||||||
|
fatal!("Permanent channel closed, terminating")
|
||||||
|
}
|
||||||
|
Err(broadcast::error::RecvError::Lagged(_)) => {
|
||||||
|
error!("Channel lagged");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if let Err(e) = self.handle_msg(&msg).await {
|
||||||
|
error!(error = ?e, msg=?&msg, "Error handling message");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl InfluxTarget {
|
||||||
|
#[instrument(skip_all)]
|
||||||
|
async fn handle_source_report(
|
||||||
|
&self,
|
||||||
|
sr: &SourceReport,
|
||||||
|
) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
debug!("Handling source report {}", sr.name);
|
||||||
|
let mut dps = Vec::new();
|
||||||
|
for metric_set in &sr.details.to_metrics() {
|
||||||
|
let mut builder = DataPoint::builder(&sr.name);
|
||||||
|
builder = self
|
||||||
|
.config
|
||||||
|
.tags
|
||||||
|
.iter()
|
||||||
|
.fold(builder, |builder, (k, v)| builder.tag(k, v));
|
||||||
|
builder = metric_set
|
||||||
|
.tags
|
||||||
|
.iter()
|
||||||
|
.fold(builder, |builder, (k, v)| builder.tag(*k, v));
|
||||||
|
builder = metric_set.metrics.iter().fold(builder, |builder, metric| {
|
||||||
|
builder.field(metric.name, metric.value)
|
||||||
|
});
|
||||||
|
dps.push(builder.build()?);
|
||||||
|
}
|
||||||
|
debug!("Sending {} datapoints to influx", dps.len());
|
||||||
|
timeout(
|
||||||
|
self.config.timeout,
|
||||||
|
self.influx.write(&self.config.bucket, stream::iter(dps)),
|
||||||
|
)
|
||||||
|
.await??;
|
||||||
|
debug!("All datapoints sent");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
async fn handle_msg(&self, msg: &ChimemonMessage) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
debug!(msg = ?msg, "Handling msg");
|
||||||
|
match msg {
|
||||||
|
ChimemonMessage::TimeReport(_tr) => Ok(()),
|
||||||
|
ChimemonMessage::SourceReport(sr) => self.handle_source_report(sr).await,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,2 +1,2 @@
|
|||||||
pub mod chrony_refclock;
|
pub mod chrony_refclock;
|
||||||
// pub mod influx;
|
pub mod influx;
|
||||||
|
|||||||
Reference in New Issue
Block a user