extern crate tokio_executor; extern crate tokio_threadpool; extern crate env_logger; extern crate futures; extern crate rand; use tokio_threadpool::*; use futures::future::{lazy, poll_fn}; use futures::*; use rand::*; use std::sync::atomic::Ordering::*; use std::sync::atomic::*; use std::sync::*; use std::thread; use std::time::Duration; #[test] fn basic() { let _ = ::env_logger::try_init(); let pool = Builder::new().pool_size(1).max_blocking(1).build(); let (tx1, rx1) = mpsc::channel(); let (tx2, rx2) = mpsc::channel(); pool.spawn(lazy(move || { let res = blocking(|| { let v = rx1.recv().unwrap(); tx2.send(v).unwrap(); }) .unwrap(); assert!(res.is_ready()); Ok(().into()) })); pool.spawn(lazy(move || { tx1.send(()).unwrap(); Ok(().into()) })); rx2.recv().unwrap(); } #[test] fn other_executors_can_run_inside_blocking() { let _ = ::env_logger::try_init(); let pool = Builder::new().pool_size(1).max_blocking(1).build(); let (tx, rx) = mpsc::channel(); pool.spawn(lazy(move || { let res = blocking(|| { let _e = tokio_executor::enter().expect("nested blocking enter"); tx.send(()).unwrap(); }) .unwrap(); assert!(res.is_ready()); Ok(().into()) })); rx.recv().unwrap(); } #[test] fn notify_task_on_capacity() { const BLOCKING: usize = 10; let pool = Builder::new().pool_size(1).max_blocking(1).build(); let rem = Arc::new(AtomicUsize::new(BLOCKING)); let (tx, rx) = mpsc::channel(); for _ in 0..BLOCKING { let rem = rem.clone(); let tx = tx.clone(); pool.spawn(lazy(move || { poll_fn(move || { blocking(|| { thread::sleep(Duration::from_millis(100)); let prev = rem.fetch_sub(1, Relaxed); if prev == 1 { tx.send(()).unwrap(); } }) .map_err(|e| panic!("blocking err {:?}", e)) }) })); } rx.recv().unwrap(); assert_eq!(0, rem.load(Relaxed)); } #[test] fn capacity_is_use_it_or_lose_it() { use futures::sync::oneshot; use futures::task::Task; use futures::Async::*; use futures::*; // TODO: Run w/ bigger pool size let pool = Builder::new().pool_size(1).max_blocking(1).build(); let (tx1, rx1) = mpsc::channel(); let (tx2, rx2) = oneshot::channel(); let (tx3, rx3) = mpsc::channel(); let (tx4, rx4) = mpsc::channel(); // First, fill the blocking capacity pool.spawn(lazy(move || { poll_fn(move || { blocking(|| { rx1.recv().unwrap(); }) .map_err(|_| panic!()) }) })); pool.spawn(lazy(move || { rx2.map_err(|_| panic!()).and_then(|task: Task| { poll_fn(move || { blocking(|| { // Notify the other task task.notify(); // Block until woken rx3.recv().unwrap(); }) .map_err(|_| panic!()) }) }) })); // Spawn a future that will try to block, get notified, then not actually // use the blocking let mut i = 0; let mut tx2 = Some(tx2); pool.spawn(lazy(move || { poll_fn(move || { match i { 0 => { i = 1; let res = blocking(|| unreachable!()).map_err(|_| panic!()); assert!(res.unwrap().is_not_ready()); // Unblock the first blocker tx1.send(()).unwrap(); return Ok(NotReady); } 1 => { i = 2; // Skip blocking, and notify the second task that it should // start blocking let me = task::current(); tx2.take().unwrap().send(me).unwrap(); return Ok(NotReady); } 2 => { let res = blocking(|| unreachable!()).map_err(|_| panic!()); assert!(res.unwrap().is_not_ready()); // Unblock the first blocker tx3.send(()).unwrap(); tx4.send(()).unwrap(); Ok(().into()) } _ => unreachable!(), } }) })); rx4.recv().unwrap(); } #[test] fn blocking_thread_does_not_take_over_shutdown_worker_thread() { let pool = Builder::new().pool_size(2).max_blocking(1).build(); let (enter_tx, enter_rx) = mpsc::channel(); let (exit_tx, exit_rx) = mpsc::channel(); let (try_tx, try_rx) = mpsc::channel(); let exited = Arc::new(AtomicBool::new(false)); { let exited = exited.clone(); pool.spawn(lazy(move || { poll_fn(move || { blocking(|| { enter_tx.send(()).unwrap(); exit_rx.recv().unwrap(); exited.store(true, Relaxed); }) .map_err(|_| panic!()) }) })); } // Wait for the task to block let _ = enter_rx.recv().unwrap(); // Spawn another task that attempts to block pool.spawn(lazy(move || { poll_fn(move || { let res = blocking(|| {}).unwrap(); assert_eq!(res.is_ready(), exited.load(Relaxed)); try_tx.send(res.is_ready()).unwrap(); Ok(res) }) })); // Wait for the second task to try to block (and not be ready). let res = try_rx.recv().unwrap(); assert!(!res); // Unblock the first task exit_tx.send(()).unwrap(); // Wait for the second task to successfully block. let res = try_rx.recv().unwrap(); assert!(res); drop(pool); } #[test] fn blocking_one_time_gets_capacity_for_multiple_blocks() { const ITER: usize = 1; const BLOCKING: usize = 2; for _ in 0..ITER { let pool = Builder::new().pool_size(4).max_blocking(1).build(); let rem = Arc::new(AtomicUsize::new(BLOCKING)); let (tx, rx) = mpsc::channel(); for _ in 0..BLOCKING { let rem = rem.clone(); let tx = tx.clone(); pool.spawn(lazy(move || { poll_fn(move || { // First block let res = blocking(|| { thread::sleep(Duration::from_millis(100)); }) .map_err(|e| panic!("blocking err {:?}", e)); try_ready!(res); let res = blocking(|| { thread::sleep(Duration::from_millis(100)); let prev = rem.fetch_sub(1, Relaxed); if prev == 1 { tx.send(()).unwrap(); } }); assert!(res.unwrap().is_ready()); Ok(().into()) }) })); } rx.recv().unwrap(); assert_eq!(0, rem.load(Relaxed)); } } #[test] fn shutdown() { const ITER: usize = 1_000; const BLOCKING: usize = 10; for _ in 0..ITER { let num_inc = Arc::new(AtomicUsize::new(0)); let num_dec = Arc::new(AtomicUsize::new(0)); let (tx, rx) = mpsc::channel(); let pool = { let num_inc = num_inc.clone(); let num_dec = num_dec.clone(); Builder::new() .pool_size(1) .max_blocking(BLOCKING) .after_start(move || { num_inc.fetch_add(1, Relaxed); }) .before_stop(move || { num_dec.fetch_add(1, Relaxed); }) .build() }; let barrier = Arc::new(Barrier::new(BLOCKING)); for _ in 0..BLOCKING { let barrier = barrier.clone(); let tx = tx.clone(); pool.spawn(lazy(move || { let res = blocking(|| { barrier.wait(); Ok::<_, ()>(()) }) .unwrap(); tx.send(()).unwrap(); assert!(res.is_ready()); Ok(().into()) })); } for _ in 0..BLOCKING { rx.recv().unwrap(); } // Shutdown drop(pool); assert_eq!(11, num_inc.load(Relaxed)); assert_eq!(11, num_dec.load(Relaxed)); } } #[derive(Debug, Copy, Clone)] enum Sleep { Skip, Yield, Rand, Fixed(Duration), } #[test] fn hammer() { use self::Sleep::*; const ITER: usize = 5; let combos = [ (2, 4, 1_000, Skip), (2, 4, 1_000, Yield), (2, 4, 100, Rand), (2, 4, 100, Fixed(Duration::from_millis(3))), (2, 4, 100, Fixed(Duration::from_millis(12))), ]; for &(size, max_blocking, n, sleep) in &combos { for _ in 0..ITER { let pool = Builder::new() .pool_size(size) .max_blocking(max_blocking) .build(); let cnt_task = Arc::new(AtomicUsize::new(0)); let cnt_block = Arc::new(AtomicUsize::new(0)); for _ in 0..n { let cnt_task = cnt_task.clone(); let cnt_block = cnt_block.clone(); pool.spawn(lazy(move || { cnt_task.fetch_add(1, Relaxed); poll_fn(move || { blocking(|| { match sleep { Skip => {} Yield => { thread::yield_now(); } Rand => { let ms = thread_rng().gen_range(3, 12); thread::sleep(Duration::from_millis(ms)); } Fixed(dur) => { thread::sleep(dur); } } cnt_block.fetch_add(1, Relaxed); }) .map_err(|_| panic!()) }) })); } // Wait for the work to complete pool.shutdown_on_idle().wait().unwrap(); assert_eq!(n, cnt_task.load(Relaxed)); assert_eq!(n, cnt_block.load(Relaxed)); } } }