use futures::future::lazy; use futures::prelude::*; use tokio::runtime::Runtime; use tokio::timer::Interval; use tokio_blocking::{FutureBlock, ThreadPool}; /// To visually check if blocking operations are not blocking other tasks fn dotting() -> impl Future { Interval::new(std::time::Instant::now(), std::time::Duration::from_secs(1)) .map(|_| println!(".")) .for_each(|_| Ok(())) .map_err(|_| ()) } #[test] fn simplest() { let mut runtime = Runtime::new().unwrap(); let pool = ThreadPool::new(4); let task = lazy(|| Ok::<_, ()>(3)) .and_then(|_| { // Normal non-blocking operations Ok(()) }) .and_then_block(pool, move |_| { // Allow blocking operation, which doesn't actually block other futures std::thread::sleep(std::time::Duration::from_secs(3)); Ok(10) }) .and_then(|_| { // Normal non-blocking operation Ok(()) }); match runtime.block_on(task.select(dotting())) { Ok(_) => {} Err(_) => panic!(), } } #[test] fn and_then() { let mut runtime = Runtime::new().unwrap(); let pool = ThreadPool::new(4); let task = lazy(|| Ok::<_, ()>(3)) .and_then_block(pool, move |item| { println!("Before block: {:?}: {}", std::thread::current().id(), item); let secs = 3; println!("In blocking: {:?}: {}", std::thread::current().id(), item); std::thread::sleep(std::time::Duration::from_secs(secs)); println!("Blocked: {:?}: {}", std::thread::current().id(), item); Ok(secs) }) .or_else(|_| { assert_eq!(false, true, "this path should be unreachable"); Err(()) }) .and_then(|value| { println!( "After block: {:?}: Slept for {} seconds", std::thread::current().id(), value ); assert_eq!(value, 3); Ok(()) }); match runtime.block_on(task.select(dotting())) { Ok(_) => {} Err(_) => panic!(), } } #[test] fn or_else() { let mut runtime = Runtime::new().unwrap(); let pool = ThreadPool::new(4); let task = lazy(|| Ok::<_, u64>(3)) .and_then_block(pool, move |item| { println!("Before block: {:?}: {}", std::thread::current().id(), item); let secs = 3; println!("In blocking: {:?}: {}", std::thread::current().id(), item); std::thread::sleep(std::time::Duration::from_secs(secs)); println!("Blocked: {:?}: {}", std::thread::current().id(), item); Err::<(), _>(secs) }) .and_then(|_| { assert_eq!(false, true, "this path should be unreachable"); Ok(()) }) .or_else(|value| { println!( "After block: {:?}: Slept for {} seconds", std::thread::current().id(), value ); assert_eq!(value, 3); Ok::<_, u64>(()) }) .map_err(|_| ()); match runtime.block_on(task.select(dotting())) { Ok(_) => {} Err(_) => panic!(), } } #[test] fn then() { let mut runtime = Runtime::new().unwrap(); let pool = ThreadPool::new(4); let task = lazy(|| Ok::<_, u64>(3)) .then_block(pool.clone(), move |item| { let item = match item { Ok(item) => item, Err(_) => unreachable!(), }; println!("Before block: {:?}: {}", std::thread::current().id(), item); let secs = 3; println!("In blocking: {:?}: {}", std::thread::current().id(), item); std::thread::sleep(std::time::Duration::from_secs(secs)); println!("Blocked: {:?}: {}", std::thread::current().id(), item); Err::<(), _>(3) }) .then_block(pool, move |item| { let item = match item { Ok(_) => unreachable!(), Err(e) => e, }; println!("Before block: {:?}: {}", std::thread::current().id(), item); let secs = 3; println!("In blocking: {:?}: {}", std::thread::current().id(), item); std::thread::sleep(std::time::Duration::from_secs(secs)); println!("Blocked: {:?}: {}", std::thread::current().id(), item); Ok::<_, ()>(()) }) .map_err(|_| ()); match runtime.block_on(task.select(dotting())) { Ok(_) => {} Err(_) => panic!(), } }