use core::ops::Range; use core::time::Duration; use safina_async_test::async_test; use safina_net::TcpStream; use safina_sync::{oneshot, OneSender}; use std::io::{ErrorKind, IoSlice, IoSliceMut}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::time::Instant; fn any_port() -> std::net::SocketAddr { SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0) } fn expect_elapsed(before: Instant, range_ms: Range) { assert!(!range_ms.is_empty(), "invalid range {:?}", range_ms); let elapsed = before.elapsed(); let duration_range = Duration::from_millis(range_ms.start)..Duration::from_millis(range_ms.end); assert!( duration_range.contains(&elapsed), "{:?} elapsed, out of range {:?}", elapsed, duration_range ); } enum Action { Read(OneSender>), Write(&'static str), SleepMillis(u64), ShutdownWrite, ShutdownRead, } fn simple_server(actions: Vec) -> SocketAddr { let listener = std::net::TcpListener::bind(&any_port()).unwrap(); let addr = listener.local_addr().unwrap(); std::thread::spawn(move || { let (mut tcp_stream, _remote_addr) = listener.accept().unwrap(); for action in actions { match action { Action::Read(sender) => { let mut buf = [0_u8; 1024]; let num_read = std::io::Read::read(&mut tcp_stream, &mut buf).unwrap(); let mut data = Vec::new(); data.extend_from_slice(&buf[..num_read]); let _result = sender.send(data.into_boxed_slice()); } Action::Write(data) => { std::io::Write::write_all(&mut tcp_stream, data.as_bytes()).unwrap(); } Action::SleepMillis(millis) => std::thread::sleep(Duration::from_millis(millis)), Action::ShutdownWrite => tcp_stream.shutdown(std::net::Shutdown::Write).unwrap(), Action::ShutdownRead => tcp_stream.shutdown(std::net::Shutdown::Read).unwrap(), } } }); addr } async fn socket_in_error() -> TcpStream { let addr = simple_server(vec![Action::SleepMillis(50)]); let mut tcp_stream = safina_timer::with_timeout(TcpStream::connect(addr), Duration::from_millis(500)) .await .unwrap() .unwrap(); assert_eq!(3, tcp_stream.write(b"abc").await.unwrap()); safina_timer::sleep_for(Duration::from_millis(100)).await; tcp_stream } fn filled_vec(b: u8, len: usize) -> Vec { let mut buf = Vec::with_capacity(len); for _ in 0..(len >> 10) { buf.extend_from_slice(&[b'.'; 1024]); } buf.extend(core::iter::repeat(b).take(len & 1023)); buf } #[async_test] async fn inner_mut() { let addr = simple_server(vec![Action::SleepMillis(500)]); let mut tcp_stream = safina_timer::with_timeout(TcpStream::connect(addr), Duration::from_millis(500)) .await .unwrap() .unwrap(); let inner: &mut std::net::TcpStream = tcp_stream.inner_mut(); let _result = inner.shutdown(std::net::Shutdown::Write); } #[async_test] async fn into_inner() { let addr = simple_server(vec![Action::SleepMillis(500)]); let tcp_stream = safina_timer::with_timeout(TcpStream::connect(addr), Duration::from_millis(500)) .await .unwrap() .unwrap(); let inner: std::net::TcpStream = tcp_stream.into_inner(); let _result = inner.shutdown(std::net::Shutdown::Write); } #[async_test] async fn read() { let start = Instant::now(); let addr = simple_server(vec![ Action::Write("abcde"), Action::SleepMillis(100), Action::ShutdownRead, Action::Write("f"), Action::SleepMillis(100), Action::ShutdownWrite, Action::SleepMillis(100), ]); let mut tcp_stream = safina_timer::with_timeout(TcpStream::connect(addr), Duration::from_millis(500)) .await .unwrap() .unwrap(); let mut data = [b'.'; 3]; assert_eq!(3, tcp_stream.read(&mut data).await.unwrap()); assert_eq!("abc", std::str::from_utf8(&data).unwrap()); expect_elapsed(start, 0..100); data = [b'.'; 3]; assert_eq!(2, tcp_stream.read(&mut data).await.unwrap()); assert_eq!("de.", std::str::from_utf8(&data).unwrap()); expect_elapsed(start, 0..100); data = [b'.'; 3]; assert_eq!(1, tcp_stream.read(&mut data).await.unwrap()); assert_eq!("f..", std::str::from_utf8(&data).unwrap()); expect_elapsed(start, 100..200); data = [b'.'; 3]; assert_eq!(0, tcp_stream.read(&mut data).await.unwrap()); assert_eq!("...", std::str::from_utf8(&data).unwrap()); expect_elapsed(start, 200..300); let before = Instant::now(); data = [b'.'; 3]; assert_eq!(0, tcp_stream.read(&mut data).await.unwrap()); assert_eq!("...", std::str::from_utf8(&data).unwrap()); expect_elapsed(before, 0..10); } #[async_test] async fn read_error() { assert_eq!( std::io::ErrorKind::ConnectionReset, socket_in_error() .await .read(&mut [b'.'; 3]) .await .unwrap_err() .kind() ); } #[async_test] async fn read_to_end() { let addr = simple_server(vec![ Action::Write("abcde"), Action::SleepMillis(100), Action::Write("f"), Action::SleepMillis(100), ]); let before = Instant::now(); let mut tcp_stream = safina_timer::with_timeout(TcpStream::connect(addr), Duration::from_millis(500)) .await .unwrap() .unwrap(); let mut data = Vec::new(); assert_eq!(6, tcp_stream.read_to_end(&mut data).await.unwrap()); assert_eq!("abcdef", std::str::from_utf8(&data).unwrap()); expect_elapsed(before, 200..300); } #[async_test] async fn read_to_end_error() { assert_eq!( std::io::ErrorKind::ConnectionReset, socket_in_error() .await .read_to_end(&mut Vec::new()) .await .unwrap_err() .kind() ); } #[async_test] async fn read_to_string() { let addr = simple_server(vec![ Action::Write("abcde"), Action::SleepMillis(100), Action::Write("f"), Action::SleepMillis(100), ]); let before = Instant::now(); let mut tcp_stream = safina_timer::with_timeout(TcpStream::connect(addr), Duration::from_millis(500)) .await .unwrap() .unwrap(); let mut data = String::new(); assert_eq!(6, tcp_stream.read_to_string(&mut data).await.unwrap()); assert_eq!("abcdef", data); expect_elapsed(before, 200..300); } #[async_test] async fn read_to_string_error() { assert_eq!( std::io::ErrorKind::ConnectionReset, socket_in_error() .await .read_to_string(&mut String::new()) .await .unwrap_err() .kind() ); } #[async_test] async fn read_exact_empty() { let addr = simple_server(vec![Action::SleepMillis(100)]); let before = Instant::now(); let mut tcp_stream = safina_timer::with_timeout(TcpStream::connect(addr), Duration::from_millis(500)) .await .unwrap() .unwrap(); let mut data = [b'.'; 3]; assert_eq!( ErrorKind::UnexpectedEof, tcp_stream.read_exact(&mut data).await.unwrap_err().kind() ); assert_eq!("...", std::str::from_utf8(&data).unwrap()); expect_elapsed(before, 100..200); } #[async_test] async fn read_exact_ok() { let addr = simple_server(vec![ Action::Write("abcde"), Action::SleepMillis(100), Action::Write("f"), ]); let before = Instant::now(); let mut tcp_stream = safina_timer::with_timeout(TcpStream::connect(addr), Duration::from_millis(500)) .await .unwrap() .unwrap(); let mut data = [b'.'; 3]; tcp_stream.read_exact(&mut data).await.unwrap(); assert_eq!("abc", std::str::from_utf8(&data).unwrap()); expect_elapsed(before, 0..100); data = [b'.'; 3]; tcp_stream.read_exact(&mut data).await.unwrap(); assert_eq!("def", std::str::from_utf8(&data).unwrap()); expect_elapsed(before, 100..200); assert_eq!( ErrorKind::UnexpectedEof, tcp_stream.read_exact(&mut data).await.unwrap_err().kind() ); } #[async_test] async fn read_exact_not_enough_bytes() { let addr = simple_server(vec![Action::Write("ab"), Action::SleepMillis(100)]); let before = Instant::now(); let mut tcp_stream = safina_timer::with_timeout(TcpStream::connect(addr), Duration::from_millis(500)) .await .unwrap() .unwrap(); let mut data = [b'.'; 3]; assert_eq!( ErrorKind::UnexpectedEof, tcp_stream.read_exact(&mut data).await.unwrap_err().kind() ); expect_elapsed(before, 100..200); } #[async_test] async fn read_exact_error() { assert_eq!( std::io::ErrorKind::ConnectionReset, socket_in_error() .await .read_exact(&mut [b'.'; 3]) .await .unwrap_err() .kind() ); } #[async_test] async fn read_vectored() { let start = Instant::now(); let addr = simple_server(vec![ Action::Write("abcde"), Action::SleepMillis(100), Action::Write("f"), Action::SleepMillis(100), Action::Write("ghi"), Action::SleepMillis(100), Action::SleepMillis(100), Action::ShutdownWrite, Action::SleepMillis(100), ]); let mut tcp_stream = safina_timer::with_timeout(TcpStream::connect(addr), Duration::from_millis(500)) .await .unwrap() .unwrap(); let mut data1 = [b'.'; 1]; let mut data2 = [b'.'; 2]; assert_eq!( 3, tcp_stream .read_vectored(&mut [IoSliceMut::new(&mut data1), IoSliceMut::new(&mut data2)]) .await .unwrap() ); assert_eq!("a", std::str::from_utf8(&data1).unwrap()); assert_eq!("bc", std::str::from_utf8(&data2).unwrap()); expect_elapsed(start, 0..100); let before = Instant::now(); data1 = [b'.'; 1]; data2 = [b'.'; 2]; assert_eq!( 2, tcp_stream .read_vectored(&mut [IoSliceMut::new(&mut data1), IoSliceMut::new(&mut data2)]) .await .unwrap() ); assert_eq!("d", std::str::from_utf8(&data1).unwrap()); assert_eq!("e.", std::str::from_utf8(&data2).unwrap()); expect_elapsed(before, 0..10); safina_timer::sleep_until(start + Duration::from_millis(300)).await; let mut data5 = [b'.'; 5]; assert_eq!( 4, tcp_stream .read_vectored(&mut [IoSliceMut::new(&mut data5)]) .await .unwrap() ); assert_eq!("fghi.", std::str::from_utf8(&data5).unwrap()); expect_elapsed(start, 300..400); assert_eq!( 0, tcp_stream .read_vectored(&mut [IoSliceMut::new(&mut [b'.'; 5])]) .await .unwrap() ); expect_elapsed(start, 400..500); } #[async_test] async fn read_vectored_error() { assert_eq!( std::io::ErrorKind::ConnectionReset, socket_in_error() .await .read_vectored(&mut [IoSliceMut::new(&mut [b'.'; 3])]) .await .unwrap_err() .kind() ); } #[async_test] async fn peek() { let addr = simple_server(vec![ Action::Write("abc"), Action::SleepMillis(200), Action::Write("d"), Action::SleepMillis(200), Action::ShutdownWrite, Action::SleepMillis(100), ]); let mut tcp_stream = safina_timer::with_timeout(TcpStream::connect(addr), Duration::from_millis(500)) .await .unwrap() .unwrap(); let before = Instant::now(); let mut data = [b'.'; 7]; assert_eq!(3, tcp_stream.peek(&mut data).await.unwrap()); assert_eq!("abc....", std::str::from_utf8(&data).unwrap()); expect_elapsed(before, 0..100); safina_timer::sleep_for(Duration::from_millis(300)).await; data = [b'.'; 7]; assert_eq!(4, tcp_stream.peek(&mut data).await.unwrap()); assert_eq!("abcd...", std::str::from_utf8(&data).unwrap()); expect_elapsed(before, 300..400); let before = Instant::now(); data = [b'.'; 7]; assert_eq!(4, tcp_stream.peek(&mut data).await.unwrap()); assert_eq!("abcd...", std::str::from_utf8(&data).unwrap()); expect_elapsed(before, 0..10); let before = Instant::now(); data = [b'.'; 7]; assert_eq!(4, tcp_stream.read(&mut data).await.unwrap()); assert_eq!("abcd...", std::str::from_utf8(&data).unwrap()); expect_elapsed(before, 0..10); let before = Instant::now(); data = [b'.'; 7]; assert_eq!(0, tcp_stream.peek(&mut data).await.unwrap()); assert_eq!(".......", std::str::from_utf8(&data).unwrap()); expect_elapsed(before, 0..200); } #[async_test] async fn peek_error() { assert_eq!( std::io::ErrorKind::ConnectionReset, socket_in_error() .await .peek(&mut [b'.'; 3]) .await .unwrap_err() .kind() ); } #[async_test] async fn write() { let start = Instant::now(); let (sender, receiver): (OneSender>, _) = oneshot(); let addr = simple_server(vec![Action::Read(sender), Action::SleepMillis(1000)]); let mut tcp_stream = safina_timer::with_timeout(TcpStream::connect(addr), Duration::from_millis(500)) .await .unwrap() .unwrap(); assert_eq!(3, tcp_stream.write(b"abc").await.unwrap()); assert_eq!( "abc", std::str::from_utf8(&receiver.await.unwrap()).unwrap() ); expect_elapsed(start, 0..100); } #[async_test] async fn write_error() { socket_in_error().await.write(b"abc").await.unwrap_err(); } #[async_test] async fn flush() { let (sender, receiver): (OneSender>, _) = oneshot(); let addr = simple_server(vec![Action::Read(sender)]); let before = Instant::now(); let mut tcp_stream = safina_timer::with_timeout(TcpStream::connect(addr), Duration::from_millis(500)) .await .unwrap() .unwrap(); assert_eq!(3, tcp_stream.write(b"abc").await.unwrap()); tcp_stream.flush().await.unwrap(); assert_eq!( "abc", std::str::from_utf8(&receiver.await.unwrap()).unwrap() ); expect_elapsed(before, 0..100); } #[async_test] async fn write_all() { let addr = simple_server(vec![]); let mut tcp_stream = safina_timer::with_timeout(TcpStream::connect(addr), Duration::from_millis(500)) .await .unwrap() .unwrap(); let before = Instant::now(); tcp_stream.write(b"abc").await.unwrap(); expect_elapsed(before, 0..100); assert!(safina_timer::with_timeout( async move { tcp_stream .write_all(&filled_vec(b'.', 10 * 1024 * 1024)) .await }, Duration::from_millis(100), ) .await .unwrap() .is_err()); } #[async_test] async fn write_all_error() { socket_in_error() .await .write_all(&filled_vec(b'.', 10 * 1024 * 1024)) .await .unwrap_err(); } #[async_test] async fn write_vectored() { let (sender, receiver): (OneSender>, _) = oneshot(); let addr = simple_server(vec![Action::Read(sender)]); let before = Instant::now(); let mut tcp_stream = safina_timer::with_timeout(TcpStream::connect(addr), Duration::from_millis(500)) .await .unwrap() .unwrap(); assert_eq!( 6, tcp_stream .write_vectored(&[IoSlice::new(b"abc"), IoSlice::new(b"def")]) .await .unwrap() ); assert_eq!( "abcdef", std::str::from_utf8(&receiver.await.unwrap()).unwrap() ); expect_elapsed(before, 0..100); } #[async_test] async fn write_vectored_error() { socket_in_error() .await .write_vectored(&[IoSlice::new(b"abc")]) .await .unwrap_err(); }