use core::fmt::Debug; use core::ops::Range; use core::time::Duration; use safina::executor::Executor; use std::future::Future; use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::task::{Context, Poll}; use std::time::Instant; fn assert_in_range(range: Range, value: &T) { assert!(!range.is_empty(), "invalid range {:?}", range); // println!( // "measured concurrency value {:?}, expected range {:?}", // value, range, // ); assert!( range.contains(value), "measured concurrency value {:?} out of range {:?}", value, range, ); } /// # Panics /// Panics if the elapsed time since `before` is outside of `range_ms`. pub fn assert_elapsed(before: Instant, range_ms: Range) { assert!(!range_ms.is_empty(), "invalid range {:?}", range_ms); let elapsed = before.elapsed(); let duration_range = Duration::from_millis(range_ms.start)..Duration::from_millis(range_ms.end); assert!( duration_range.contains(&elapsed), "{:?} elapsed, out of range {:?}", elapsed, duration_range ); } fn measure_async_concurrency(executor: &Arc, num_jobs: usize) -> f32 { const WAIT_DURATION: Duration = Duration::from_millis(100); let before = Instant::now(); let receiver = { let (sender, receiver) = std::sync::mpsc::channel(); for _ in 0..num_jobs { let sender_clone = sender.clone(); executor.spawn(async move { std::thread::sleep(WAIT_DURATION); sender_clone.send(()).unwrap(); }); } receiver }; for _ in 0..num_jobs { receiver.recv_timeout(Duration::from_millis(500)).unwrap(); } let elapsed = before.elapsed(); elapsed.as_secs_f32() / WAIT_DURATION.as_secs_f32() } fn expect_async_concurrency(executor: &Arc, num_tasks: usize) { assert_in_range( 1.0_f32..1.90, &measure_async_concurrency(executor, num_tasks), ); assert_in_range( 2.0_f32..2.90, &measure_async_concurrency(executor, num_tasks + 1), ); } async fn measure_blocking_concurrency(executor: &Arc, num_jobs: usize) -> f32 { const WAIT_DURATION: Duration = Duration::from_millis(100); let before = Instant::now(); let mut receivers = Vec::new(); for _ in 0..num_jobs { receivers.push(executor.schedule_blocking(|| std::thread::sleep(WAIT_DURATION))); } let deadline = Instant::now() + WAIT_DURATION * 3; for mut receiver in receivers { safina::timer::with_deadline(async move { receiver.async_recv().await }, deadline) .await .unwrap() .unwrap(); } let elapsed = before.elapsed(); elapsed.as_secs_f32() / WAIT_DURATION.as_secs_f32() } async fn expect_blocking_concurrency(executor: &Arc, num_tasks: usize) { assert_in_range( 1.0_f32..1.90, &measure_blocking_concurrency(executor, num_tasks).await, ); assert_in_range( 2.0_f32..2.90, &measure_blocking_concurrency(executor, num_tasks + 1).await, ); } trait UnwindAnyToString { fn any_to_string(&self) -> String; } impl UnwindAnyToString for Box { fn any_to_string(&self) -> String { if let Some(s) = self.downcast_ref::<&'static str>() { (*s).to_string() } else if let Some(s) = self.downcast_ref::() { s.clone() } else { format!("{self:?}") } } } /// # Panics /// Panics if the time elapsed since `before` is not in `range_ms`. pub fn expect_elapsed(before: Instant, range_ms: Range) { assert!(!range_ms.is_empty(), "invalid range {:?}", range_ms); let elapsed = before.elapsed(); let duration_range = Duration::from_millis(range_ms.start)..Duration::from_millis(range_ms.end); assert!( duration_range.contains(&elapsed), "{:?} elapsed, out of range {:?}", elapsed, duration_range ); } fn get_async_thread_name(executor: &Arc) -> String { let (sender, receiver) = std::sync::mpsc::channel(); executor.spawn(async move { sender .send(std::thread::current().name().unwrap().to_string()) .unwrap(); }); receiver.recv_timeout(Duration::from_millis(500)).unwrap() } async fn get_blocking_thread_name(executor: &Arc) -> String { executor .schedule_blocking(|| std::thread::current().name().unwrap().to_string()) .async_recv() .await .unwrap() } #[test] fn test_default_trait() { let executor: Arc = Arc::default(); expect_async_concurrency(&executor, 4); assert_eq!( "async", get_async_thread_name(&executor) .strip_suffix(|c| "0123".contains(c)) .unwrap() ); let executor_clone = executor.clone(); executor.block_on(async move { expect_blocking_concurrency(&executor_clone, 4).await; assert_eq!( "blocking", get_blocking_thread_name(&executor_clone) .await .strip_suffix(|c| "0123".contains(c)) .unwrap() ); }); } #[test] fn test_new() { let executor = Executor::new(2, 3).unwrap(); expect_async_concurrency(&executor, 2); assert_eq!( "async", get_async_thread_name(&executor) .strip_suffix(|c| "01".contains(c)) .unwrap() ); let executor_clone = executor.clone(); executor.block_on(async move { expect_blocking_concurrency(&executor_clone, 3).await; assert_eq!( "blocking", get_blocking_thread_name(&executor_clone) .await .strip_suffix(|c| "0123".contains(c)) .unwrap() ); }); } #[test] fn test_build() { let executor = Executor::builder() .with_async_threads(2) .with_thread_name_prefix("prefix1_") .with_blocking_threads(3) .build() .unwrap(); expect_async_concurrency(&executor, 2); assert_eq!( "prefix1_async", get_async_thread_name(&executor) .strip_suffix(|c| "01".contains(c)) .unwrap() ); let executor_clone = executor.clone(); executor.block_on(async move { expect_blocking_concurrency(&executor_clone, 3).await; assert_eq!( "prefix1_blocking", get_blocking_thread_name(&executor_clone) .await .strip_suffix(|c| "0123".contains(c)) .unwrap() ); }); } #[test] fn test_spawn_unpin() { let executor = Executor::new(1, 1).unwrap(); let (sender, receiver) = std::sync::mpsc::channel(); executor.spawn(async move { sender.send(()).unwrap(); }); receiver.recv().unwrap(); } #[test] fn test_spawn() { let executor = Executor::new(1, 1).unwrap(); let (sender, receiver) = std::sync::mpsc::channel(); executor.spawn_unpin(Box::pin(async move { sender.send(()).unwrap(); })); receiver.recv().unwrap(); } #[test] fn test_waking() { safina::timer::start_timer_thread(); let executor = Executor::new(1, 1).unwrap(); let (sender, receiver) = std::sync::mpsc::channel(); let before = Instant::now(); executor.spawn(async move { safina::timer::sleep_for(Duration::from_millis(100)).await; sender.send(()).unwrap(); }); receiver.recv().unwrap(); expect_elapsed(before, 100..200); } #[test] #[allow(clippy::type_complexity)] fn test_wake_after_ready() { struct FnFuture(Box) -> Poll<()>) + Send>); impl Future for FnFuture { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { self.as_mut().0(cx) } } safina::timer::start_timer_thread(); let executor = Executor::new(1, 1).unwrap(); let (sender, receiver) = std::sync::mpsc::channel(); let state = Arc::new(AtomicBool::new(true)); let fut = FnFuture(Box::new(move |cx| { sender.send(cx.waker().clone()).unwrap(); if state.swap(false, Ordering::AcqRel) { Poll::Ready(()) } else { panic!("resumed after completion"); } })); executor.spawn(fut); receiver.recv().unwrap().wake(); receiver.recv().unwrap_err(); } #[test] fn test_async_panic() { // Start a single thread. let executor = Executor::new(1, 1).unwrap(); // Run a future that panics. executor.spawn(async { panic!("ignore this panic") }); std::thread::sleep(Duration::from_millis(100)); // Check that futures still run. let (sender, receiver) = std::sync::mpsc::channel(); executor.spawn(async move { sender.send(()).unwrap(); }); receiver.recv().unwrap(); } #[test] fn test_executor_block_on() { safina::timer::start_timer_thread(); let executor = Executor::new(1, 1).unwrap(); assert_eq!(3_u8, executor.block_on(async { 3_u8 })); let value = 5_u8; assert_eq!(5_u8, executor.block_on(async move { value })); let before = Instant::now(); executor.block_on(async { safina::timer::sleep_for(Duration::from_millis(100)).await; }); expect_elapsed(before, 100..200); assert_eq!( "ignore this panic", std::panic::catch_unwind(|| executor.block_on(async { panic!("ignore this panic") })) .unwrap_err() .any_to_string() .as_str() ); executor.block_on(async { let (sender, receiver) = std::sync::mpsc::channel(); safina::executor::spawn(async move { sender.send(()).unwrap(); }); receiver.recv().unwrap(); }); } #[test] fn test_block_on() { safina::timer::start_timer_thread(); assert_eq!(3_u8, safina::executor::block_on(async { 3_u8 })); let value = 5_u8; assert_eq!(5_u8, safina::executor::block_on(async move { value })); let before = Instant::now(); safina::executor::block_on(async { safina::timer::sleep_for(Duration::from_millis(100)).await; }); expect_elapsed(before, 100..200); assert_eq!( "ignore this panic", std::panic::catch_unwind(|| safina::executor::block_on(async { panic!("ignore this panic"); })) .unwrap_err() .any_to_string() .as_str() ); } #[test] fn test_block_on_unpin() { assert_eq!( 3_u8, safina::executor::block_on_unpin(Box::pin(async { 3_u8 })) ); } #[test] fn test_schedule_blocking_simple() { let executor = Executor::new(1, 1).unwrap(); executor.block_on(async { assert_eq!( 3_u8, safina::executor::schedule_blocking(|| 3_u8) .async_recv() .await .unwrap() ); }); } #[test] fn test_executor_schedule_blocking_simple() { let executor = Executor::new(1, 1).unwrap(); let executor_clone = executor.clone(); executor.block_on(async move { assert_eq!( 3_u8, executor_clone .schedule_blocking(|| 3_u8) .async_recv() .await .unwrap() ); }); } #[test] fn test_schedule_blocking_move() { let executor = Executor::new(1, 1).unwrap(); executor.block_on(async { let value = 5_u8; assert_eq!( 5_u8, safina::executor::schedule_blocking(move || value) .async_recv() .await .unwrap() ); }); } #[test] fn test_executor_schedule_blocking_move() { let executor = Executor::new(1, 1).unwrap(); let executor_clone = executor.clone(); executor.block_on(async move { let value = 5_u8; assert_eq!( 5_u8, executor_clone .schedule_blocking(move || value) .async_recv() .await .unwrap() ); }); } #[test] fn test_schedule_blocking_sleep() { let executor = Executor::new(1, 1).unwrap(); executor.block_on(async { let before = Instant::now(); assert_eq!( 7_u8, safina::executor::schedule_blocking(|| { std::thread::sleep(Duration::from_millis(100)); 7_u8 }) .async_recv() .await .unwrap() ); expect_elapsed(before, 100..200); }); } #[test] fn test_executor_schedule_blocking_sleep() { let executor = Executor::new(1, 1).unwrap(); let executor_clone = executor.clone(); executor.block_on(async move { let before = Instant::now(); assert_eq!( 7_u8, executor_clone .schedule_blocking(|| { std::thread::sleep(Duration::from_millis(100)); 7_u8 }) .async_recv() .await .unwrap() ); expect_elapsed(before, 100..200); }); } #[test] fn test_schedule_blocking_panic() { let executor = Executor::new(1, 1).unwrap(); executor.block_on(async { safina::executor::schedule_blocking(|| panic!("ignore this panic")) .async_recv() .await .unwrap_err(); }); } #[test] fn test_executor_schedule_blocking_panic() { let executor = Executor::new(1, 1).unwrap(); let executor_clone = executor.clone(); executor.block_on(async move { executor_clone .schedule_blocking(|| panic!("ignore this panic")) .async_recv() .await .unwrap_err(); }); } #[test] fn test_schedule_blocking_recv() { safina::timer::start_timer_thread(); let executor = Executor::new(1, 1).unwrap(); executor.block_on(async { let before = Instant::now(); let (sender, receiver) = std::sync::mpsc::channel(); let mut result_receiver = safina::executor::schedule_blocking(move || receiver.recv().unwrap()); safina::timer::sleep_for(Duration::from_millis(100)).await; sender.send(9_u8).unwrap(); assert_eq!(9_u8, result_receiver.async_recv().await.unwrap()); expect_elapsed(before, 100..200); }); } #[test] fn test_executor_schedule_blocking_recv() { safina::timer::start_timer_thread(); let executor = Executor::new(1, 1).unwrap(); let executor_clone = executor.clone(); executor.block_on(async move { let before = Instant::now(); let (sender, receiver) = std::sync::mpsc::channel(); let mut result_receiver = executor_clone.schedule_blocking(move || receiver.recv().unwrap()); safina::timer::sleep_for(Duration::from_millis(100)).await; sender.send(9_u8).unwrap(); assert_eq!(9_u8, result_receiver.async_recv().await.unwrap()); expect_elapsed(before, 100..200); }); } #[test] fn no_executor_running() { assert_eq!( "called from outside a task; check for duplicate safina-executor crate: cargo tree -d", std::panic::catch_unwind(|| safina::executor::block_on( safina::executor::schedule_blocking(|| 3_u8) )) .unwrap_err() .any_to_string() .as_str() ); assert_eq!( "called from outside a task; check for duplicate safina-executor crate: cargo tree -d", std::panic::catch_unwind(|| safina::executor::block_on(async { safina::executor::spawn(async {}); })) .unwrap_err() .any_to_string() .as_str() ); }