#[cfg(unix)]
mod unix {
    //! End-to-end test that runs a chmux multiplexer on both ends of a Unix
    //! domain socket and exchanges one greeting in each direction.

    use futures::stream::StreamExt;
    use std::{fs, path::PathBuf};
    use tokio::{
        io::split,
        net::{UnixListener, UnixStream},
    };
    use tokio_util::codec::{length_delimited::LengthDelimitedCodec, FramedRead, FramedWrite};

    /// Removes the socket file at the wrapped path when dropped, so the file
    /// is cleaned up even if the test panics.
    struct SocketFile(PathBuf);

    impl Drop for SocketFile {
        fn drop(&mut self) {
            // Best effort: the file may already be gone.
            let _ = fs::remove_file(&self.0);
        }
    }

    /// Server side of the test.
    ///
    /// Accepts a single socket connection on the already-bound `listener`,
    /// wraps it in length-delimited framing and runs a multiplexer over it.
    /// For every accepted channel it sends one greeting, drops its sender and
    /// then prints everything it receives until the channel ends.
    async fn uds_server(listener: UnixListener) {
        let (socket, _) = listener.accept().await.unwrap();

        let (socket_rx, socket_tx) = split(socket);
        let framed_tx = FramedWrite::new(socket_tx, LengthDelimitedCodec::new());
        let framed_rx = FramedRead::new(socket_rx, LengthDelimitedCodec::new());
        // The codec yields mutable buffers; freeze them into immutable ones.
        let framed_rx = framed_rx.map(|data| data.map(|b| b.freeze()));

        let mux_cfg = chmux::Cfg::default();
        let (mux, _, mut server) = chmux::Multiplexer::new(&mux_cfg, framed_tx, framed_rx).await.unwrap();
        let mux_run = tokio::spawn(async move { mux.run().await.unwrap() });

        while let Some((mut tx, mut rx)) = server.accept().await.unwrap() {
            println!("Server accepting request");

            tx.send("Hi from server".into()).await.unwrap();
            println!("Server sent Hi message");

            println!("Server dropping sender");
            drop(tx);
            println!("Server dropped sender");

            while let Some(msg) = rx.recv().await.unwrap() {
                println!("Server received: {}", String::from_utf8_lossy(&Vec::from(msg)));
            }
        }

        println!("Waiting for server mux to terminate...");
        mux_run.await.unwrap();
    }

    /// Client side of the test.
    ///
    /// Connects to the socket at `path`, runs a multiplexer over it, opens one
    /// channel, sends one greeting, drops its sender and prints everything it
    /// receives until the channel ends.
    async fn uds_client(path: PathBuf) {
        let socket = UnixStream::connect(&path).await.unwrap();

        let (socket_rx, socket_tx) = split(socket);
        let framed_tx = FramedWrite::new(socket_tx, LengthDelimitedCodec::new());
        let framed_rx = FramedRead::new(socket_rx, LengthDelimitedCodec::new());
        // The codec yields mutable buffers; freeze them into immutable ones.
        let framed_rx = framed_rx.map(|data| data.map(|b| b.freeze()));

        let mux_cfg = chmux::Cfg::default();
        let (mux, client, _) = chmux::Multiplexer::new(&mux_cfg, framed_tx, framed_rx).await.unwrap();
        let mux_run = tokio::spawn(async move { mux.run().await.unwrap() });

        {
            // Move the client into this scope so that it is dropped before we
            // wait for the multiplexer task below.
            let client = client;

            println!("Client connecting...");
            let (mut tx, mut rx) = client.connect().await.unwrap();
            println!("Client connected");

            tx.send("Hi from client".into()).await.unwrap();
            println!("Client sent Hi message");

            println!("Client dropping sender");
            drop(tx);
            println!("Client dropped sender");

            while let Some(msg) = rx.recv().await.unwrap() {
                println!("Client received: {}", String::from_utf8_lossy(&Vec::from(msg)));
            }

            println!("Client closing connection...");
        }

        println!("Waiting for client mux to terminate...");
        mux_run.await.unwrap();
    }

    /// Runs the server and the client as separate tasks and waits for both.
    #[tokio::test]
    async fn uds_test() {
        // `init` panics if a logger is already installed (e.g. by another test
        // in the same binary); `try_init` reports that as an error we ignore.
        let _ = env_logger::try_init();

        // Per-process socket path so concurrent runs do not clash; the guard
        // removes the file again when the test finishes or panics.
        let socket_file =
            SocketFile(std::env::temp_dir().join(format!("chmux_test_{}", std::process::id())));
        // Clear any stale file left behind by a previous run with the same pid.
        let _ = fs::remove_file(&socket_file.0);

        // Bind before spawning anything: once the listener exists the client's
        // connect cannot race the server's startup, so no sleep is needed.
        let listener = UnixListener::bind(&socket_file.0).unwrap();

        println!("Starting server task...");
        let server_task = tokio::spawn(uds_server(listener));

        println!("Starting client task...");
        let client_task = tokio::spawn(uds_client(socket_file.0.clone()));

        println!("Waiting for server task...");
        server_task.await.unwrap();
        println!("Waiting for client task...");
        client_task.await.unwrap();
    }
}