use futures::pin_mut; use std::future::Future; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::sync::Arc; use std::sync::Mutex; use std::task::Poll; use std::task::Wake; #[test] fn sync_to_async_and_back() { let (tx, rx) = batch_channel::bounded::<()>(1); let tx = tx.into_sync(); let rx = rx.into_sync(); let tx = tx.into_async(); let rx = rx.into_async(); _ = (tx, rx); } static COUNT: AtomicUsize = AtomicUsize::new(0); #[derive(Debug)] struct Counter { inc: usize, } impl Counter { fn new() -> Counter { let inc = 1; COUNT.fetch_add(inc, Ordering::AcqRel); Counter { inc } } } impl Drop for Counter { fn drop(&mut self) { COUNT.fetch_sub(self.inc, Ordering::AcqRel); } } // cargo test runs threads multithreaded by default. crazy. static COUNTER_TEST_MUTEX: Mutex<()> = Mutex::new(()); #[test] fn counter_instances_count() { let _l = COUNTER_TEST_MUTEX.lock().unwrap(); assert_eq!(0, COUNT.load(Ordering::Acquire)); let c1 = Counter::new(); assert_eq!(1, COUNT.load(Ordering::Acquire)); let c2 = Counter::new(); assert_eq!(2, COUNT.load(Ordering::Acquire)); drop(c1); assert_eq!(1, COUNT.load(Ordering::Acquire)); drop(c2); assert_eq!(0, COUNT.load(Ordering::Acquire)); } #[test] fn closing_rx_drops_elements() { let _l = COUNTER_TEST_MUTEX.lock().unwrap(); let (tx, rx) = batch_channel::bounded_sync(2); tx.send(Counter::new()).unwrap(); assert_eq!(1, COUNT.load(Ordering::Acquire)); tx.send(Counter::new()).unwrap(); assert_eq!(2, COUNT.load(Ordering::Acquire)); drop(rx); assert_eq!(0, COUNT.load(Ordering::Acquire)); } struct TestWake; impl Wake for TestWake { fn wake(self: Arc) {} } #[test] fn cancel_send() { let waker = Arc::new(TestWake).into(); let mut cx = std::task::Context::from_waker(&waker); let (tx, _rx) = batch_channel::bounded(1); let send_fut = tx.send('a'); pin_mut!(send_fut); assert_eq!(Poll::Ready(Ok(())), send_fut.poll(&mut cx)); let send_fut = tx.send('b'); pin_mut!(send_fut); assert_eq!(Poll::Pending, send_fut.as_mut().poll(&mut cx)); drop(send_fut); } #[test] fn cancel_send_iter() { let waker = Arc::new(TestWake).into(); let mut cx = std::task::Context::from_waker(&waker); let (tx, _rx) = batch_channel::bounded(1); let send_fut = tx.send_iter(vec!['a', 'b']); pin_mut!(send_fut); assert_eq!(Poll::Pending, send_fut.as_mut().poll(&mut cx)); drop(send_fut); } #[test] fn cancel_recv() { let waker = Arc::new(TestWake).into(); let mut cx = std::task::Context::from_waker(&waker); let (_tx, rx) = batch_channel::bounded::(1); let recv_fut = rx.recv(); pin_mut!(recv_fut); assert_eq!(Poll::Pending, recv_fut.as_mut().poll(&mut cx)); drop(recv_fut); } #[test] fn cancel_recv_batch() { let waker = Arc::new(TestWake).into(); let mut cx = std::task::Context::from_waker(&waker); let (_tx, rx) = batch_channel::bounded::(1); let recv_fut = rx.recv_batch(100); pin_mut!(recv_fut); assert_eq!(Poll::Pending, recv_fut.as_mut().poll(&mut cx)); drop(recv_fut); } #[test] fn cancel_recv_vec() { let waker = Arc::new(TestWake).into(); let mut cx = std::task::Context::from_waker(&waker); let (_tx, rx) = batch_channel::bounded::(1); let mut v = Vec::with_capacity(100); let recv_fut = rx.recv_vec(v.capacity(), &mut v); pin_mut!(recv_fut); assert_eq!(Poll::Pending, recv_fut.as_mut().poll(&mut cx)); drop(recv_fut); }