#![forbid(unsafe_code)] use core::fmt::Debug; use core::ops::Range; use core::time::Duration; use safina_threadpool::{NewThreadPoolError, StartThreadsError, ThreadPool, TryScheduleError}; use std::io::ErrorKind; use std::time::Instant; fn assert_in_range(range: Range, value: &T) { assert!(!range.is_empty(), "invalid range {:?}", range); // println!( // "measured concurrency value {:?}, expected range {:?}", // value, range, // ); assert!( range.contains(value), "measured concurrency value {:?} out of range {:?}", value, range, ); } fn assert_elapsed(before: Instant, range_ms: Range) { assert!(!range_ms.is_empty(), "invalid range {:?}", range_ms); let elapsed = before.elapsed(); let duration_range = Duration::from_millis(range_ms.start)..Duration::from_millis(range_ms.end); assert!( duration_range.contains(&elapsed), "{:?} elapsed, out of range {:?}", elapsed, duration_range ); } fn measure_concurrency(pool: &ThreadPool, num_jobs: usize) -> f32 { const WAIT_DURATION: Duration = Duration::from_millis(100); let before = Instant::now(); let receiver = { let (sender, receiver) = std::sync::mpsc::channel(); for _ in 0..num_jobs { let sender_clone = sender.clone(); pool.schedule(move || { std::thread::sleep(WAIT_DURATION); sender_clone.send(()).unwrap(); }); } receiver }; for _ in 0..num_jobs { receiver.recv_timeout(Duration::from_millis(500)).unwrap(); } let elapsed = before.elapsed(); elapsed.as_secs_f32() / WAIT_DURATION.as_secs_f32() } fn sleep(ms: u64) { std::thread::sleep(Duration::from_millis(ms)); } fn err1() -> std::io::Error { std::io::Error::new(ErrorKind::Other, "err1") } #[test] fn start_threads_error() { // Display assert_eq!( "ThreadPool workers all panicked, failed starting replacement threads: err1", format!("{}", StartThreadsError::NoThreads(err1())) ); assert_eq!( "ThreadPool failed starting threads to replace panicked threads: err1", format!( "{}", StartThreadsError::Respawn(std::io::Error::new(ErrorKind::Other, "err1")) ) ); // PartialEq assert_eq!( StartThreadsError::NoThreads(std::io::Error::new(ErrorKind::Other, "err1")), StartThreadsError::NoThreads(std::io::Error::new(ErrorKind::Other, "err1")) ); assert_eq!( StartThreadsError::Respawn(std::io::Error::new(ErrorKind::Other, "err1")), StartThreadsError::Respawn(std::io::Error::new(ErrorKind::Other, "err1")) ); assert_ne!( StartThreadsError::NoThreads(std::io::Error::new(ErrorKind::Other, "err1")), StartThreadsError::Respawn(std::io::Error::new(ErrorKind::Other, "err1")) ); } #[test] fn new_thread_pool_error() { // Display assert_eq!( "err1", format!("{}", NewThreadPoolError::Parameter("err1".to_string())) ); assert_eq!( "ThreadPool failed starting threads: err1", format!( "{}", NewThreadPoolError::Spawn(std::io::Error::new(ErrorKind::Other, "err1")) ) ); // PartialEq assert_eq!( NewThreadPoolError::Parameter("err1".to_string()), NewThreadPoolError::Parameter("err1".to_string()), ); assert_eq!( NewThreadPoolError::Spawn(std::io::Error::new(ErrorKind::Other, "err1")), NewThreadPoolError::Spawn(std::io::Error::new(ErrorKind::Other, "err1")) ); assert_ne!( NewThreadPoolError::Parameter("err1".to_string()), NewThreadPoolError::Spawn(std::io::Error::new(ErrorKind::Other, "err1")) ); // From let target: NewThreadPoolError = StartThreadsError::NoThreads(err1()).into(); assert_eq!(NewThreadPoolError::Spawn(err1()), target); let target: NewThreadPoolError = StartThreadsError::Respawn(err1()).into(); assert_eq!(NewThreadPoolError::Spawn(err1()), target); } #[test] fn try_schedule_error() { // Display assert_eq!( "ThreadPool queue is full", format!("{}", TryScheduleError::QueueFull) ); assert_eq!( "ThreadPool workers all panicked, failed starting replacement threads: err1", format!( "{}", TryScheduleError::NoThreads(std::io::Error::new(ErrorKind::Other, "err1")) ) ); assert_eq!( "ThreadPool failed starting threads to replace panicked threads: err1", format!( "{}", TryScheduleError::Respawn(std::io::Error::new(ErrorKind::Other, "err1")) ) ); // PartialEq assert_eq!(TryScheduleError::QueueFull, TryScheduleError::QueueFull,); assert_eq!( TryScheduleError::NoThreads(std::io::Error::new(ErrorKind::Other, "err1")), TryScheduleError::NoThreads(std::io::Error::new(ErrorKind::Other, "err1")) ); assert_eq!( TryScheduleError::Respawn(std::io::Error::new(ErrorKind::Other, "err1")), TryScheduleError::Respawn(std::io::Error::new(ErrorKind::Other, "err1")) ); assert_ne!( TryScheduleError::QueueFull, TryScheduleError::NoThreads(std::io::Error::new(ErrorKind::Other, "err1")) ); // From let target: TryScheduleError = StartThreadsError::NoThreads(err1()).into(); assert_eq!(TryScheduleError::NoThreads(err1()), target); let target: TryScheduleError = StartThreadsError::Respawn(err1()).into(); assert_eq!(TryScheduleError::Respawn(err1()), target); } #[test] fn empty_name() { assert!(matches!( ThreadPool::new("", 1), Err(NewThreadPoolError::Parameter(_)) )); } #[test] fn zero_size() { assert!(matches!( ThreadPool::new("pool1", 0), Err(NewThreadPoolError::Parameter(_)) )); } #[test] fn test_size() { let pool = ThreadPool::new("pool1", 3).unwrap(); assert_eq!(3, pool.size()); } #[test] fn should_name_threads_consecutively() { let pool = ThreadPool::new("poolA", 2).unwrap(); pool.schedule(move || { sleep(200); }); pool.schedule(move || panic!("ignore this panic")); sleep(100); let (sender, receiver) = std::sync::mpsc::channel(); pool.schedule(move || { sender .send(std::thread::current().name().unwrap().to_string()) .unwrap(); }); assert_eq!( "poolA2", receiver.recv_timeout(Duration::from_millis(500)).unwrap() ); } #[test] fn test_num_live_threads() { let pool = ThreadPool::new("pool1", 3).unwrap(); sleep(100); assert_eq!(3, pool.num_live_threads()); for _ in 0..3 { pool.schedule(move || { sleep(100); panic!("ignore this panic"); }); } sleep(200); assert_eq!(0, pool.num_live_threads()); pool.schedule(move || {}); assert_eq!(3, pool.num_live_threads()); } #[test] fn schedule_should_run_the_fn() { let pool = ThreadPool::new("pool1", 1).unwrap(); let before = Instant::now(); let (sender, receiver) = std::sync::mpsc::channel(); pool.schedule(move || { sender.send(()).unwrap(); }); receiver.recv_timeout(Duration::from_millis(500)).unwrap(); assert_elapsed(before, 0..100); } #[test] fn schedule_should_start_a_thread_if_none() { let pool = ThreadPool::new("pool1", 3).unwrap(); sleep(100); for _ in 0..3 { pool.schedule(move || { sleep(100); panic!("ignore this panic"); }); } sleep(200); assert_eq!(0, pool.num_live_threads()); pool.schedule(|| {}); assert_eq!(3, pool.num_live_threads()); } #[test] fn schedule_retries_when_queue_full() { let pool = ThreadPool::new("test", 1).unwrap(); let before = Instant::now(); pool.schedule(move || sleep(200)); pool.schedule(move || sleep(200)); sleep(100); while pool.try_schedule(|| {}).is_ok() {} pool.schedule(|| {}); assert_elapsed(before, 200..300); } #[test] fn try_schedule_should_run_the_fn() { let pool = ThreadPool::new("pool1", 1).unwrap(); let before = Instant::now(); let (sender, receiver) = std::sync::mpsc::channel(); pool.try_schedule(move || { sender.send(()).unwrap(); }) .unwrap(); receiver.recv_timeout(Duration::from_millis(500)).unwrap(); assert_elapsed(before, 0..100); } #[test] fn try_schedule_queue_full() { let pool = ThreadPool::new("pool1", 1).unwrap(); let before = Instant::now(); pool.try_schedule(move || sleep(100)).unwrap(); while pool.try_schedule(|| {}).is_ok() {} assert!(matches!( pool.try_schedule(|| {}), Err(TryScheduleError::QueueFull), )); assert_elapsed(before, 0..100); } #[test] fn check_concurrency1() { let pool = ThreadPool::new("pool1", 1).unwrap(); assert_in_range(1.0..1.99, &measure_concurrency(&pool, 1)); assert_in_range(2.0..2.99, &measure_concurrency(&pool, 2)); } #[test] fn check_concurrency2() { let pool = ThreadPool::new("pool1", 2).unwrap(); assert_in_range(1.0..1.99, &measure_concurrency(&pool, 1)); assert_in_range(1.0..1.99, &measure_concurrency(&pool, 2)); assert_in_range(2.0..2.99, &measure_concurrency(&pool, 3)); assert_in_range(2.0..2.99, &measure_concurrency(&pool, 4)); } #[test] fn check_concurrency5() { let pool = ThreadPool::new("pool1", 5).unwrap(); assert_in_range(1.0..1.99, &measure_concurrency(&pool, 5)); assert_in_range(2.0..2.99, &measure_concurrency(&pool, 6)); } #[test] fn should_respawn_when_idle() { let pool = ThreadPool::new("pool1", 2).unwrap(); sleep(100); pool.schedule(move || panic!("ignore this panic")); sleep(100); assert_eq!(1, pool.num_live_threads()); sleep(500); assert_eq!(2, pool.num_live_threads()); assert_in_range(1.0..1.99, &measure_concurrency(&pool, 2)); } #[test] fn should_respawn_after_recv() { let pool = ThreadPool::new("pool1", 2).unwrap(); sleep(100); pool.schedule(move || panic!("ignore this panic")); sleep(100); assert_eq!(1, pool.num_live_threads()); pool.schedule(move || sleep(200)); sleep(100); assert_eq!(2, pool.num_live_threads()); } #[test] fn should_respawn_after_executing_job() { let pool = ThreadPool::new("pool1", 2).unwrap(); pool.schedule(move || sleep(200)); pool.schedule(move || panic!("ignore this panic")); sleep(100); assert_eq!(1, pool.num_live_threads()); sleep(200); assert_eq!(2, pool.num_live_threads()); } #[test] fn join() { let pool = ThreadPool::new("pool1", 2).unwrap(); sleep(100); let before = Instant::now(); pool.schedule(move || sleep(100)); pool.join(); assert_elapsed(before, 100..200); } #[test] fn try_join_ok() { let pool = ThreadPool::new("pool1", 2).unwrap(); sleep(100); let before = Instant::now(); pool.schedule(move || sleep(100)); pool.try_join(Duration::from_millis(500)).unwrap(); assert_elapsed(before, 100..200); } #[test] fn try_join_timeout() { let pool = ThreadPool::new("pool1", 2).unwrap(); sleep(100); let before = Instant::now(); pool.schedule(move || sleep(200)); pool.try_join(Duration::from_millis(100)).unwrap_err(); assert_elapsed(before, 100..200); } #[test] fn debug() { let pool = ThreadPool::new("pool1", 1).unwrap(); assert_eq!("ThreadPool{\"pool1\",size=1}", format!("{:?}", pool)); }