use { super::util::{NameGen, TestResult}, anyhow::Context, futures::io::{AsyncReadExt, AsyncWriteExt}, interprocess::os::windows::named_pipe::{ tokio::{DuplexMsgPipeStream, PipeListenerOptionsExt}, PipeListenerOptions, PipeMode, }, std::{convert::TryInto, ffi::OsStr, io, sync::Arc, time::Duration}, tokio::{sync::oneshot::Sender, task, time::sleep, try_join}, }; const SERVER_MSG_1: &[u8] = b"Server message 1"; const SERVER_MSG_2: &[u8] = b"Server message 2"; const CLIENT_MSG_1: &[u8] = b"Client message 1"; const CLIENT_MSG_2: &[u8] = b"Client message 2"; pub async fn server(name_sender: Sender, num_clients: u32) -> TestResult { async fn handle_conn(conn: DuplexMsgPipeStream) -> TestResult { let (mut reader, mut writer) = conn.split(); let (mut buf1, mut buf2) = ([0; CLIENT_MSG_1.len()], [0; CLIENT_MSG_2.len()]); let read = async { let read = reader .read(&mut buf1) .await .context("First pipe receive failed")?; assert_eq!(read, CLIENT_MSG_1.len()); assert_eq!(&buf1[0..read], CLIENT_MSG_1); let read = reader .read(&mut buf2) .await .context("Second pipe receive failed")?; assert_eq!(read, CLIENT_MSG_2.len()); assert_eq!(&buf2[0..read], CLIENT_MSG_2); TestResult::Ok(()) }; let write = async { let written = writer .write(SERVER_MSG_1) .await .context("Pipe send failed")?; assert_eq!(written, SERVER_MSG_1.len()); let written = writer .write(SERVER_MSG_2) .await .context("Pipe send failed")?; assert_eq!(written, SERVER_MSG_2.len()); TestResult::Ok(()) }; try_join!(read, write)?; Ok(()) } let (name, listener) = NameGen::new(true) .find_map(|nm| { let rnm: &OsStr = nm.as_ref(); let l = match PipeListenerOptions::new() .name(rnm) .mode(PipeMode::Messages) .create_tokio::() { Ok(l) => l, Err(e) if e.kind() == io::ErrorKind::AddrInUse => return None, Err(e) => return Some(Err(e)), }; Some(Ok((nm, l))) }) .unwrap() .context("Listener bind failed")?; let _ = name_sender.send(name); let mut tasks = Vec::with_capacity(num_clients.try_into().unwrap()); for _ in 0..num_clients { let conn = match listener.accept().await { Ok(c) => c, Err(e) => { eprintln!("Incoming connection failed: {}", e); continue; } }; let task = task::spawn(handle_conn(conn)); tasks.push(task); } for task in tasks { task.await .context("Server task panicked")? .context("Server task returned early with error")?; } Ok(()) } pub async fn client(name: Arc) -> TestResult { let (mut reader, mut writer) = loop { match DuplexMsgPipeStream::connect(name.as_str()) { Err(e) if e.kind() == io::ErrorKind::WouldBlock => { sleep(Duration::from_millis(10)).await; continue; } not_busy => break not_busy, } } .context("Connect failed")? .split(); let (mut buf1, mut buf2) = ([0; SERVER_MSG_1.len()], [0; SERVER_MSG_2.len()]); let read = async { let read = reader .read(&mut buf1) .await .context("First pipe receive failed")?; assert_eq!(read, SERVER_MSG_1.len()); assert_eq!(&buf1[0..read], SERVER_MSG_1); let read = reader .read(&mut buf2) .await .context("Second pipe receive failed")?; assert_eq!(read, SERVER_MSG_2.len()); assert_eq!(&buf2[0..read], SERVER_MSG_2); TestResult::Ok(()) }; let write = async { let written = writer .write(CLIENT_MSG_1) .await .context("First pipe send failed")?; assert_eq!(written, CLIENT_MSG_1.len()); let written = writer .write(CLIENT_MSG_2) .await .context("Second pipe send failed")?; assert_eq!(written, CLIENT_MSG_2.len()); TestResult::Ok(()) }; try_join!(read, write)?; Ok(()) }