use std::{
    collections::VecDeque,
    sync::{atomic::AtomicBool, Arc},
    time::Duration,
};

mod common;

/// Submits ten tasks to a pool built with core size 1, maximum size 3 and the
/// `CallerRuns` exceed-limit policy, then checks that every handle yields the
/// value its task returned, in submission order.
///
/// A final task is submitted afterwards to confirm the pool still accepts work.
#[test]
fn test() {
    common::setup_log();
    let pool = threadpool_executor::threadpool::Builder::new()
        .core_pool_size(1)
        .maximum_pool_size(3)
        .keep_alive_time(Duration::from_secs(300))
        .exeed_limit_policy(threadpool_executor::threadpool::ExceedLimitPolicy::CallerRuns)
        .build();

    let task_count = 10;
    let mut expectations = VecDeque::new();
    for i in 0..task_count {
        // `i` is a `Copy` integer, so the `move` closure captures its own copy.
        let exp = pool
            .execute(move || {
                log::info!("Run in a thread pool!");
                std::thread::sleep(Duration::from_secs(3));
                i
            })
            .unwrap();
        expectations.push_back(exp);
    }

    // Handles were queued in submission order, so the n-th handle must yield n.
    for i in 0..task_count {
        let mut exp = expectations.pop_front().unwrap();
        assert_eq!(exp.get_result().unwrap(), i);
    }

    let f = pool.execute(|| "abc");
    assert_eq!(f.unwrap().get_result().unwrap(), "abc");
}

/// Checks that a panicking task surfaces as an error of kind `Panic` and that
/// the pool can still run a task afterwards.
#[test]
fn test_panic() {
    common::setup_log();
    let pool = threadpool_executor::ThreadPool::new(1);

    let r = pool.execute(|| {
        panic!("panic!!!");
    });
    let res = r.unwrap().get_result();
    match res {
        // The `matches!` result has to be asserted; on its own it is just a
        // discarded `bool` and verifies nothing.
        Err(err) => assert!(matches!(
            err.kind(),
            threadpool_executor::error::ErrorKind::Panic
        )),
        Ok(_) => panic!("a panicking task must produce an error result"),
    }

    let r = pool.execute(|| "abc");
    assert_eq!(r.unwrap().get_result().unwrap(), "abc");
}

/// Checks that waiting one second for a task that sleeps three seconds fails
/// with an error of kind `TimeOut`.
#[test]
fn test_timeout() {
    common::setup_log();
    let pool = threadpool_executor::ThreadPool::new(1);

    let r = pool.execute(|| {
        std::thread::sleep(Duration::from_secs(3));
    });
    let res = r.unwrap().get_result_timeout(Duration::from_secs(1));
    match res {
        Err(err) => assert!(matches!(
            err.kind(),
            threadpool_executor::error::ErrorKind::TimeOut
        )),
        Ok(_) => panic!("waiting 1s for a 3s task must time out"),
    }
}

/// Runs two one-second tasks on a pool with core size 1, maximum size 2 and a
/// one-second keep-alive, waits ten seconds, and expects the pool size to be
/// back at 1.
#[test]
fn test_worker_exit() {
    common::setup_log();
    let pool = threadpool_executor::threadpool::Builder::new()
        .core_pool_size(1)
        .maximum_pool_size(2)
        .keep_alive_time(Duration::from_secs(1))
        .build();

    pool.execute(|| std::thread::sleep(Duration::from_secs(1)))
        .unwrap();
    pool.execute(|| std::thread::sleep(Duration::from_secs(1)))
        .unwrap();

    // Well past task duration plus keep-alive, so any extra worker has had
    // ample time to exit.
    std::thread::sleep(Duration::from_secs(10));
    assert_eq!(1, pool.size());
}

// This attribute applies to the function that immediately follows.
#[test]
fn test_reject() { common::setup_log(); let pool = threadpool_executor::threadpool::Builder::new() .core_pool_size(1) .maximum_pool_size(1) .exeed_limit_policy(threadpool_executor::threadpool::ExceedLimitPolicy::Reject) .build(); let res = pool.execute(|| { std::thread::sleep(std::time::Duration::from_secs(3)); }); assert!(res.is_ok()); let res = pool.execute(|| "a"); assert!(res.is_err()); if let Err(err) = res { matches!( err.kind(), threadpool_executor::error::ErrorKind::TaskRejected ); } } #[test] fn test_cancel() { common::setup_log(); let pool = threadpool_executor::ThreadPool::new(1); let a = Arc::new(AtomicBool::new(true)); let a1 = a.clone(); let mut r1 = pool .execute(move || { std::thread::sleep(std::time::Duration::from_secs(3)); a1.store(false, std::sync::atomic::Ordering::Relaxed); }) .unwrap(); std::thread::sleep(Duration::from_secs(1)); let r1c = r1.cancel(); assert!(r1c.is_err()); if let Err(err) = r1c { matches!( err.kind(), threadpool_executor::error::ErrorKind::TaskRunning ); } let b = Arc::new(AtomicBool::new(true)); let b1 = b.clone(); let mut r2 = pool .execute(move || b1.store(false, std::sync::atomic::Ordering::Relaxed)) .unwrap(); r2.cancel().unwrap(); std::thread::sleep(Duration::from_secs(5)); assert!(r2.is_cancelled()); assert!(b.load(std::sync::atomic::Ordering::Relaxed)); assert!(r1.is_done()); assert!(!a.load(std::sync::atomic::Ordering::Relaxed)); }