use std::{thread, time}; use recstrm::*; use testtools::thread::expect_runtime; enum MyErr { Something } #[derive(PartialEq, Eq, Hash, Clone)] enum Checkpoint { RecSent } #[test] fn send_after_recv() { let (tx, rx) = Builder::new().queue_size(8).build::<_, ()>(); let jh = thread::spawn(move || { thread::sleep(time::Duration::from_millis(500)); tx.send("hello").unwrap(); }); if let Some(node) = rx.recv().unwrap() { assert_eq!(node, "hello"); } jh.join().unwrap(); } #[test] fn send_before_recv() { let sync = testtools::sync::Checkpoint::new(); let (tx, rx) = Builder::new().queue_size(8).build::<_, ()>(); let tsync = sync.clone(); let jh = thread::spawn(move || { tx.send("hello").unwrap(); tsync.reached(Checkpoint::RecSent); }); sync.waitfor([Checkpoint::RecSent]); if let Some(node) = rx.recv().unwrap() { assert_eq!(node, "hello"); } jh.join().unwrap(); } #[test] fn eof_after_sender_drop() { let (tx, rx) = Builder::new().queue_size(8).build::<(), ()>(); drop(tx); let Ok(None) = rx.recv() else { panic!("Unexpectedly not Ok(None)"); }; } #[test] fn eof_with_queued_records() { let (tx, rx) = Builder::new().queue_size(8).build::<_, ()>(); tx.send(42).unwrap(); drop(tx); let Ok(Some(v)) = rx.recv() else { panic!("Unexpectedly not Ok(Some(42))"); }; assert_eq!(v, 42); let Ok(None) = rx.recv() else { panic!("Unexpectedly not Ok(None)"); }; } #[test] fn try_send_full_queue() { let (tx, _rx) = Builder::new().queue_size(1).build::<_, ()>(); tx.send(42).unwrap(); let Err(Error::QueueFull) = tx.try_send(1147) else { panic!("Unexpectedly not Err(Error::QueueFull)"); }; } #[test] fn order() { let (tx, rx) = Builder::new().queue_size(4).build::<_, ()>(); tx.send(0).unwrap(); tx.send(1).unwrap(); tx.send(2).unwrap(); tx.send(3).unwrap(); assert_eq!(rx.recv().unwrap(), Some(0)); assert_eq!(rx.recv().unwrap(), Some(1)); assert_eq!(rx.recv().unwrap(), Some(2)); assert_eq!(rx.recv().unwrap(), Some(3)); } #[test] fn batch_order() { let (tx, rx) = Builder::new().queue_size(4).build::<_, ()>(); let ints = [0, 1, 2, 3]; tx.send_batch(ints.into_iter()).unwrap(); assert_eq!(rx.recv().unwrap(), Some(0)); assert_eq!(rx.recv().unwrap(), Some(1)); assert_eq!(rx.recv().unwrap(), Some(2)); assert_eq!(rx.recv().unwrap(), Some(3)); } #[test] fn recv_all() { let (tx, rx) = Builder::new().queue_size(4).build::<_, ()>(); let ints = [0, 1, 2, 3]; tx.send_batch(ints.into_iter()).unwrap(); let res = rx.recv_all().unwrap().unwrap(); assert_eq!(res, [0, 1, 2, 3]); } #[test] fn recv_atmost() { let (tx, rx) = Builder::new().queue_size(4).build::<_, ()>(); let ints = [0, 1, 2, 3]; tx.send_batch(ints.into_iter()).unwrap(); let res = rx.recv_atmost(2).unwrap().unwrap(); assert_eq!(res, [0, 1]); let res = rx.recv_atmost(2).unwrap().unwrap(); assert_eq!(res, [2, 3]); } // Make sure that sender blocks on send if the queue is full. // // Because the call is expected to block we do it on a thread, and make the // receiver sleep for a known about of time before taking out data from the // queue. #[test] fn sender_blocking() { const GRACE_MS: u64 = 250; let (tx, rx) = Builder::new().queue_size(2).build::<_, ()>(); let jh = thread::spawn(move || { // Fill the queue up let ints = [0, 1]; tx.send_batch(ints.into_iter()).unwrap(); // Attempt to send another record. This should take at least 250ms, // because the main thread is delaying 250ms before takeing a record // out of the queue expect_runtime(time::Duration::from_millis(GRACE_MS), || { tx.send(2).unwrap(); }); }); // Wait a bit so we can measure the block of send() thread::sleep(time::Duration::from_millis(GRACE_MS)); let res = rx.recv_atmost(2).unwrap().unwrap(); assert_eq!(res, [0, 1]); let res = rx.recv().unwrap(); assert_eq!(res, Some(2)); jh.join().unwrap(); } #[test] fn receiver_disappeared() { let (tx, rx) = Builder::new().queue_size(4).build::<_, ()>(); drop(rx); let Err(Error::ReceiverDisappeared) = tx.send("hello") else { panic!("Result unexpectedly not Err(Error::ReceiverDisappeared)"); }; } #[test] fn fail_sender() { let (tx, rx) = Builder::new().queue_size(4).build::<&str, MyErr>(); tx.fail(MyErr::Something); let Err(Error::App(MyErr::Something)) = rx.recv() else { panic!("Unexpected return value"); }; } #[test] fn fail_receiver() { let (tx, rx) = Builder::new().queue_size(4).build::<&str, MyErr>(); rx.fail(MyErr::Something); let Err(Error::App(MyErr::Something)) = tx.send("hello") else { panic!("Unexpected return value"); }; } // Construct a records queue with a fixed number of records. Drop the sender. // The receiver should return Error::RecordsUnderflow when trying to read // from the queue. #[test] fn drop_sender_before_num_recs() { let (tx, rx) = Builder::new() .queue_size(4) .num_records(8) .build::<&str, ()>(); drop(tx); let Err(Error::RecordsUnderflow) = rx.recv() else { panic!("Result unexpectedly not Err(Error::ReceiverDisappeared)"); }; } // Make sure that there's no off-by-one error when checking for // RecordsUnderflow. #[test] fn drop_sender_before_num_recs_one_off() { let (tx, rx) = Builder::new().queue_size(4).num_records(8).build::<_, ()>(); // Fill up queue let ints = [0, 1, 2, 3]; tx.send_batch(ints.into_iter()).unwrap(); // Drain queue let res = rx.recv_all().unwrap().unwrap(); assert_eq!(res, [0, 1, 2, 3]); // Add elements so we reach num expected records - 1 let ints = [4, 5, 6]; tx.send_batch(ints.into_iter()).unwrap(); drop(tx); let Err(Error::RecordsUnderflow) = rx.recv() else { panic!("Result unexpectedly not Err(Error::ReceiverDisappeared)"); }; } // Make sure that there's no off-by-one error when checking for // RecordsUnderflow. #[test] fn drop_sender_after_num_recs() { let (tx, rx) = Builder::new().queue_size(4).num_records(8).build::<_, ()>(); // Fill up queue let ints = [0, 1, 2, 3]; tx.send_batch(ints.into_iter()).unwrap(); // Drain queue let res = rx.recv_all().unwrap().unwrap(); assert_eq!(res, [0, 1, 2, 3]); // Add elements so we reach num expected records - 1 let ints = [4, 5, 6, 7]; tx.send_batch(ints.into_iter()).unwrap(); drop(tx); // Drain queue let res = rx.recv_all().unwrap().unwrap(); assert_eq!(res, [4, 5, 6, 7]); } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :