use futures::{ready, SinkExt, StreamExt}; use serialport::SerialPort; use slip_codec::tokio::SlipCodec; use std::io::{Read, Write}; use std::task::Context; use tokio::io::unix::AsyncFd; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::macros::support::{Pin, Poll}; type Result = std::result::Result; struct AsyncTTYPort { inner: AsyncFd, } impl AsyncTTYPort { pub fn new(mut inner: serialport::TTYPort) -> Result { const ZERO: std::time::Duration = std::time::Duration::from_secs(0); inner.set_timeout(ZERO)?; Ok(Self { inner: AsyncFd::new(inner)?, }) } pub fn pair() -> std::io::Result<(AsyncTTYPort, AsyncTTYPort)> { let (a, b) = serialport::TTYPort::pair()?; let a = AsyncTTYPort::new(a)?; let b = AsyncTTYPort::new(b)?; Ok((a, b)) } } impl AsyncRead for AsyncTTYPort { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { let mut guard = ready!(self.inner.poll_read_ready_mut(cx))?; match guard.try_io(|inner| { let read = inner.get_mut().read(buf.initialize_unfilled())?; buf.advance(read); Ok(()) }) { Ok(value) => Poll::Ready(value), Err(_) => Poll::Pending, } } } impl AsyncWrite for AsyncTTYPort { fn poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { let mut guard = ready!(self.inner.poll_write_ready_mut(cx))?; match guard.try_io(|io| io.get_mut().write(buf)) { Ok(value) => Poll::Ready(value), Err(_) => Poll::Pending, } } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut guard = ready!(self.inner.poll_write_ready_mut(cx))?; match guard.try_io(|io| io.get_mut().flush()) { Ok(value) => Poll::Ready(value), Err(_) => Poll::Pending, } } fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } } async fn run_source(port: AsyncTTYPort) { let mut sink = tokio_util::codec::Framed::new(port, SlipCodec::new()); for message in ["foo", "bar", "baz"].iter() { let message = message.to_string().into(); println!("send {:?}", message); sink.send(message).await.unwrap(); } } async fn run_sink(port: AsyncTTYPort) { let mut source = tokio_util::codec::Framed::new(port, SlipCodec::new()); loop { if let Some(result) = source.next().await { match result { Ok(message) => println!("recv {:?}", message), Err(_) => break, } } } } #[tokio::main] pub async fn main() -> Result<()> { let (source, sink) = AsyncTTYPort::pair()?; let source_handle = tokio::spawn(run_source(source)); let sink_handle = tokio::spawn(run_sink(sink)); source_handle.await?; sink_handle.await?; Ok(()) }