use tokio::{task, time}; use testtools::task::expect_runtime; use recstrm::*; enum MyErr { Something } #[tokio::test] async fn send_after_recv() { let (tx, rx) = Builder::new().queue_size(8).build::<_, ()>(); let jh = task::spawn(async move { time::sleep(time::Duration::from_millis(500)).await; tx.send_async("hello").await.unwrap(); }); if let Some(node) = rx.recv_async().await.unwrap() { assert_eq!(node, "hello"); } jh.await.unwrap(); } #[tokio::test] async fn send_before_recv() { let (tx, rx) = Builder::new().queue_size(8).build::<_, ()>(); let jh = task::spawn(async move { tx.send_async("hello").await.unwrap(); }); time::sleep(time::Duration::from_millis(500)).await; if let Some(node) = rx.recv_async().await.unwrap() { assert_eq!(node, "hello"); } jh.await.unwrap(); } #[tokio::test] async fn eof_after_sender_drop() { let (tx, rx) = Builder::new().queue_size(8).build::<(), ()>(); drop(tx); let Ok(None) = rx.recv_async().await else { panic!("Unexpectedly not Ok(None)"); }; } #[tokio::test] async fn eof_with_queued_records() { let (tx, rx) = Builder::new().queue_size(8).build::<_, ()>(); tx.send_async(42).await.unwrap(); drop(tx); let Ok(Some(v)) = rx.recv_async().await else { panic!("Unexpectedly not Ok(Some(42))"); }; assert_eq!(v, 42); let Ok(None) = rx.recv_async().await else { panic!("Unexpectedly not Ok(None)"); }; } #[tokio::test] async fn try_send_full_queue() { let (tx, _rx) = Builder::new().queue_size(1).build::<_, ()>(); tx.send_async(42).await.unwrap(); let Err(Error::QueueFull) = tx.try_send(1147) else { panic!("Unexpectedly not Err(Error::QueueFull)"); }; } #[tokio::test] async fn order() { let (tx, rx) = Builder::new().queue_size(4).build::<_, ()>(); tx.send_async(0).await.unwrap(); tx.send_async(1).await.unwrap(); tx.send_async(2).await.unwrap(); tx.send_async(3).await.unwrap(); assert_eq!(rx.recv_async().await.unwrap(), Some(0)); assert_eq!(rx.recv_async().await.unwrap(), Some(1)); assert_eq!(rx.recv_async().await.unwrap(), Some(2)); assert_eq!(rx.recv_async().await.unwrap(), Some(3)); } #[tokio::test] async fn batch_order() { let (tx, rx) = Builder::new().queue_size(4).build::<_, ()>(); let ints = [0, 1, 2, 3]; tx.send_batch_async(ints.into_iter()).await.unwrap(); assert_eq!(rx.recv_async().await.unwrap(), Some(0)); assert_eq!(rx.recv_async().await.unwrap(), Some(1)); assert_eq!(rx.recv_async().await.unwrap(), Some(2)); assert_eq!(rx.recv_async().await.unwrap(), Some(3)); } #[tokio::test] async fn recv_all() { let (tx, rx) = Builder::new().queue_size(4).build::<_, ()>(); let ints = [0, 1, 2, 3]; tx.send_batch_async(ints.into_iter()).await.unwrap(); let res = rx.recv_all_async().await.unwrap().unwrap(); assert_eq!(res, [0, 1, 2, 3]); } #[tokio::test] async fn recv_atmost() { let (tx, rx) = Builder::new().queue_size(4).build::<_, ()>(); let ints = [0, 1, 2, 3]; tx.send_batch_async(ints.into_iter()).await.unwrap(); let res = rx.recv_atmost_async(2).await.unwrap().unwrap(); assert_eq!(res, [0, 1]); let res = rx.recv_atmost_async(2).await.unwrap().unwrap(); assert_eq!(res, [2, 3]); } // Make sure that sender blocks on send if the queue is full. // // Because the call is expected to block we do it on a task, and make the // receiver sleep for a known about of time before taking out data from the // queue. #[tokio::test] async fn sender_blocking() { const GRACE_MS: u64 = 250; let (tx, rx) = Builder::new().queue_size(2).build::<_, ()>(); let jh = task::spawn(async { // Fill the queue up let ints = [0, 1]; tx.send_batch_async(ints.into_iter()).await.unwrap(); // Attempt to send another record. This should take at least 250ms, // because the main thread is delaying 250ms before takeing a record // out of the queue expect_runtime(time::Duration::from_millis(GRACE_MS), async move { tx.send_async(2).await.unwrap(); }) .await; }); // Wait a bit so we can measure the block of send() time::sleep(time::Duration::from_millis(GRACE_MS)).await; let res = rx.recv_atmost_async(2).await.unwrap().unwrap(); assert_eq!(res, [0, 1]); let res = rx.recv_async().await.unwrap(); assert_eq!(res, Some(2)); jh.await.unwrap(); } #[tokio::test] async fn receiver_disappeared() { let (tx, rx) = Builder::new().queue_size(4).build::<_, ()>(); drop(rx); let Err(Error::ReceiverDisappeared) = tx.send_async("hello").await else { panic!("Result unexpectedly not Err(Error::ReceiverDisappeared)"); }; } #[tokio::test] async fn fail_sender() { let (tx, rx) = Builder::new().queue_size(4).build::<&str, MyErr>(); tx.fail(MyErr::Something); let Err(Error::App(MyErr::Something)) = rx.recv_async().await else { panic!("Unexpected return value"); }; } #[tokio::test] async fn fail_receiver() { let (tx, rx) = Builder::new().queue_size(4).build::<&str, MyErr>(); rx.fail(MyErr::Something); let Err(Error::App(MyErr::Something)) = tx.send_async("hello").await else { panic!("Unexpected return value"); }; } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :