initial commit

This commit is contained in:
2025-11-10 18:08:04 -08:00
commit 644af22d0b
4 changed files with 1349 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
/target

1116
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

13
Cargo.toml Normal file
View File

@@ -0,0 +1,13 @@
[package]
name = "gpsd-nmea"
version = "0.1.0"
edition = "2024"
[dependencies]
backoff = { version = "0.4.0", features = ["async-std"] }
clap = { version = "4.5.51", features = ["derive"] }
env_logger = "0.11.8"
futures = "0.3.31"
json = "0.12.4"
log = "0.4.28"
smol = "2.0.2"

219
src/main.rs Normal file
View File

@@ -0,0 +1,219 @@
use std::future;
use std::time::Duration;
use backoff::SystemClock;
use backoff::exponential::ExponentialBackoff;
use clap::{Parser, value_parser};
use env_logger;
use futures::future::{Either, select};
use futures::pin_mut;
use io::BufReader;
use json::{JsonValue, object};
use log::{debug, error, info};
use smol::channel::TrySendError;
use smol::io::AsyncReadExt;
use smol::net::{SocketAddr, TcpListener, TcpStream};
use smol::{Task, channel, io, prelude::*};
type NMEAChanMsg = Vec<u8>;
#[derive(Parser)]
#[command(
version,
about = "An extremely simple program that connects to gpsd and streams NMEA strings to any connected clients"
)]
struct CmdArgs {
/// The gpsd host to connect to
#[arg(short = 'c', long, default_value = "[::1]:2947")]
host: SocketAddr,
/// The listen address to accept connections on
#[arg(long, default_value = "[::1]:2948")]
listen: SocketAddr,
/// The gpsd device to filter for (default all)
#[arg(short, long)]
device: Option<String>,
/// The NMEA 0183 messages to emit (default all)
#[arg(short, long="msg", num_args(1..), value_parser = clap::builder::ValueParser::from(|s: &str| {
let r: Result<[u8;5], _> = if s.len() != 5 || !s.chars().all(|c| c.is_ascii_alphabetic()) {
Err(format!("`{s}` is not a valid NMEA0183 message type (expected 5 letters)"))
} else {
let upper = s.trim().to_ascii_uppercase();
Ok(upper.as_bytes().try_into().unwrap())
};
r
} ))]
msgs: Option<Vec<[u8; 5]>>,
}
enum Error {
NMEAParseError,
JsonParseError(json::JsonError),
UnknownGpsdResponse,
}
#[derive(Debug)]
struct NMEAMsg {
id: [u8; 5],
msg: Vec<u8>,
}
impl TryFrom<&[u8]> for NMEAMsg {
type Error = Error;
fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
if value.len() < 6 || value[0] != b'$' {
Err(Error::NMEAParseError)
} else {
Ok(NMEAMsg {
id: value[1..6].try_into().unwrap(),
msg: value.to_vec(),
})
}
}
}
#[derive(Debug)]
enum GpsdResponse {
JsonResponse(json::JsonValue),
NMEAResponse(NMEAMsg),
}
impl<'a> TryFrom<&str> for GpsdResponse {
type Error = Error;
fn try_from(value: &str) -> Result<Self, Self::Error> {
match value.as_bytes()[0] {
b'{' => Ok(GpsdResponse::JsonResponse(
json::parse(value).map_err(|e| Error::JsonParseError(e))?,
)),
b'$' => Ok(GpsdResponse::NMEAResponse(value.as_bytes().try_into()?)),
_ => Err(Error::UnknownGpsdResponse),
}
}
}
fn build_gpsd_watch_cmd(device: Option<String>) -> String {
let payload = object! {
enable: true,
nmea: true,
device: device,
};
format!("?WATCH={}\n", json::stringify(payload))
}
async fn gpsd_client_loop(
chan: channel::Sender<NMEAChanMsg>,
dest: SocketAddr,
device: Option<String>,
msgs: Option<Vec<[u8; 5]>>,
) -> io::Result<()> {
let watch_cmd = build_gpsd_watch_cmd(device);
loop {
let backoff = ExponentialBackoff::<SystemClock>::default();
let mut conn = backoff::future::retry_notify(
backoff,
|| async { Ok(TcpStream::connect(dest).await?) },
|e, dur: Duration| {
let dur_s = dur.as_secs();
error!("Error connecting to {dest} after {dur_s}s: {e}")
},
)
.await?;
info!("Connected to gpsd service at {}", dest);
conn.write(watch_cmd.as_bytes()).await?;
debug!("Sent command {}", watch_cmd);
let mut lines = BufReader::new(conn.boxed_reader()).lines();
while let Some(Ok(line)) = lines.next().await {
debug!("Got line `{:#?}", line);
let msg = line.as_str().try_into();
if let Ok(msg) = msg {
match msg {
GpsdResponse::JsonResponse(_) => continue,
GpsdResponse::NMEAResponse(mut n) => {
if let Some(msgs) = &msgs {
if !msgs.contains(&n.id) {
continue;
};
}
if !chan.is_closed() {
n.msg.push(b'\r');
n.msg.push(b'\n');
if let Err(e) = chan.try_send(n.msg)
&& !e.is_closed()
{
error!("Error sending to channel: {}", e)
}
}
}
}
}
}
}
}
async fn ptp_server_loop(
mut conn: TcpStream,
chan: channel::Receiver<NMEAChanMsg>,
) -> io::Result<()> {
debug!("Starting server loop for {}", conn.peer_addr()?);
let mut junk_buf = [0u8; 8192];
loop {
let msg_fut = chan.recv();
let read_fut = conn.read(&mut junk_buf);
pin_mut!(msg_fut, read_fut);
match select(msg_fut, read_fut).await {
Either::Left((Ok(msg), _)) => conn.write_all(&msg).await?,
Either::Left((Err(e), _)) => {
error!("Channel closed for {}: {}", conn.peer_addr()?, e);
return Ok(());
}
Either::Right((Ok(n), _)) => {
if n == 0 {
info!("Connection closed by peer {}", conn.peer_addr()?);
return Ok(());
} else {
debug!("Discarded {} bytes from {}", n, conn.peer_addr()?);
// We don't process these bytes, just ignore them
}
}
// TCP read error
Either::Right((Err(e), _)) => {
error!("Error reading from {}: {}", conn.peer_addr()?, e);
return Ok(());
}
}
}
}
fn main() -> io::Result<()> {
env_logger::init();
let cli = CmdArgs::parse();
smol::block_on(async {
let listen_socket = TcpListener::bind(cli.listen).await?;
let addr_s = listen_socket.local_addr()?;
info!("Listening on {addr_s}");
let (nmea_chan_tx, nmea_chan_rx) = channel::unbounded::<NMEAChanMsg>();
smol::spawn(
async move { gpsd_client_loop(nmea_chan_tx, cli.host, cli.device, cli.msgs).await },
)
.detach();
while let Ok((srv, addr)) = listen_socket.accept().await {
info!("Connection from {addr}");
let local_chan = nmea_chan_rx.clone();
smol::spawn(async move {
if let Err(e) = ptp_server_loop(srv, local_chan).await {
error!("Connection task error {}", e);
}
})
.detach();
}
Ok(())
})
}