use async_trait::async_trait; use bitflags::bitflags; use byteorder::{BigEndian, ReadBytesExt}; use bytes::{Buf, BytesMut}; use chimemon::{ ChimemonMessage, ChimemonSource, ChimemonSourceChannel, Config, TimeReport, UCCMConfig, }; use chrono::{Duration, NaiveDateTime, Utc}; use log::{debug, info, warn}; use std::io::Cursor; use std::str; use std::sync::Arc; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf}; use tokio::join; use tokio::sync::Mutex; use tokio::time::{interval, sleep, Interval}; use tokio_serial::{SerialPort, SerialStream}; pub const GPS_EPOCH: i64 = 315964800; // Doesn't seem possible to have a const DateTime object pub type UccmEndian = BigEndian; pub enum UCCMMonitorParseState { Idle, ReadStatus, ReadLoopDiag, ReadTOD, } pub struct UCCMMonitor { // pub port: SerialStream, rx: ReadHalf, tx: WriteHalf, pub info: Option, config: UCCMConfig, } #[derive(Debug)] pub struct UCCMTODReport { pub time: NaiveDateTime, // TAI timestamp pub leaps: i8, pub flags: UCCMFlags, } bitflags! { pub struct UCCMFlags: u32 { const OSC_LOCK = (1<<29); const LEAP_FLAG = (1<<25); const INIT_UNLOCK = (1<<24); const INIT_NO_SATS = (1<<19); const HAVE_GPS_TIME = (1<<18); const POWER_FAIL = (1<<17); const MODEL_SYMMETRICOM = (1<<15); const MODEL_TRIMBLE = (1<<14); const NO_GPS_SYNC = (1<<11); const NO_GPS_SYNC2 = (1<<9); const MODEL_SYMMETRICOM2 = (1<<7); const MODEL_TRIMBLE2 = (1<<6); const NO_ANT = (1<<5); const GPS_LOS = (1<<4); } } /* FLAGS: MSB 31 x 30 ALWAYS 1 29 oscillator locked 28..26 x 25 leap flag 24 initial unlock 23..20 x 19 initialized no sats (trimble) 18 have gps time flag 17 power fail 16 x 15..14 model/protocol ( 01 = trimble 10 = symmetricom ) 13..12 x 11 no gps sync 10 always 1 9 no gps sync 8 always 1 7..6 model/protocol ( 01 = trimble 10 = symmetricom ) 5 no ant 4 gps los LSB 3..0 x */ #[derive(Debug)] pub struct GPSSVInfo { pub tracked: bool, pub in_view: bool, pub prn: u8, pub elevation: usize, pub azimuth: usize, pub cn0: usize, } #[derive(Debug)] pub struct UCCMStatusReport { pub tfom: u8, pub ffom: u8, pub gpsPhase: f32, pub gpsPPSValid: bool, pub gpsSVs: [GPSSVInfo; 32], pub gpsTime: NaiveDateTime, pub antVoltage: f32, pub antCurrent: f32, pub temp: f32, pub efcDac: u32, pub freqError: f32, } pub struct UCCMInfo { pub vendor: String, pub model: String, pub serial: String, pub version: String, } impl TryFrom<&[u8]> for UCCMTODReport { type Error = String; fn try_from(strbuf: &[u8]) -> Result { debug!("TOD buffer: `{:#?}`", String::from_utf8(strbuf.to_vec())); let resp: Vec = strbuf .split(|c| *c == ' ' as u8) .map(|x| u8::from_str_radix(str::from_utf8(x).unwrap(), 16).unwrap()) .collect(); let mut rdr = Cursor::new(resp); // Sync flag let c = rdr.read_u16::().unwrap(); if c != 0xc500 { return Err(format!("Missing start delimter (got: `{}`)", c)); } // Consume 25 unknown bytes rdr.advance(25); let time = rdr.read_u32::().unwrap(); rdr.advance(1); // consume padding let leaps = rdr.read_i8().unwrap(); let flags = rdr.read_u32::().unwrap(); debug!("Flags: {}", flags); rdr.advance(6); // Consume padding and checksum, don't check it let c = rdr.read_u8().unwrap(); if c != 0xca { return Err(format!("Missing end delimiter (got: `{}`)", c)); } debug!("TOD time: {} leaps: {} flags: {}", time, leaps, flags); Ok(UCCMTODReport { time: NaiveDateTime::from_timestamp_opt(GPS_EPOCH + time as i64, 0).unwrap(), leaps, flags: UCCMFlags::from_bits_truncate(flags), }) } } impl UCCMMonitor { pub fn new(config: Config) -> Self { let builder = tokio_serial::new(&config.sources.uccm.port, config.sources.uccm.baud) .timeout(config.sources.uccm.timeout) .data_bits(tokio_serial::DataBits::Eight) .parity(tokio_serial::Parity::None) .stop_bits(tokio_serial::StopBits::One) .flow_control(tokio_serial::FlowControl::None); let mut port = SerialStream::open(&builder).expect("Must be able to open serial port"); port.set_exclusive(true).expect("Can't lock serial port"); info!( "Opened serial port {}@{}", port.name().unwrap(), port.baud_rate().unwrap() ); let (rx, tx) = tokio::io::split(port); UCCMMonitor { // port, rx, tx, info: None, config: config.sources.uccm, } } pub async fn send_cmd(&mut self, cmd: &[u8]) -> Result { debug!("cmd: `{:?}`", String::from_utf8_lossy(cmd)); self.tx.write_all(cmd).await; self.tx.write(&[b'\n']).await; let mut reader = BufReader::new(&mut self.rx); let mut resp = String::new(); while !resp.contains("UCCM>") { let mut buf = Vec::new(); reader.read_until(b'>', &mut buf).await; resp.push_str(&String::from_utf8_lossy(&buf)); } // Remove the command we sent from the response resp.replace_range(0..resp.find('\n').unwrap_or(0) + 1, ""); resp = resp.replace("\r", ""); debug!("cmd response: `{:?}`", resp); Ok(resp) } pub async fn get_info(&mut self) -> Result<(), std::io::Error> { self.send_cmd(b"SCPI").await.unwrap_or_default(); self.send_cmd(b"TOD DI").await.unwrap_or_default(); sleep(tokio::time::Duration::from_secs(1)).await; let resp = self.send_cmd(b"*IDN?").await.unwrap_or_default(); let info: Vec<&str> = resp.lines().next().unwrap().split(',').collect(); debug!("Response length: {}", info.len()); if info.len() != 4 { return Err(std::io::Error::new( std::io::ErrorKind::Unsupported, "Unexpected *IDN? response", )); } self.info = Some(UCCMInfo { vendor: info[0].into(), model: info[1].into(), serial: info[2].into(), version: info[3].into(), }); info!( "Found {} {} s/n: {} version: {}", self.info.as_ref().unwrap().vendor, self.info.as_ref().unwrap().model, self.info.as_ref().unwrap().serial, self.info.as_ref().unwrap().version ); Ok(()) } } async fn rx_loop( mut rx: ReadHalf, chan: ChimemonSourceChannel, state: Arc>, ) { let mut rdbuf = BytesMut::with_capacity(1024); loop { match tokio::io::AsyncReadExt::read_buf(&mut rx, &mut rdbuf).await { Ok(n) => { if n == 0 { continue; } } Err(_) => continue, } match *state.lock().await { UCCMMonitorParseState::Idle => { while !rdbuf.starts_with(b"c5 00") && rdbuf.remaining() > 0 { rdbuf.advance(1); } if rdbuf.len() < (44 * 2 + 43) { // TOD frame is 44 bytes, plus 43 spaces continue; }; let frame = rdbuf.split_to(44 * 2 + 43); match UCCMTODReport::try_from(&frame[..]) { Ok(tod) => { let sysnow = Utc::now().naive_utc(); let offset = tod.time - Duration::seconds(tod.leaps as i64) - sysnow; debug!("System time: {:#?} GPS time: {:#?} Leaps: {:#?}", sysnow, tod.time, tod.leaps); debug!("TOD offset: {}ms", offset.num_milliseconds()); info!("{:#?}", tod); let valid = tod.leaps > 0 && tod .flags .contains(UCCMFlags::OSC_LOCK | UCCMFlags::HAVE_GPS_TIME); chan.send(ChimemonMessage::TimeReport(TimeReport { system_time: sysnow, offset, leaps: tod.leaps as isize, leap_flag: if tod.flags.contains(UCCMFlags::LEAP_FLAG) { true } else { false }, valid, })) .expect("Unable to send to channel"); } Err(e) => { warn!("Unable to parse TOD frame: {}", e); rdbuf.clear(); } } } UCCMMonitorParseState::ReadStatus => todo!(), UCCMMonitorParseState::ReadLoopDiag => todo!(), UCCMMonitorParseState::ReadTOD => todo!(), } } } #[async_trait] impl ChimemonSource for UCCMMonitor { async fn run(mut self, chan: ChimemonSourceChannel) { info!("UCCM task starting"); if self.get_info().await.is_err() { warn!("Error starting UCCM"); return; } self.send_cmd(b"SCPI").await.unwrap(); self.send_cmd(b"SYSTem:PRESet").await.unwrap(); self.send_cmd(b"OUTPut:TP:SELection PP1S").await.unwrap(); self.send_cmd(b"TESTMODE EN").await.unwrap(); self.send_cmd(b"TOD EN").await.unwrap(); let state = Arc::new(Mutex::::new( UCCMMonitorParseState::Idle, )); let rx_handle = tokio::spawn(rx_loop(self.rx, chan.clone(), state.clone())); // let tx_handle = tokio::spawn(async move { // let mut interval = interval(self.config.status_interval); // loop { // interval.tick().await; // let wfut = self.tx.write_all(b"SYST:STAT?\n"); // *state.lock().await = UCCMMonitorParseState::ReadStatus; // wfut.await; // } // }); join!(rx_handle); } }