use std::thread::{self, scope}; use std::time::{Duration, Instant}; use loole::{bounded, RecvError, SendError}; fn ms(ms: u64) -> Duration { Duration::from_millis(ms) } fn sync_sleep(ms: u64) { thread::sleep(Duration::from_millis(ms)) } #[test] fn sync_send_before_recv_buffer_0() { let (tx, rx) = bounded(0); let h = thread::spawn(move || tx.send(1)); sync_sleep(100); assert_eq!(rx.recv(), Ok(1)); assert_eq!(h.join().unwrap(), Ok(())); } #[test] fn sync_recv_before_send_buffer_0() { let (tx, rx) = bounded(0); let h = thread::spawn(move || rx.recv()); sync_sleep(100); assert_eq!(tx.send(1), Ok(())); assert_eq!(h.join().unwrap(), Ok(1)); } #[test] fn sync_send_before_recv_buffer_1() { let (tx, rx) = bounded(1); assert_eq!(tx.send(1), Ok(())); assert_eq!(rx.recv(), Ok(1)); } #[test] fn sync_recv_before_send_buffer_1() { let (tx, rx) = bounded(1); let h = thread::spawn(move || rx.recv()); sync_sleep(100); assert_eq!(tx.send(1), Ok(())); assert_eq!(h.join().unwrap(), Ok(1)); } #[test] fn sync_recv_after_manually_closed_sender() { let (tx, rx) = bounded(1); assert_eq!(tx.send(1), Ok(())); assert!(tx.close()); assert_eq!(rx.recv(), Ok(1)); assert_eq!(rx.recv(), Err(RecvError::Disconnected)); } #[test] fn sync_recv_after_manually_closeed_receiver() { let (tx, rx) = bounded(1); assert_eq!(tx.send(1), Ok(())); assert!(rx.close()); assert_eq!(rx.recv(), Ok(1)); assert_eq!(rx.recv(), Err(RecvError::Disconnected)); } #[test] fn sync_is_closed_closed_by_sender_drop() { let (tx, rx) = bounded::<()>(1); assert!(!rx.is_closed()); drop(tx); assert!(rx.is_closed()); } #[test] fn sync_is_closed_closed_by_receiver_drop() { let (tx, rx) = bounded::<()>(1); assert!(!tx.is_closed()); drop(rx); assert!(tx.is_closed()); } #[test] fn sync_2_sends_before_2_recvs_buffer_1() { let (tx, rx) = bounded(1); assert_eq!(tx.capacity(), Some(1)); assert_eq!(tx.len(), 0); tx.send(1).unwrap(); assert_eq!(tx.len(), 1); thread::spawn(move || tx.send(2)); sync_sleep(100); assert_eq!(rx.len(), 1); let r1r = rx.recv(); assert_eq!(r1r, Ok(1)); assert_eq!(rx.len(), 1); let r2r = rx.recv(); assert_eq!(r2r, Ok(2)); assert_eq!(rx.len(), 0); } #[test] fn sync_send() { let (tx, rx) = bounded(2); scope(|scope| { scope.spawn(move || { assert_eq!(tx.send(1), Ok(())); assert_eq!(tx.send(2), Ok(())); thread::sleep(ms(1500)); assert_eq!(tx.send(4), Ok(())); thread::sleep(ms(1000)); assert_eq!(tx.send(5), Err(SendError(5))); }); scope.spawn(move || { thread::sleep(ms(1000)); assert_eq!(rx.recv(), Ok(1)); thread::sleep(ms(1000)); assert_eq!(rx.recv(), Ok(2)); assert_eq!(rx.recv(), Ok(4)); }); }); } #[test] fn sync_shift_pending_send_buffer_0() { let (tx, rx) = bounded(0); let tx_clone = tx.clone(); scope(|scope| { scope.spawn(move || { let start = Instant::now(); assert_eq!(tx.send(1), Ok(())); let elapsed = start.elapsed(); assert!(ms(900) <= elapsed && elapsed <= ms(1100)); }); scope.spawn(move || { let start = Instant::now(); thread::sleep(ms(100)); assert_eq!(tx_clone.send(2), Ok(())); let elapsed = start.elapsed(); assert!(ms(1900) <= elapsed && elapsed <= ms(2100)); }); scope.spawn(move || { thread::sleep(ms(1000)); assert_eq!(rx.recv(), Ok(1)); thread::sleep(ms(1000)); assert_eq!(rx.recv(), Ok(2)); }); }); } #[test] fn sync_shift_pending_send_buffer_2() { let (tx, rx) = bounded(2); scope(|scope| { assert_eq!(tx.send(1), Ok(())); assert_eq!(tx.send(2), Ok(())); scope.spawn(move || { let start = Instant::now(); assert_eq!(tx.send(3), Ok(())); let elapsed = start.elapsed(); assert!(ms(900) <= elapsed && elapsed <= ms(1100)); }); scope.spawn(move || { thread::sleep(ms(1000)); assert_eq!(rx.recv(), Ok(1)); }); }); } #[test] fn sync_drain() { let (tx, rx) = bounded(2); let tx_clone_1 = tx.clone(); let tx_clone_2 = tx.clone(); scope(|scope| { scope.spawn(move || { assert_eq!(tx.send(1), Ok(())); assert_eq!(tx.send(2), Ok(())); }); scope.spawn(move || { let start = Instant::now(); thread::sleep(ms(100)); assert_eq!(tx_clone_1.send(3), Ok(())); let elapsed = start.elapsed(); assert!(ms(900) <= elapsed && elapsed <= ms(1100)); }); scope.spawn(move || { let start = Instant::now(); thread::sleep(ms(100)); assert_eq!(tx_clone_2.send(4), Ok(())); let elapsed = start.elapsed(); assert!(ms(900) <= elapsed && elapsed <= ms(1100)); }); scope.spawn(move || { thread::sleep(ms(1000)); assert_eq!(rx.len(), 2); let v = rx.drain().collect::>(); assert_eq!(v, [1, 2]); assert_eq!(rx.len(), 2); }); }); }