use core::ops::Range; use core::time::Duration; use safina_async_test::async_test; use safina_executor::Executor; use safina_sync::{oneshot, sync_channel, OneSender, Receiver, SyncSender}; use safina_timer::sleep_for; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering::{Acquire, Release}; use std::sync::mpsc::{RecvTimeoutError, SendError, TryRecvError}; use std::sync::Arc; use std::time::Instant; const MILLIS_100: Duration = Duration::from_millis(100); const MILLIS_200: Duration = Duration::from_millis(200); /// # Panics /// Panics if the time elapsed since `before` is not in `range_ms`. pub fn expect_elapsed(before: Instant, range_ms: Range) { assert!(!range_ms.is_empty(), "invalid range {:?}", range_ms); let elapsed = before.elapsed(); let duration_range = Duration::from_millis(range_ms.start)..Duration::from_millis(range_ms.end); assert!( duration_range.contains(&elapsed), "{:?} elapsed, out of range {:?}", elapsed, duration_range ); } #[test] fn debug() { let (one_sender, receiver) = oneshot::(); assert_eq!( "OneSender", format!("{:?}", one_sender) ); assert_eq!("Receiver", format!("{:?}", receiver)); let (sync_sender, receiver) = sync_channel::(1); assert_eq!( "SyncSender", format!("{:?}", sync_sender) ); assert_eq!("Receiver", format!("{:?}", receiver)); } #[async_test] async fn oneshot_already_sent() { let (sender, mut receiver) = oneshot(); sender.send(()).unwrap(); let before = Instant::now(); receiver.async_recv().await.unwrap(); expect_elapsed(before, 0..10); receiver.async_recv().await.unwrap_err(); expect_elapsed(before, 0..10); } #[async_test] async fn oneshot_already_disconnected() { let (sender, mut receiver): (OneSender<()>, _) = oneshot(); drop(sender); let before = Instant::now(); receiver.async_recv().await.unwrap_err(); expect_elapsed(before, 0..10); } #[async_test] async fn oneshot_await() { let (sender, mut receiver) = oneshot(); let before = Instant::now(); safina_executor::spawn(async move { sleep_for(MILLIS_100).await; sender.send(()).unwrap(); }); receiver.async_recv().await.unwrap(); expect_elapsed(before, 100..200); let before = Instant::now(); receiver.async_recv().await.unwrap_err(); expect_elapsed(before, 0..10); } #[test] fn oneshot_try_recv() { let (sender, receiver) = oneshot(); assert_eq!(Err(TryRecvError::Empty), receiver.try_recv()); std::thread::spawn(move || { sender.send(()).unwrap(); }); std::thread::sleep(MILLIS_100); receiver.try_recv().unwrap(); std::thread::sleep(MILLIS_100); assert_eq!(Err(TryRecvError::Disconnected), receiver.try_recv()); } #[test] fn oneshot_recv() { let (sender, receiver) = oneshot(); let before = Instant::now(); std::thread::spawn(move || { std::thread::sleep(MILLIS_100); sender.send(()).unwrap(); }); receiver.recv().unwrap(); expect_elapsed(before, 100..200); let before = Instant::now(); receiver.recv().unwrap_err(); expect_elapsed(before, 0..10); } #[test] fn oneshot_recv_timeout() { let (sender, receiver) = oneshot(); let before = Instant::now(); std::thread::spawn(move || { std::thread::sleep(MILLIS_200); sender.send(()).unwrap(); }); assert_eq!( Err(RecvTimeoutError::Timeout), receiver.recv_timeout(MILLIS_100) ); expect_elapsed(before, 100..200); receiver.recv_timeout(MILLIS_200).unwrap(); let before = Instant::now(); assert_eq!( Err(RecvTimeoutError::Disconnected), receiver.recv_timeout(MILLIS_100) ); expect_elapsed(before, 0..10); } #[test] fn oneshot_iter() { let (sender, receiver) = oneshot(); let before = Instant::now(); std::thread::spawn(move || { std::thread::sleep(MILLIS_100); sender.send(()).unwrap(); }); let mut iter = receiver.iter(); assert_eq!(Some(()), iter.next()); expect_elapsed(before, 100..200); let before = Instant::now(); assert_eq!(None, iter.next()); expect_elapsed(before, 0..10); } #[test] fn oneshot_into_iterator() { let (sender, receiver) = oneshot(); let before = Instant::now(); std::thread::spawn(move || { std::thread::sleep(MILLIS_100); sender.send(()).unwrap(); }); let mut iter = receiver.into_iter(); assert_eq!(Some(()), iter.next()); expect_elapsed(before, 100..200); let before = Instant::now(); assert_eq!(None, iter.next()); expect_elapsed(before, 0..10); } #[test] fn oneshot_try_iter() { let (sender, receiver) = oneshot(); assert_eq!(None, receiver.try_iter().next()); std::thread::spawn(move || { sender.send(()).unwrap(); }); std::thread::sleep(MILLIS_100); let before = Instant::now(); let mut iter = receiver.try_iter(); assert_eq!(Some(()), iter.next()); assert_eq!(None, iter.next()); assert_eq!(None, receiver.try_iter().next()); expect_elapsed(before, 0..10); std::thread::sleep(MILLIS_100); let before = Instant::now(); assert_eq!(None, receiver.try_iter().next()); expect_elapsed(before, 0..10); } #[test] #[should_panic] fn sync_channel_zero() { let _unused = sync_channel::<()>(0); } #[async_test] async fn sync_channel_one() { let (sender, receiver) = sync_channel(2); let before = Instant::now(); assert_eq!(Err(TryRecvError::Empty), receiver.try_recv()); sender.async_send(()).await.unwrap(); receiver.try_recv().unwrap(); assert_eq!(Err(TryRecvError::Empty), receiver.try_recv()); expect_elapsed(before, 0..10); } #[async_test] async fn sync_channel_full() { let (sender, receiver) = sync_channel(2); let before = Instant::now(); sender.async_send(()).await.unwrap(); sender.async_send(()).await.unwrap(); receiver.try_recv().unwrap(); receiver.try_recv().unwrap(); assert_eq!(Err(TryRecvError::Empty), receiver.try_recv()); expect_elapsed(before, 0..10); } #[async_test] async fn sync_channel_async_send() { let (sender, receiver) = sync_channel(2); let before = Instant::now(); sender.async_send(()).await.unwrap(); sender.async_send(()).await.unwrap(); expect_elapsed(before, 0..10); std::thread::spawn(move || { std::thread::sleep(MILLIS_100); receiver.try_recv().unwrap(); std::thread::sleep(MILLIS_100); drop(receiver); }); sender.async_send(()).await.unwrap(); expect_elapsed(before, 100..200); assert_eq!(Err(SendError(())), sender.async_send(()).await); expect_elapsed(before, 200..300); } #[test] #[should_panic] fn sync_channel_async_send_repoll_panics() { safina_executor::block_on(async { let (sender, _receiver) = sync_channel(1); let mut fut = Box::pin(sender.async_send(())); (&mut fut).await.unwrap(); (&mut fut).await.unwrap(); }); } #[test] fn sync_channel_async_send_does_not_block() { let (sender, _receiver) = sync_channel(1); sender.send(()).unwrap(); let executor = safina_executor::Executor::new(1, 1).unwrap(); executor.spawn(async move { sender.async_send(()).await.unwrap(); }); std::thread::sleep(MILLIS_100); let task_ran = Arc::new(AtomicBool::new(false)); let task_ran_clone = task_ran.clone(); executor.spawn(async move { task_ran_clone.store(true, Release) }); std::thread::sleep(MILLIS_100); assert!(task_ran.load(Acquire)); } #[async_test] async fn sync_channel_async_sends_wait() { let (sender, receiver) = sync_channel(2); let before = Instant::now(); sender.async_send(()).await.unwrap(); sender.async_send(()).await.unwrap(); expect_elapsed(before, 0..10); for _ in 0..3 { let sender_clone = sender.clone(); safina_executor::spawn(async { sender_clone.async_send(()).await.unwrap(); sleep_for(MILLIS_200).await; drop(sender_clone); }); } for _ in 0..3 { let sender_clone = sender.clone(); std::thread::spawn(move || { sender_clone.send(()).unwrap(); std::thread::sleep(MILLIS_200); drop(sender_clone); }); } drop(sender); std::thread::sleep(MILLIS_100); receiver.try_recv().unwrap(); std::thread::sleep(MILLIS_100); receiver.try_recv().unwrap(); std::thread::sleep(MILLIS_100); receiver.try_recv().unwrap(); std::thread::sleep(MILLIS_100); receiver.try_recv().unwrap(); std::thread::sleep(MILLIS_100); receiver.try_recv().unwrap(); std::thread::sleep(MILLIS_100); receiver.try_recv().unwrap(); std::thread::sleep(MILLIS_100); receiver.try_recv().unwrap(); receiver.try_recv().unwrap(); assert_eq!(Err(TryRecvError::Empty), receiver.try_recv()); expect_elapsed(before, 700..800); std::thread::sleep(MILLIS_200); assert_eq!(Err(TryRecvError::Disconnected), receiver.try_recv()); expect_elapsed(before, 900..1000); } #[async_test] async fn sync_channel_receive_wakes_sender() { for f in [ |r: &Receiver<()>| r.try_recv().unwrap(), |r: &Receiver<()>| r.recv().unwrap(), |r: &Receiver<()>| r.recv_timeout(MILLIS_100).unwrap(), |r: &Receiver<()>| r.iter().next().unwrap(), |r: &Receiver<()>| r.try_iter().next().unwrap(), ] { let (sender, receiver) = sync_channel(1); let before = Instant::now(); sender.async_send(()).await.unwrap(); std::thread::spawn(move || { std::thread::sleep(MILLIS_100); f(&receiver); std::thread::sleep(MILLIS_100); drop(receiver); }); sender.async_send(()).await.unwrap(); expect_elapsed(before, 100..200); } } #[async_test] async fn sync_channel_async_recv_wakes_sender() { let (sender, mut receiver) = sync_channel(1); let before = Instant::now(); sender.async_send(()).await.unwrap(); expect_elapsed(before, 0..10); safina_executor::spawn(async move { sleep_for(MILLIS_100).await; receiver.async_recv().await.unwrap(); sleep_for(MILLIS_100).await; drop(receiver); }); sender.async_send(()).await.unwrap(); expect_elapsed(before, 100..200); } #[async_test] async fn sync_channel_await_wakes_sender() { let (sender, mut receiver) = sync_channel(1); let before = Instant::now(); sender.async_send(()).await.unwrap(); expect_elapsed(before, 0..10); safina_executor::spawn(async move { sleep_for(MILLIS_100).await; (&mut receiver).await.unwrap(); sleep_for(MILLIS_100).await; drop(receiver); }); sender.async_send(()).await.unwrap(); expect_elapsed(before, 100..200); } #[async_test] async fn sync_channel_async_send_wakes_async_recv() { let (sender, mut receiver) = sync_channel(1); let before = Instant::now(); safina_executor::spawn(async move { sleep_for(MILLIS_100).await; sender.async_send(()).await.unwrap(); sleep_for(MILLIS_100).await; drop(sender); }); receiver.async_recv().await.unwrap(); expect_elapsed(before, 100..200); receiver.async_recv().await.unwrap_err(); expect_elapsed(before, 200..300); let before = Instant::now(); receiver.async_recv().await.unwrap_err(); expect_elapsed(before, 0..10); } #[async_test] async fn sync_channel_send_wakes_async_recv() { let (sender, mut receiver) = sync_channel(1); let before = Instant::now(); std::thread::spawn(move || { std::thread::sleep(MILLIS_100); sender.send(()).unwrap(); std::thread::sleep(MILLIS_100); drop(sender); }); receiver.async_recv().await.unwrap(); expect_elapsed(before, 100..200); receiver.async_recv().await.unwrap_err(); expect_elapsed(before, 200..300); let before = Instant::now(); receiver.async_recv().await.unwrap_err(); expect_elapsed(before, 0..10); } #[async_test] async fn sync_channel_try_send_wakes_async_recv() { let (sender, mut receiver) = sync_channel(1); let before = Instant::now(); std::thread::spawn(move || { std::thread::sleep(MILLIS_100); sender.try_send(()).unwrap(); std::thread::sleep(MILLIS_100); drop(sender); }); receiver.async_recv().await.unwrap(); expect_elapsed(before, 100..200); receiver.async_recv().await.unwrap_err(); expect_elapsed(before, 200..300); let before = Instant::now(); receiver.async_recv().await.unwrap_err(); expect_elapsed(before, 0..10); } #[async_test] async fn sync_channel_try_recv() { let (sender, receiver) = sync_channel(1); let before = Instant::now(); assert_eq!(Err(TryRecvError::Empty), receiver.try_recv()); sender.async_send(()).await.unwrap(); receiver.try_recv().unwrap(); assert_eq!(Err(TryRecvError::Empty), receiver.try_recv()); expect_elapsed(before, 0..10); } fn spawn_sender_task(sender: SyncSender<()>) -> Arc { let executor = safina_executor::Executor::new(1, 1).unwrap(); executor.spawn(async { sleep_for(MILLIS_100).await; sender.async_send(()).await.unwrap(); sleep_for(MILLIS_100).await; drop(sender); }); executor } #[test] fn sync_channel_recv() { let (sender, receiver) = sync_channel(1); let before = Instant::now(); sender.send(()).unwrap(); receiver.recv().unwrap(); expect_elapsed(before, 0..10); let _executor = spawn_sender_task(sender); receiver.recv().unwrap(); expect_elapsed(before, 100..200); receiver.recv().unwrap_err(); expect_elapsed(before, 200..300); let before = Instant::now(); receiver.recv().unwrap_err(); expect_elapsed(before, 0..10); } #[test] fn sync_channel_recv_timeout() { let (sender, receiver) = sync_channel(1); // Item already available. let before = Instant::now(); sender.send(()).unwrap(); receiver.recv().unwrap(); expect_elapsed(before, 0..10); // Timeout let before = Instant::now(); assert_eq!( Err(RecvTimeoutError::Timeout), receiver.recv_timeout(MILLIS_100) ); expect_elapsed(before, 100..200); // Waits let before = Instant::now(); let _executor = spawn_sender_task(sender); receiver.recv_timeout(MILLIS_200).unwrap(); expect_elapsed(before, 100..200); assert_eq!( Err(RecvTimeoutError::Disconnected), receiver.recv_timeout(MILLIS_200) ); expect_elapsed(before, 200..300); let before = Instant::now(); assert_eq!( Err(RecvTimeoutError::Disconnected), receiver.recv_timeout(MILLIS_200) ); expect_elapsed(before, 0..10); } #[test] fn sync_channel_iter() { let (sender, receiver) = sync_channel(1); let before = Instant::now(); sender.send(()).unwrap(); receiver.iter().next().unwrap(); expect_elapsed(before, 0..10); let _executor = spawn_sender_task(sender); receiver.iter().next().unwrap(); expect_elapsed(before, 100..200); assert_eq!(None, receiver.iter().next()); expect_elapsed(before, 200..300); let before = Instant::now(); assert_eq!(None, receiver.iter().next()); expect_elapsed(before, 0..10); } #[test] fn sync_channel_into_iterator() { let (sender, receiver) = sync_channel(1); let before = Instant::now(); sender.send(()).unwrap(); let mut iter = receiver.into_iter(); iter.next().unwrap(); expect_elapsed(before, 0..10); let _executor = spawn_sender_task(sender); iter.next().unwrap(); expect_elapsed(before, 100..200); assert_eq!(None, iter.next()); expect_elapsed(before, 200..300); let before = Instant::now(); assert_eq!(None, iter.next()); expect_elapsed(before, 0..10); } #[async_test] async fn sync_channel_try_iter() { let (sender, receiver) = sync_channel(1); let before = Instant::now(); assert_eq!(None, receiver.try_iter().next()); sender.async_send(()).await.unwrap(); receiver.try_iter().next().unwrap(); assert_eq!(None, receiver.try_iter().next()); expect_elapsed(before, 0..10); }