From d3eab1da12de394a9c37b46598d83e57140c7005 Mon Sep 17 00:00:00 2001 From: Keenan Tims Date: Wed, 4 Feb 2026 11:50:27 -0800 Subject: [PATCH] prs10: improve serial handling --- src/sources/prs10.rs | 30 +++++++++++++++++++++++++----- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/src/sources/prs10.rs b/src/sources/prs10.rs index 9a63ef1..f83de01 100644 --- a/src/sources/prs10.rs +++ b/src/sources/prs10.rs @@ -1,11 +1,14 @@ use std::any::type_name; use std::str::FromStr; use std::sync::Arc; +use std::time::Duration; use async_trait::async_trait; use bitflags::bitflags; use itertools::Itertools; -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf}; +use tokio::io::{ + AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter, ReadHalf, WriteHalf, +}; use tokio::select; use tokio::sync::OnceCell; use tokio::time::{interval, timeout}; @@ -116,6 +119,7 @@ bitflags! { const EFC_LOW = (1<<3); const CAL_VOLTAGE_HIGH = (1<<4); const CAL_VOLTAGE_LOW = (1<<5); + const _ = !0; } } @@ -322,7 +326,7 @@ pub struct Prs10Monitor { name: String, config: Prs10Config, rx: ReadHalf, - tx: WriteHalf, + tx: BufWriter>, info: OnceCell, } @@ -333,8 +337,9 @@ impl Prs10Monitor { #[instrument(level = "debug", skip_all, fields(cmd = String::from_utf8_lossy(cmd).to_string()))] pub async fn cmd_response(&mut self, cmd: &[u8]) -> Result, std::io::Error> { - self.tx.write_all(cmd).await.unwrap(); - self.tx.write_u8(b'\r').await.unwrap(); + self.tx.write_all(cmd).await?; + self.tx.write_u8(b'\r').await?; + self.tx.flush().await?; let mut reader = BufReader::new(&mut self.rx); let mut buf = Vec::new(); let read = timeout(self.config.timeout, reader.read_until(b'\r', &mut buf)).await??; @@ -485,6 +490,20 @@ impl Prs10Monitor { }), })) } + + async fn reset_rx_state(&mut self) -> Result<(), Box> { + // flush any pending input and potential responses from the receiver side + self.tx.write_u8(b'\r').await?; + self.tx.flush().await?; + let mut discard = vec![]; + loop { + match timeout(Duration::from_millis(100), self.rx.read_buf(&mut discard)).await { + Ok(_) => discard.clear(), + Err(_) => break, + } + } + Ok(()) + } } #[async_trait] @@ -516,8 +535,9 @@ impl ChimemonSource for Prs10Monitor { port.name().unwrap(), port.baud_rate().unwrap() ); - let (rx, tx) = tokio::io::split(port); + let (rx, tx) = tokio::io::split(port); + let tx = BufWriter::new(tx); Self { name: name.to_owned(), config,