use bmrng::unbounded::UnboundedRequestReceiverStream; use bmrng::{error::*, RequestReceiverStream}; use futures_util::stream::StreamExt; use tokio::time::{advance, pause, resume, sleep, Duration}; #[tokio::test] async fn unbounded_send_receive() { let (tx, mut rx) = bmrng::unbounded_channel::(); tokio::spawn(async move { let (input, responder) = rx.recv().await.expect("Unexpected err"); assert!(!responder.is_closed()); let res = responder.respond(input * input); assert!(res.is_ok()); }); assert!(!tx.is_closed()); let response = tx.send_receive(8).await; assert!(tx.is_closed()); assert_eq!(response, Ok(64)); } #[tokio::test] async fn bounded_send_receive() { let (tx, mut rx) = bmrng::channel::(1); tokio::spawn(async move { let (input, responder) = rx.recv().await.expect("Unexpected err"); assert!(!responder.is_closed()); let res = responder.respond(input * input); assert!(res.is_ok()); }); assert!(!tx.is_closed()); let response = tx.send_receive(8).await; assert!(tx.is_closed()); assert_eq!(response, Ok(64)); } #[tokio::test] async fn unbounded_request_sender_clone() { let (tx, mut rx) = bmrng::unbounded_channel::(); let tx2 = tx.clone(); tokio::spawn(async move { let response = tx2.send_receive(7).await; assert_eq!(response, Ok(49)); }); tokio::spawn(async move { while let Ok((input, responder)) = rx.recv().await { assert!(!responder.is_closed()); let res = responder.respond(input * input); assert!(res.is_ok()); } }); assert!(!tx.is_closed()); let response = tx.send_receive(8).await; assert_eq!(response, Ok(64)); } #[tokio::test] async fn bounded_request_sender_clone() { let (tx, mut rx) = bmrng::channel::(1); let tx2 = tx.clone(); tokio::spawn(async move { let response = tx2.send_receive(7).await; assert_eq!(response, Ok(49)); }); tokio::spawn(async move { while let Ok((input, responder)) = rx.recv().await { assert!(!responder.is_closed()); let res = responder.respond(input * input); assert!(res.is_ok()); } }); assert!(!tx.is_closed()); let response = tx.send_receive(8).await; assert_eq!(response, Ok(64)); } #[tokio::test] async fn unbounded_drop_while_waiting_for_response() { let (tx, mut rx) = bmrng::unbounded_channel::(); let task = tokio::spawn(async move { let (_, responder) = rx.recv().await.expect("Unexpected err"); drop(responder); }); let response = tx.send_receive(8).await; assert!(tokio::join!(task).0.is_ok()); assert_eq!(response, Err(RequestError::RecvError)); } #[tokio::test] async fn unbounded_drop_while_waiting_for_request() { let (tx, mut rx) = bmrng::unbounded_channel::(); let task = tokio::spawn(async move { if rx.recv().await.is_ok() { panic!("this should not be ok") }; }); drop(tx); assert!(tokio::join!(task).0.is_ok()); } #[tokio::test] async fn unbounded_drop_sender_while_sending_response() { let (tx, mut rx) = bmrng::unbounded_channel::(); let task = tokio::spawn(async move { let (_, responder) = rx.recv().await.expect("Received err"); let respond_result = responder.respond(42); assert_eq!(respond_result, Err(RespondError(42))); }); let response_receiver = tx.send(21); drop(response_receiver); assert!(tokio::join!(task).0.is_ok()); } #[tokio::test] async fn bounded_drop_while_waiting_for_response() { let (tx, mut rx) = bmrng::channel::(1); let task = tokio::spawn(async move { let (_, responder) = rx.recv().await.expect("Unexpected err"); drop(responder); }); let response = tx.send_receive(8).await; assert!(tokio::join!(task).0.is_ok()); assert_eq!(response, Err(RequestError::RecvError)); } #[tokio::test] async fn bounded_drop_while_waiting_for_request() { let (tx, mut rx) = bmrng::channel::(1); let task = tokio::spawn(async move { rx.recv().await.expect_err("this should not be ok"); }); drop(tx); assert!(tokio::join!(task).0.is_ok()); } #[tokio::test] async fn bounded_drop_sender_while_sending_response() { let (tx, mut rx) = bmrng::channel::(1); let task = tokio::spawn(async move { let (_, responder) = rx.recv().await.expect("Unexpected err"); let respond_result = responder.respond(42); assert_eq!(respond_result, Err(RespondError(42))); }); let response_receiver = tx.send(21).await; drop(response_receiver); assert!(tokio::join!(task).0.is_ok()); } #[tokio::test] async fn bounded_close_request_receiver() { let (tx, mut rx) = bmrng::channel::(4); let task = tokio::spawn(async move { rx.close(); let (input, responder) = rx.recv().await.unwrap(); assert!(responder.respond(input * 2).is_ok()); }); let mut response_receiver = tx.send(21).await.unwrap(); let response = response_receiver.recv().await; assert_eq!(response, Ok(42)); drop(response_receiver); assert!(tx.send(1).await.is_err()); assert!(tokio::join!(task).0.is_ok()); } #[tokio::test] async fn unbounded_close_request_receiver() { let (tx, mut rx) = bmrng::unbounded_channel::(); let task = tokio::spawn(async move { rx.close(); let (input, responder) = rx.recv().await.unwrap(); assert!(responder.respond(input * 2).is_ok()); }); let mut response_receiver = tx.send(21).unwrap(); let response = response_receiver.recv().await; assert_eq!(response, Ok(42)); drop(response_receiver); assert!(tx.send(1).is_err()); assert!(tokio::join!(task).0.is_ok()); } #[tokio::test] async fn bounded_timeout() { let (tx, mut rx) = bmrng::channel_with_timeout::(1, Duration::from_millis(100)); pause(); tokio::spawn(async move { let (_input, responder) = rx.recv().await.expect("Unexpected err"); assert!(!responder.is_closed()); advance(Duration::from_millis(200)).await; sleep(Duration::from_micros(1)).await; resume(); panic!("Should have timed out"); }); assert!(!tx.is_closed()); let response = tx.send_receive(8).await; assert_eq!(response, Err(RequestError::::RecvTimeoutError)); } #[tokio::test] async fn unbounded_timeout() { let (tx, mut rx) = bmrng::unbounded::channel_with_timeout::(Duration::from_millis(100)); pause(); tokio::spawn(async move { let (_input, responder) = rx.recv().await.expect("Unexpected err"); assert!(!responder.is_closed()); advance(Duration::from_millis(200)).await; sleep(Duration::from_micros(1)).await; resume(); panic!("Should have timed out"); }); assert!(!tx.is_closed()); let response = tx.send_receive(8).await; assert_eq!(response, Err(RequestError::::RecvTimeoutError)); } #[tokio::test] async fn bounded_stream() { let (tx, rx) = bmrng::channel::(1); tokio::spawn(async move { let mut stream = rx.into_stream(); while let Some((input, responder)) = stream.next().await { assert_eq!(responder.is_closed(), false); let res = responder.respond(input * input); assert!(res.is_ok()); } }); assert!(!tx.is_closed()); assert_eq!(tx.send_receive(8).await, Ok(64)); assert!(!tx.is_closed()); assert_eq!(tx.send_receive(3).await, Ok(9)); assert!(!tx.is_closed()); assert_eq!(tx.send_receive(1).await, Ok(1)); assert!(!tx.is_closed()); } #[tokio::test] async fn unbounded_stream() { let (tx, rx) = bmrng::unbounded_channel::(); tokio::spawn(async move { let mut stream = rx.into_stream(); while let Some((input, responder)) = stream.next().await { assert!(!responder.is_closed()); let res = responder.respond(input * input); assert!(res.is_ok()); } }); assert!(!tx.is_closed()); assert_eq!(tx.send_receive(8).await, Ok(64)); assert!(!tx.is_closed()); assert_eq!(tx.send_receive(3).await, Ok(9)); assert!(!tx.is_closed()); assert_eq!(tx.send_receive(1).await, Ok(1)); assert!(!tx.is_closed()); } #[tokio::test] async fn req_receiver_into_inner() { let (tx, rx) = bmrng::channel::(1); let stream = RequestReceiverStream::new(rx); let mut rx = stream.into_inner(); tokio::spawn(async move { let (input, responder) = rx.recv().await.expect("Unexpected err"); assert!(!responder.is_closed()); let res = responder.respond(input * input); assert!(res.is_ok()); }); assert!(!tx.is_closed()); let response = tx.send_receive(8).await; assert!(tx.is_closed()); assert_eq!(response, Ok(64)); } #[tokio::test] async fn req_unbounded_receiver_into_inner() { let (tx, rx) = bmrng::unbounded_channel::(); let stream = UnboundedRequestReceiverStream::new(rx); let mut rx = stream.into_inner(); tokio::spawn(async move { let (input, responder) = rx.recv().await.expect("Unexpected err"); assert!(!responder.is_closed()); let res = responder.respond(input * input); assert!(res.is_ok()); }); assert!(!tx.is_closed()); let response = tx.send_receive(8).await; assert!(tx.is_closed()); assert_eq!(response, Ok(64)); }