use std::cell::Cell; use std::sync::{Arc, Mutex}; use std::thread; use std::time::Duration; use futures_executor::block_on; use async_resource::PoolConfig; mod utils; use utils::AtomicCounter; fn counter_pool_config() -> PoolConfig { let source = Arc::new(AtomicCounter::default()); PoolConfig::::new(move || { let s = source.clone(); async move { Ok(s.increment()) } }) } #[test] fn test_pool_acquire_order_timeout() { let disposed = Arc::new(AtomicCounter::default()); let dcopy = disposed.clone(); let pool = counter_pool_config() .idle_timeout(Duration::from_secs(1)) .dispose(move |_res, _| { dcopy.increment(); }) .build(); block_on(async move { let mut fst = pool.acquire().await.unwrap(); let mut snd = pool.acquire().await.unwrap(); assert_eq!(*fst, 1); assert_eq!(*snd, 2); // 2 should be returned directly to the idle queue drop(snd); snd = pool.acquire().await.unwrap(); assert_eq!(*snd, 2); // 1 should be returned directly to the idle queue drop(fst); fst = pool.acquire().await.unwrap(); assert_eq!(*fst, 1); // with all resources dropped, shutdown should be quick drop(fst); drop(snd); pool.drain(Duration::from_millis(500)).await.unwrap(); assert_eq!(disposed.value(), 2); }) } #[test] fn test_pool_acquire_order_no_timeout() { let disposed = Arc::new(AtomicCounter::default()); let dcopy = disposed.clone(); let mut pool = counter_pool_config() .dispose(move |_res, _| { dcopy.increment(); }) .build(); block_on(async move { let fst = pool.acquire().await.unwrap(); let snd = pool.acquire().await.unwrap(); assert_eq!(*fst, 1); assert_eq!(*snd, 2); drop(snd); // when there is no idle timeout and the pool is not busy (no waiters) // then 2 should be disposed, not returned to the idle queue let trd = pool.acquire().await.unwrap(); assert_eq!(*trd, 3); drop(fst); // check the resources we released (1 and 2) have been disposed assert_eq!(disposed.value(), 2); // shutdown must time out because a resource is held pool = pool.drain(Duration::from_millis(50)).await.unwrap_err(); drop(trd); pool.drain(Duration::from_millis(500)).await.unwrap(); }); } #[test] // demonstrate a resource type that is Send but !Sync fn test_pool_not_sync() { let source = Arc::new(AtomicCounter::default()); let pool = PoolConfig::, ()>::new(move || { let s = source.clone(); async move { Ok(Cell::new(s.increment())) } }) .build(); block_on(async move { assert_eq!(pool.acquire().await.unwrap().get(), 1); }); } #[test] // test support for resource waiters fn test_pool_waiter() { let waiting = Arc::new(AtomicCounter::default()); let results = Arc::new(Mutex::new(vec![])); let pool = counter_pool_config().max_count(1).build(); let p1 = pool.clone(); let mut waiters = 3; // load first resource results .lock() .unwrap() .push(block_on(async move { p1.acquire().await.unwrap() })); // create waiters for the resource for _ in 0..waiters { let pool = pool.clone(); let results = results.clone(); let waiting = waiting.clone(); thread::spawn(|| { block_on(async move { let wait = pool.acquire(); waiting.increment(); let result = wait.await; // intentionally poison mutex on failure (acquire timeout) results.lock().unwrap().push(result.unwrap()); waiting.decrement(); }) }); } // spin until waiters are queued up loop { if waiting.value() == waiters { break; } thread::yield_now(); } // exhaust waiters // since the queue is 'busy' from this point and there is no expiry, // the same resource will be returned each time loop { assert_eq!(*results.lock().unwrap().pop().unwrap(), 1); waiters -= 1; loop { if waiting.value() == waiters { break; } // check mutex survives assert!(results.lock().unwrap().len() <= 1); thread::yield_now(); } if waiters == 0 { break; } } let mut res = results.lock().unwrap(); assert!(*res.pop().unwrap() == 1); assert!(res.is_empty()); }