use core::future::Future; use std::io; use std::net::{Shutdown, TcpListener, TcpStream, UdpSocket}; use std::sync::Arc; use std::thread; use async_io_mini::Async; use embassy_time::{Duration, Timer}; use futures_lite::{future, prelude::*}; const LOREM_IPSUM: &[u8] = b" Lorem ipsum dolor sit amet, consectetur adipiscing elit. Donec pretium ante erat, vitae sodales mi varius quis. Etiam vestibulum lorem vel urna tempor, eu fermentum odio aliquam. Aliquam consequat urna vitae ipsum pulvinar, in blandit purus eleifend. "; fn spawn( f: impl Future + Send + 'static, ) -> impl Future + Send + 'static { let (s, r) = async_channel::bounded(1); thread::spawn(move || { future::block_on(async { s.send(f.await).await.ok(); }) }); Box::pin(async move { r.recv().await.unwrap() }) } #[test] fn tcp_connect() -> io::Result<()> { future::block_on(async { let listener = Async::::bind(([127, 0, 0, 1], 0))?; let addr = listener.get_ref().local_addr()?; let task = spawn(async move { listener.accept().await }); let stream2 = Async::::connect(addr).await?; let stream1 = task.await?.0; assert_eq!( stream1.get_ref().peer_addr()?, stream2.get_ref().local_addr()?, ); assert_eq!( stream2.get_ref().peer_addr()?, stream1.get_ref().local_addr()?, ); // Now that the listener is closed, connect should fail. let err = Async::::connect(addr).await.unwrap_err(); assert_eq!(err.kind(), io::ErrorKind::ConnectionRefused); Ok(()) }) } #[test] fn tcp_peek_read() -> io::Result<()> { future::block_on(async { let listener = Async::::bind(([127, 0, 0, 1], 0))?; let addr = listener.get_ref().local_addr()?; let mut stream = Async::::connect(addr).await?; stream.write_all(LOREM_IPSUM).await?; let mut buf = [0; 1024]; let mut incoming = Box::pin(listener.incoming()); let mut stream = incoming.next().await.unwrap()?; let n = stream.peek(&mut buf).await?; assert_eq!(&buf[..n], LOREM_IPSUM); let n = stream.read(&mut buf).await?; assert_eq!(&buf[..n], LOREM_IPSUM); Ok(()) }) } #[test] fn tcp_reader_hangup() -> io::Result<()> { future::block_on(async { let listener = Async::::bind(([127, 0, 0, 1], 0))?; let addr = listener.get_ref().local_addr()?; let task = spawn(async move { listener.accept().await }); let mut stream2 = Async::::connect(addr).await?; let stream1 = task.await?.0; let task = spawn(async move { Timer::after(Duration::from_secs(1)).await; drop(stream1); }); while stream2.write_all(LOREM_IPSUM).await.is_ok() {} task.await; Ok(()) }) } #[test] fn tcp_writer_hangup() -> io::Result<()> { future::block_on(async { let listener = Async::::bind(([127, 0, 0, 1], 0))?; let addr = listener.get_ref().local_addr()?; let task = spawn(async move { listener.accept().await }); let mut stream2 = Async::::connect(addr).await?; let stream1 = task.await?.0; let task = spawn(async move { Timer::after(Duration::from_secs(1)).await; drop(stream1); }); let mut v = vec![]; stream2.read_to_end(&mut v).await?; assert!(v.is_empty()); task.await; Ok(()) }) } #[test] fn udp_send_recv() -> io::Result<()> { future::block_on(async { let socket1 = Async::::bind(([127, 0, 0, 1], 0))?; let socket2 = Async::::bind(([127, 0, 0, 1], 0))?; socket1.get_ref().connect(socket2.get_ref().local_addr()?)?; let mut buf = [0u8; 1024]; socket1.send(LOREM_IPSUM).await?; let n = socket2.peek(&mut buf).await?; assert_eq!(&buf[..n], LOREM_IPSUM); let n = socket2.recv(&mut buf).await?; assert_eq!(&buf[..n], LOREM_IPSUM); socket2 .send_to(LOREM_IPSUM, socket1.get_ref().local_addr()?) .await?; let n = socket1.peek_from(&mut buf).await?.0; assert_eq!(&buf[..n], LOREM_IPSUM); let n = socket1.recv_from(&mut buf).await?.0; assert_eq!(&buf[..n], LOREM_IPSUM); Ok(()) }) } // Test that we correctly re-register interests after we've previously been // interested in both readable and writable events and then we get only one of // those (we need to re-register interest on the other). #[test] fn tcp_duplex() -> io::Result<()> { future::block_on(async { let listener = Async::::bind(([127, 0, 0, 1], 0))?; let stream1 = Arc::new(Async::::connect(listener.get_ref().local_addr()?).await?); let stream2 = Arc::new(listener.accept().await?.0); async fn do_read(s: Arc>) -> io::Result<()> { let mut buf = vec![0u8; 4096]; loop { let len = (&*s).read(&mut buf).await?; if len == 0 { return Ok(()); } } } async fn do_write(s: Arc>) -> io::Result<()> { let buf = vec![0u8; 4096]; for _ in 0..4096 { (&*s).write_all(&buf).await?; } s.get_ref().shutdown(Shutdown::Write)?; Ok(()) } // Read from and write to stream1. let r1 = spawn(do_read(stream1.clone())); let w1 = spawn(do_write(stream1)); // Sleep a bit, so that reading and writing are both blocked. Timer::after(Duration::from_millis(5)).await; // Start reading stream2, make stream1 writable. let r2 = spawn(do_read(stream2.clone())); // Finish writing to stream1. w1.await?; r2.await?; // Start writing to stream2, make stream1 readable. let w2 = spawn(do_write(stream2)); // Will r1 be correctly woken? r1.await?; w2.await?; Ok(()) }) } #[test] fn shutdown() -> io::Result<()> { future::block_on(async { let listener = Async::::bind(([127, 0, 0, 1], 0))?; let addr = listener.get_ref().local_addr()?; let ((mut reader, _), writer) = future::try_zip(listener.accept(), Async::::connect(addr)).await?; // The writer must be closed in order for `read_to_end()` to finish. let mut buf = Vec::new(); future::try_zip(reader.read_to_end(&mut buf), async { writer.get_ref().shutdown(Shutdown::Write) }) .await?; Ok(()) }) } // prevent source from unregistering by trying to register it twice #[test] fn duplicate_socket_insert() -> io::Result<()> { future::block_on(async { let listener = Async::::bind(([127, 0, 0, 1], 0))?; let addr = listener.as_ref().local_addr()?; // attempt to register twice assert!(Async::new(&listener).is_err(), "fails upon second insert"); // Read and Write to confirm socket did not deregister on duplication attempt // Write to stream_w let mut stream_w = Async::::connect(addr).await?; stream_w.write(LOREM_IPSUM).await?; stream_w.get_ref().shutdown(Shutdown::Write)?; // Read from stream_r let mut stream_r = listener.accept().await?.0; let mut buffer = vec![0; LOREM_IPSUM.len()]; stream_r.read_exact(&mut buffer).await?; assert_eq!(buffer, LOREM_IPSUM); Ok(()) }) }