use std::any::type_name; use std::str::FromStr; use std::sync::Arc; use async_trait::async_trait; use bitflags::bitflags; use itertools::Itertools; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf}; use tokio::select; use tokio::sync::OnceCell; use tokio::time::{interval, timeout}; use tokio_serial::{SerialPort, SerialStream}; use tokio_util::sync::CancellationToken; use tracing::{debug, debug_span, error, info, instrument, warn}; use crate::{ ChimemonMessage, ChimemonSource, ChimemonSourceChannel, MetricTags, Prs10Config, SourceMetric, SourceReport, SourceReportDetails, SourceStatus, fatal, }; #[derive(Debug)] pub struct Prs10Info { pub model: String, pub version: String, pub serial: String, } impl TryFrom<&[u8]> for Prs10Info { type Error = Box; fn try_from(value: &[u8]) -> Result { let parts = value.splitn(3, |c| *c == b'_'); let (model, version, serial) = parts .collect_tuple() .ok_or("Not enough parts in ID response")?; Ok(Self { model: str::from_utf8(model)?.to_string(), version: str::from_utf8(version)?.to_string(), serial: str::from_utf8(serial)?.to_string(), }) } } bitflags! { #[derive(Copy, Clone, Debug, Eq, PartialEq)] pub struct Prs10PowerLampFlags: u8 { const ELEC_VOLTAGE_LOW = (1<<0); const ELEC_VOLTAGE_HIGH = (1<<1); const HEAT_VOLTAGE_LOW = (1<<2); const HEAT_VOLTAGE_HIGH = (1<<3); const LAMP_LIGHT_LOW = (1<<4); const LAMP_LIGHT_HIGH = (1<<5); const GATE_VOLTAGE_LOW = (1<<6); const GATE_VOLTAGE_HIGH = (1<<7); } } impl Prs10PowerLampFlags { pub fn get_metrics(&self, tags: Arc) -> Vec { // Define the mapping statically const FLAG_LABELS: [(&Prs10PowerLampFlags, &str); 8] = [ (&Prs10PowerLampFlags::ELEC_VOLTAGE_LOW, "elec_voltage_low"), (&Prs10PowerLampFlags::ELEC_VOLTAGE_HIGH, "elec_voltage_high"), (&Prs10PowerLampFlags::HEAT_VOLTAGE_LOW, "heat_voltage_low"), (&Prs10PowerLampFlags::HEAT_VOLTAGE_HIGH, "heat_voltage_high"), (&Prs10PowerLampFlags::LAMP_LIGHT_LOW, "lamp_light_low"), (&Prs10PowerLampFlags::LAMP_LIGHT_HIGH, "lamp_light_high"), (&Prs10PowerLampFlags::GATE_VOLTAGE_LOW, "gate_voltage_low"), (&Prs10PowerLampFlags::GATE_VOLTAGE_HIGH, "gate_voltage_high"), ]; // Generate metrics based on flag availability FLAG_LABELS .iter() .map(|(flag, label)| { // We track whether each flag is set (true) or not (false) SourceMetric::new_bool(*label, self.contains(**flag), tags.clone()) }) .collect() } } bitflags! { #[derive(Copy, Clone, Debug, Eq, PartialEq)] pub struct Prs10RfFlags: u8 { const PLL_UNLOCK = (1<<0); const XTAL_VAR_LOW = (1<<1); const XTAL_VAR_HIGH = (1<<2); const VCO_CTRL_LOW = (1<<3); const VCO_CTRL_HIGH = (1<<4); const AGC_CTRL_LOW = (1<<5); const AGC_CTRL_HIGH = (1<<6); const PLL_BAD_PARAM = (1<<7); } } bitflags! { #[derive(Copy, Clone, Debug, Eq, PartialEq)] pub struct Prs10TempFlags: u8 { const LAMP_TEMP_LOW = (1<<0); const LAMP_TEMP_HIGH = (1<<1); const XTAL_TEMP_LOW = (1<<2); const XTAL_TEMP_HIGH = (1<<3); const CELL_TEMP_LOW = (1<<4); const CELL_TEMP_HIGH = (1<<5); const CASE_TEMP_LOW = (1<<6); const CASE_TEMP_HIGH = (1<<7); } } bitflags! { #[derive(Copy, Clone, Debug, Eq, PartialEq)] pub struct Prs10FllFlags: u8 { const FLL_OFF = (1<<0); const FLL_DISABLED = (1<<1); const EFC_HIGH = (1<<2); const EFC_LOW = (1<<3); const CAL_VOLTAGE_HIGH = (1<<4); const CAL_VOLTAGE_LOW = (1<<5); } } bitflags! { #[derive(Copy, Clone, Debug, Eq, PartialEq)] pub struct Prs10PpsFlags: u8 { const PLL_DISABLED = (1<<0); const PPS_WARMUP = (1<<1); const PLL_ACTIVE = (1<<2); const PPS_BAD = (1<<3); const PPS_INTERVAL_LONG = (1<<4); const PLL_RESTART = (1<<5); const PLL_SATURATED = (1<<6); const PPS_MISSING = (1<<7); } } bitflags! { #[derive(Copy, Clone, Debug, Eq, PartialEq)] pub struct Prs10SystemFlags: u8 { const LAMP_RESTART = (1<<0); const WDT_RESET = (1<<1); const BAD_INT_VECTOR = (1<<2); const EEPROM_WRITE_FAIL = (1<<3); const EEPROM_CORRUPT = (1<<4); const BAD_COMMAND = (1<<5); const BAD_COMMAND_PARAM = (1<<6); const SYSTEM_RESET = (1<<7); } } #[derive(Debug, Eq, PartialEq, Clone)] pub struct Prs10Status { pub volt_lamp_flags: Prs10PowerLampFlags, pub rf_flags: Prs10RfFlags, pub temp_flags: Prs10TempFlags, pub fll_flags: Prs10FllFlags, pub pps_flags: Prs10PpsFlags, pub system_flags: Prs10SystemFlags, } impl Default for Prs10Status { fn default() -> Self { Self { volt_lamp_flags: Prs10PowerLampFlags::empty(), rf_flags: Prs10RfFlags::empty(), temp_flags: Prs10TempFlags::empty(), fll_flags: Prs10FllFlags::empty(), pps_flags: Prs10PpsFlags::empty(), system_flags: Prs10SystemFlags::empty(), } } } impl SourceReportDetails for Prs10Status { fn is_healthy(&self) -> bool { const HEALTHY_PPS: Prs10PpsFlags = Prs10PpsFlags::from_bits(4).unwrap(); self.volt_lamp_flags.is_empty() && self.rf_flags.is_empty() && self.temp_flags.is_empty() && self.fll_flags.is_empty() && self.pps_flags == HEALTHY_PPS } fn to_metrics(&self) -> Vec { let tags = Arc::new(vec![]); vec![ SourceMetric::new_int( "volt_lamp_flags", self.volt_lamp_flags.bits() as i64, tags.clone(), ), SourceMetric::new_int("rf_flags", self.rf_flags.bits() as i64, tags.clone()), SourceMetric::new_int("temp_flags", self.temp_flags.bits() as i64, tags.clone()), SourceMetric::new_int("fll_flags", self.fll_flags.bits() as i64, tags.clone()), SourceMetric::new_int("pps_flags", self.pps_flags.bits() as i64, tags.clone()), // system flags are kind of useless because we can't guarantee they get upstreamed and will only appear once since they are 'event flags' ] } } impl TryFrom<&[u8]> for Prs10Status { type Error = Box; fn try_from(value: &[u8]) -> Result { let (volt_lamp_flags, rf_flags, temp_flags, fll_flags, pps_flags, system_flags) = value .splitn(6, |c| *c == b',') .map(|s| str::from_utf8(s).unwrap().parse::()) .collect_tuple() .ok_or("Not enough parts in ST reply")?; let volt_lamp_flags = volt_lamp_flags?; let rf_flags = rf_flags?; let temp_flags = temp_flags?; let fll_flags = fll_flags?; let pps_flags = pps_flags?; let system_flags = system_flags?; Ok(Self { volt_lamp_flags: Prs10PowerLampFlags::from_bits(volt_lamp_flags).ok_or_else(|| { format!("Invalid bits set ({volt_lamp_flags}) for power/lamp flags") })?, rf_flags: Prs10RfFlags::from_bits(rf_flags) .ok_or_else(|| format!("Invalid bits set ({rf_flags}) for RF flags"))?, temp_flags: Prs10TempFlags::from_bits(temp_flags) .ok_or_else(|| format!("Invalid bits set ({temp_flags}) for temp flags"))?, fll_flags: Prs10FllFlags::from_bits(fll_flags) .ok_or_else(|| format!("Invalid bits set ({fll_flags}) for FLL flags"))?, pps_flags: Prs10PpsFlags::from_bits(pps_flags) .ok_or_else(|| format!("Invalid bits set ({pps_flags}) for PPS flags"))?, system_flags: Prs10SystemFlags::from_bits(system_flags) .ok_or_else(|| format!("Invalid bits set ({system_flags}) for system flags"))?, }) } } #[derive(Debug, Clone)] pub struct Prs10Stats { pub ocxo_efc: u32, pub error_signal_volts: f64, pub detect_signal_volts: f64, pub freq_offset_ppt: i16, pub mag_efc: u16, pub heat_volts: f64, pub elec_volts: f64, pub lamp_fet_drain_volts: f64, pub lamp_fet_gate_volts: f64, pub ocxo_heat_volts: f64, pub cell_heat_volts: f64, pub lamp_heat_volts: f64, pub rb_photo: f64, pub rb_photo_iv: f64, pub case_temp: f64, pub ocxo_therm: f64, pub cell_therm: f64, pub lamp_therm: f64, pub ext_cal_volts: f64, pub analog_gnd_volts: f64, pub if_vco_varactor_volts: f64, pub op_vco_varactor_volts: f64, pub mul_amp_gain_volts: f64, pub rf_lock_volts: f64, } impl SourceReportDetails for Prs10Stats { fn is_healthy(&self) -> bool { true } fn to_metrics(&self) -> Vec { let tags = Arc::new(vec![]); vec![ // Integer Metrics SourceMetric::new_int("ocxo_efc", self.ocxo_efc as i64, tags.clone()), // Float Metrics SourceMetric::new_float("error_signal_volts", self.error_signal_volts, tags.clone()), SourceMetric::new_float( "detect_signal_volts", self.detect_signal_volts, tags.clone(), ), SourceMetric::new_float("heat_volts", self.heat_volts, tags.clone()), SourceMetric::new_float("elec_volts", self.elec_volts, tags.clone()), SourceMetric::new_float( "lamp_fet_drain_volts", self.lamp_fet_drain_volts, tags.clone(), ), SourceMetric::new_float( "lamp_fet_gate_volts", self.lamp_fet_gate_volts, tags.clone(), ), SourceMetric::new_float("ocxo_heat_volts", self.ocxo_heat_volts, tags.clone()), SourceMetric::new_float("cell_heat_volts", self.cell_heat_volts, tags.clone()), SourceMetric::new_float("lamp_heat_volts", self.lamp_heat_volts, tags.clone()), SourceMetric::new_float("rb_photo", self.rb_photo, tags.clone()), SourceMetric::new_float("rb_photo_iv", self.rb_photo_iv, tags.clone()), SourceMetric::new_float("case_temp", self.case_temp, tags.clone()), SourceMetric::new_float("ocxo_therm", self.ocxo_therm, tags.clone()), SourceMetric::new_float("cell_therm", self.cell_therm, tags.clone()), SourceMetric::new_float("lamp_therm", self.lamp_therm, tags.clone()), SourceMetric::new_float("ext_cal_volts", self.ext_cal_volts, tags.clone()), SourceMetric::new_float("analog_gnd_volts", self.analog_gnd_volts, tags.clone()), SourceMetric::new_float( "if_vco_varactor_volts", self.if_vco_varactor_volts, tags.clone(), ), SourceMetric::new_float( "op_vco_varactor_volts", self.op_vco_varactor_volts, tags.clone(), ), SourceMetric::new_float("mul_amp_gain_volts", self.mul_amp_gain_volts, tags.clone()), SourceMetric::new_float("rf_lock_volts", self.rf_lock_volts, tags.clone()), // U16 Metrics (optional, but can be treated as integers) SourceMetric::new_int("freq_offset_ppt", self.freq_offset_ppt as i64, tags.clone()), SourceMetric::new_int("mag_efc", self.mag_efc as i64, tags.clone()), ] } } #[derive(Debug)] pub struct Prs10Monitor { name: String, config: Prs10Config, rx: ReadHalf, tx: WriteHalf, info: OnceCell, } impl Prs10Monitor { pub fn info(&self) -> &Prs10Info { self.info.get().expect("info() used before run()") } #[instrument(level = "debug", skip_all, fields(cmd = String::from_utf8_lossy(cmd).to_string()))] pub async fn cmd_response(&mut self, cmd: &[u8]) -> Result, std::io::Error> { self.tx.write_all(cmd).await.unwrap(); self.tx.write_u8(b'\r').await.unwrap(); let mut reader = BufReader::new(&mut self.rx); let mut buf = Vec::new(); let read = timeout(self.config.timeout, reader.read_until(b'\r', &mut buf)).await??; buf.truncate(buf.len() - 1); // strip "\r" debug!( "raw response: ({read}) `{}`", str::from_utf8(&buf).unwrap_or("") ); Ok(buf) } async fn set_info(&mut self) -> Result<(), Box> { let id = self.get_id().await?; self.info.set(id)?; debug!("Set info to {:?}", self.info); Ok(()) } pub async fn get_status(&mut self) -> Result> { debug!("Getting status"); let resp = self.cmd_response(b"ST?").await?; let status = resp.as_slice().try_into(); debug!("Got: {status:?}"); status } pub async fn get_id(&mut self) -> Result> { debug!("Getting identity"); let resp = self.cmd_response(b"ID?").await?; let id = resp.as_slice().try_into(); debug!("Got: {id:?}"); id } pub async fn get_analog(&mut self, id: u16) -> Result> { let mut cmd = b"AD".to_vec(); cmd.extend_from_slice(id.to_string().as_bytes()); cmd.push(b'?'); let value = self.get_parsed(&cmd).await?; debug!("Got: {value}"); Ok(value) } pub async fn get_parsed( &mut self, cmd: &[u8], ) -> Result> where T::Err: std::error::Error + 'static, { debug!( "Getting parsed <{}> value for command {}", type_name::(), str::from_utf8(cmd).unwrap_or(""), ); let resp = self.cmd_response(cmd).await?; let val = str::from_utf8(&resp)?.parse::()?; Ok(val) } pub async fn get_ocxo_efc(&mut self) -> Result> { debug!("Getting u16,u16 -> u32 for OCXO EFC value"); let resp = self.cmd_response(b"FC?").await?; let values = resp .splitn(2, |c| *c == b',') .map(|s| str::from_utf8(s).unwrap().parse::()) .collect_tuple() .ok_or("Not enough values in response to FC?")?; if let (Ok(high), Ok(low)) = values { Ok((high as u32) << 8 | low as u32) } else { Err("Unparseable numbers in response to FC?".into()) } } pub async fn get_detected_signals(&mut self) -> Result<(f64, f64), Box> { debug!("Getting i16,i16 -> f64,f64 detected signals pair"); let resp = self.cmd_response(b"DS?").await?; let (error, signal) = resp .splitn(2, |c| *c == b',') .map(|s| str::from_utf8(s).unwrap().parse::()) .collect_tuple() .ok_or("Not enough values in response to DS?".to_string())?; Ok((error? as f64 * 0.15e-6, signal? as f64 * 0.001)) } #[instrument(skip_all)] async fn status_poll(&mut self) -> Result> { let status = self.get_status().await?; Ok(ChimemonMessage::SourceReport(SourceReport { name: self.name.clone(), status: if status.is_healthy() { SourceStatus::Healthy } else { SourceStatus::Unknown }, details: Arc::new(status), })) } #[instrument(skip_all)] async fn stats_poll(&mut self) -> Result> { const ANALOG_SCALING: [f64; 20] = [ 0.0, 10.0, 10.0, 10.0, 10.0, 1.0, 1.0, 1.0, 1.0, 4.0, 100.0, 1.0, 1.0, 1.0, 1.0, 1.0, 4.0, 4.0, 4.0, 1.0, ]; let stats_span = debug_span!("get_stats_serial"); let stats_guard = stats_span.enter(); let ocxo_efc = self.get_ocxo_efc().await?; let (error_signal_volts, detect_signal_volts) = self.get_detected_signals().await?; let freq_offset_ppt = self.get_parsed(b"SF?").await?; let mag_efc = self.get_parsed(b"MR?").await?; let mut analog_values = [0.0; 20]; for i in 1u16..=19 { analog_values[i as usize] = self.get_analog(i).await? * ANALOG_SCALING[i as usize] } drop(stats_guard); Ok(ChimemonMessage::SourceReport(SourceReport { name: self.name.clone(), status: SourceStatus::Unknown, details: Arc::new(Prs10Stats { ocxo_efc, error_signal_volts, detect_signal_volts, freq_offset_ppt, mag_efc, heat_volts: analog_values[1], elec_volts: analog_values[2], lamp_fet_drain_volts: analog_values[3], lamp_fet_gate_volts: analog_values[4], ocxo_heat_volts: analog_values[5], cell_heat_volts: analog_values[6], lamp_heat_volts: analog_values[7], rb_photo: analog_values[8], rb_photo_iv: analog_values[9], case_temp: analog_values[10], ocxo_therm: analog_values[11], cell_therm: analog_values[12], lamp_therm: analog_values[13], ext_cal_volts: analog_values[14], analog_gnd_volts: analog_values[15], if_vco_varactor_volts: analog_values[16], op_vco_varactor_volts: analog_values[17], mul_amp_gain_volts: analog_values[18], rf_lock_volts: analog_values[19], }), })) } } #[async_trait] impl ChimemonSource for Prs10Monitor { type Config = Prs10Config; fn new(name: &str, config: Self::Config) -> Self { let builder = tokio_serial::new(&config.port, config.baud) .timeout(config.timeout) .data_bits(tokio_serial::DataBits::Eight) .parity(tokio_serial::Parity::None) .stop_bits(tokio_serial::StopBits::One) .flow_control(tokio_serial::FlowControl::None); let mut port = SerialStream::open(&builder).unwrap_or_else(|e| { fatal!( "Failed to open serial port `{}` ({})", config.port, e.to_string() ) }); port.set_exclusive(true).unwrap_or_else(|e| { fatal!( "Can't lock serial port `{}` ({})", config.port, e.to_string() ) }); info!( "Opened serial port {}@{}", port.name().unwrap(), port.baud_rate().unwrap() ); let (rx, tx) = tokio::io::split(port); Self { name: name.to_owned(), config, rx, tx, info: OnceCell::new(), } } async fn run(mut self, chan: ChimemonSourceChannel, cancel: CancellationToken) { info!("PRS10 task started"); if let Err(e) = self.reset_rx_state().await { error!(error = ?e, "Error clearing PRS10 RX state"); return; } if let Err(e) = self.set_info().await { error!(error = ?e, "Error starting PRS10"); return; } info!( "Connected to PRS10 model: {} version: {} serial: {}", self.info().model, self.info().version, self.info().serial ); let mut status_timer = interval(self.config.status_interval); let mut stats_timer = interval(self.config.stats_interval); loop { let msg = select! { _ = cancel.cancelled() => { return; }, _ = status_timer.tick() => { self.status_poll().await }, _ = stats_timer.tick() => { self.stats_poll().await } }; match msg { Ok(msg) => { if let Err(e) = chan.send(msg) { error!("Unable to send to channel {e}") } } Err(e) => error!("Error in poll task: {e}"), } } } } mod tests { use crate::sources::prs10::{Prs10Info, Prs10PowerLampFlags, Prs10PpsFlags, Prs10Status}; #[test] fn test_info_parse() -> Result<(), Box> { const INFO_VECTOR: &[u8] = b"PRS10_3.15_SN_12345"; let info: Prs10Info = INFO_VECTOR.try_into()?; assert_eq!(info.model, "PRS10"); assert_eq!(info.version, "3.15"); assert_eq!(info.serial, "SN_12345"); Ok(()) } #[test] fn test_status_parse() -> Result<(), Box> { //TODO: Add vectors for some more complicated state const STATUS_VECTOR1: &[u8] = b"0,0,0,0,4,0"; let status: Prs10Status = STATUS_VECTOR1.try_into()?; let mut expect = Prs10Status::default(); expect.pps_flags.set(Prs10PpsFlags::PLL_ACTIVE, true); assert_eq!(status, expect); Ok(()) } }