mod test_util; use crate::test_util::{expect_elapsed, FakeWaker}; use core::convert::TryFrom; use core::future::Future; use core::sync::atomic::{AtomicBool, Ordering}; use core::task::{Context, Poll}; use core::time::Duration; use rusty_fork::rusty_fork_test; use safina::executor::Executor; use safina::timer::{sleep_for, sleep_until, start_timer_thread}; use std::sync::Arc; use std::time::Instant; fn timer_thread_not_started_inner() { assert!(std::panic::catch_unwind(|| { safina::executor::block_on(async move { sleep_for(Duration::from_millis(10)).await }); }) .unwrap_err() .downcast::() .unwrap() .contains("TimerThreadNotStarted")); assert!(std::panic::catch_unwind(|| { safina::executor::block_on(async move { sleep_until(Instant::now() + Duration::from_millis(10)).await; }); }) .unwrap_err() .downcast::() .unwrap() .contains("TimerThreadNotStarted")); for _ in 0..2 { start_timer_thread(); safina::executor::block_on(async move { sleep_for(Duration::from_millis(10)).await; }); safina::executor::block_on(async move { sleep_until(Instant::now() + Duration::from_millis(10)).await; }); } } rusty_fork_test! { #[test] fn timer_thread_not_started() { timer_thread_not_started_inner(); } } #[test] fn test_sleep_for() { start_timer_thread(); let before = Instant::now(); safina::executor::block_on(async move { sleep_for(Duration::from_millis(100)).await; }); expect_elapsed(before, 100..200); } #[test] fn test_sleep_for_zero() { start_timer_thread(); let before = Instant::now(); safina::executor::block_on(async move { sleep_for(Duration::from_secs(0)).await; }); expect_elapsed(before, 0..90); } #[test] fn test_sleep_until() { start_timer_thread(); let before = Instant::now(); let deadline = before + Duration::from_millis(100); safina::executor::block_on(async move { sleep_until(deadline).await; }); expect_elapsed(before, 100..200); } #[test] fn test_sleep_until_past() { start_timer_thread(); let before = Instant::now(); let deadline = before.checked_sub(Duration::from_millis(100)).unwrap(); safina::executor::block_on(async move { sleep_until(deadline).await; }); expect_elapsed(before, 0..90); } #[test] fn test_multi_sleep() { let executor = Executor::new(1, 1).unwrap(); start_timer_thread(); let before = Instant::now(); let mut expected_durations_ms = [200_u64, 100, 0, 400, 500, 300]; let receiver = { let (sender, receiver) = std::sync::mpsc::channel::(); for duration_ms in &expected_durations_ms { let before_clone = before; let sender_clone = sender.clone(); let duration_ms_copy = *duration_ms; executor.spawn(async move { // println!("{} sleeping until {} ms", n, duration_ms); let deadline = before_clone + Duration::from_millis(duration_ms_copy); sleep_until(deadline).await; let elapsed = before_clone.elapsed(); let elapsed_u64 = u64::try_from(elapsed.as_millis()).unwrap(); // println!("{} finished sleeping, sending {:?}, {} ms", n, elapsed, elapsed_u64); sender_clone.send(elapsed_u64).unwrap(); // println!("{} done", n); }); } receiver }; let mut actual_durations_ms: Vec = Vec::new(); for duration_ms in receiver { actual_durations_ms.push(duration_ms); // println!("received duration {:?} ms", actual_durations_ms); } actual_durations_ms.sort_unstable(); // println!("actual durations {:?} ms", actual_durations_ms); expected_durations_ms.sort_unstable(); // println!("expected durations {:?} ms", expected_duration_ms); for n in 0..expected_durations_ms.len() { let actual = actual_durations_ms[n]; let expected = expected_durations_ms[n]; let range = expected..(expected + 90); // println!("{:?} ms actual, expected range {:?}", actual, range); assert!( range.contains(&actual), "{:?} ms actual, out of range {:?}", actual, range ); } } #[test] fn should_use_most_recent_waker_passed_to_poll() { // "Note that on multiple calls to poll, only the Waker from the Context // passed to the most recent call should be scheduled to receive a wakeup." // https://doc.rust-lang.org/stable/std/future/trait.Future.html#tymethod.poll start_timer_thread(); let deadline = Instant::now() + Duration::from_millis(100); let mut fut = Box::pin(async move { sleep_until(deadline).await; }); let waker1_called = Arc::new(AtomicBool::new(false)); { let waker1 = FakeWaker::new(&waker1_called).into_waker(); let mut cx = Context::from_waker(&waker1); assert_eq!(Poll::Pending, fut.as_mut().poll(&mut cx)); } let waker2_called = Arc::new(AtomicBool::new(false)); { let waker2 = FakeWaker::new(&waker2_called).into_waker(); let mut cx = Context::from_waker(&waker2); assert_eq!(Poll::Pending, fut.as_mut().poll(&mut cx)); } std::thread::sleep(Duration::from_millis(200)); { let waker3_called = Arc::new(AtomicBool::new(true /* should never get called */)); let waker3 = FakeWaker::new(&waker3_called).into_waker(); let mut cx = Context::from_waker(&waker3); assert_eq!(Poll::Ready(()), fut.as_mut().poll(&mut cx)); } assert!(!waker1_called.load(Ordering::Acquire)); assert!(waker2_called.load(Ordering::Acquire)); }