use async_priority_channel::bounded; #[tokio::test] async fn capacity() { for i in 1..10 { let (s, r) = bounded::<(), i32>(i); assert_eq!(s.capacity(), Some(i)); assert_eq!(r.capacity(), Some(i)); } } #[tokio::test] async fn len_empty_full() { let (s, r) = bounded(2); assert_eq!(s.len(), 0); assert_eq!(s.is_empty(), true); assert_eq!(s.is_full(), false); assert_eq!(r.len(), 0); assert_eq!(r.is_empty(), true); assert_eq!(r.is_full(), false); s.send((), 0).await.unwrap(); assert_eq!(s.len(), 1); assert_eq!(s.is_empty(), false); assert_eq!(s.is_full(), false); assert_eq!(r.len(), 1); assert_eq!(r.is_empty(), false); assert_eq!(r.is_full(), false); s.send((), 0).await.unwrap(); assert_eq!(s.len(), 2); assert_eq!(s.is_empty(), false); assert_eq!(s.is_full(), true); assert_eq!(r.len(), 2); assert_eq!(r.is_empty(), false); assert_eq!(r.is_full(), true); r.recv().await.unwrap(); assert_eq!(s.len(), 1); assert_eq!(s.is_empty(), false); assert_eq!(s.is_full(), false); assert_eq!(r.len(), 1); assert_eq!(r.is_empty(), false); assert_eq!(r.is_full(), false); } #[test] fn receiver_count() { let (s, r) = bounded::<(), i32>(5); let receiver_clones: Vec<_> = (0..20).map(|_| r.clone()).collect(); assert_eq!(s.receiver_count(), 21); assert_eq!(r.receiver_count(), 21); drop(receiver_clones); assert_eq!(s.receiver_count(), 1); assert_eq!(r.receiver_count(), 1); } #[test] fn sender_count() { let (s, r) = bounded::<(), i32>(5); let sender_clones: Vec<_> = (0..20).map(|_| s.clone()).collect(); assert_eq!(s.sender_count(), 21); assert_eq!(r.sender_count(), 21); drop(sender_clones); assert_eq!(s.receiver_count(), 1); assert_eq!(r.receiver_count(), 1); } #[tokio::test(flavor = "multi_thread")] async fn test_send_recv_1() { let (tx, rx) = bounded(1); tx.send(1, 1).await.unwrap(); assert_eq!(rx.recv().await.unwrap(), (1, 1)); } #[tokio::test(flavor = "multi_thread")] async fn test_send_recv_2() { let (tx, rx) = bounded(3); tx.send(1, 1).await.unwrap(); tx.send(3, 3).await.unwrap(); tx.send(2, 2).await.unwrap(); assert_eq!(rx.recv().await.unwrap(), (3, 3)); assert_eq!(rx.recv().await.unwrap(), (2, 2)); assert_eq!(rx.recv().await.unwrap(), (1, 1)); } #[tokio::test(flavor = "multi_thread")] async fn test_send_recv_close_1() { let (tx, rx) = bounded(3); tx.send(1, 1).await.unwrap(); tx.send(3, 3).await.unwrap(); tx.send(2, 2).await.unwrap(); tx.close(); tx.send(4, 4).await.unwrap_err(); assert_eq!(rx.recv().await.unwrap(), (3, 3)); assert_eq!(rx.recv().await.unwrap(), (2, 2)); assert_eq!(rx.recv().await.unwrap(), (1, 1)); rx.recv().await.unwrap_err(); } #[tokio::test(flavor = "multi_thread")] async fn test_send_recv_close_2() { let (tx, rx) = bounded(2); rx.close(); tx.send(4, 4).await.unwrap_err(); rx.recv().await.unwrap_err(); } #[tokio::test(flavor = "multi_thread")] async fn bconcurrent_1() { let n: i32 = 1000; let (tx, rx) = bounded(10); tokio::spawn(async move { for i in 0..n { tx.send(i, i).await.unwrap(); } }); let mut v = Vec::new(); for _ in 0..n { let r = rx.recv().await.unwrap(); v.push(r.0); } v.sort(); let expected: Vec = (0..n).collect(); assert_eq!(v, expected); } #[tokio::test(flavor = "multi_thread")] async fn bconcurrent_2() { let n: i32 = 500; let m: i32 = 10; let (tx, rx) = bounded(10); for j in 0..m { let tx = tx.clone(); tokio::spawn(async move { for i in 0..n { let priority = j * n + i; tx.send((), priority).await.unwrap(); } }); } let mut v = Vec::new(); for _ in 0..n * m { let r = rx.recv().await.unwrap(); v.push(r.1); } v.sort(); let expected: Vec = (0..n * m).collect(); assert_eq!(v, expected); } #[tokio::test(flavor = "multi_thread")] async fn bconcurrent_3() { let n: i32 = 500; let m: i32 = 10; let (tx, rx) = bounded(10); for j in 0..m { let tx = tx.clone(); tokio::spawn(async move { for i in 0..n { tx.send((), j * n + i).await.unwrap(); } }); } let mut collected = Vec::new(); for _ in 0..m { let tx = rx.clone(); let (result_tx, result_rx) = tokio::sync::oneshot::channel(); let mut v = Vec::new(); tokio::spawn(async move { for _ in 0..n { v.push(tx.recv().await.unwrap().1); } result_tx.send(v).unwrap(); }); collected.push(result_rx); } let mut v = Vec::new(); for item in collected { v.extend(item.await.unwrap()); } v.sort(); let expected: Vec = (0..n * m).collect(); assert_eq!(v, expected); } #[tokio::test(flavor = "multi_thread")] async fn bclose_1() { let (tx, rx) = bounded::<(), i32>(10); let mut jh = Vec::new(); for _ in 0..10 { let rx = rx.clone(); let thread = tokio::spawn(async move { rx.recv().await.unwrap_err(); }); jh.push(thread); } jh.push(tokio::spawn(async move { tx.close(); })); for thread in jh { thread.await.unwrap(); } } #[test] fn sendv_1() { let cap = 10; let (tx, rx) = bounded::<&str, i32>(cap); let v = vec![("a", 0), ("b", 1)]; tx.try_sendv(v.into_iter().peekable()).unwrap(); assert_eq!(rx.try_recv().unwrap(), ("b", 1)); assert_eq!(rx.try_recv().unwrap(), ("a", 0)); let n = 100; let v = vec![("a", 0); n as usize]; let err = tx.try_sendv(v.into_iter().peekable()).unwrap_err(); assert_eq!(err.into_inner().count() as u64, n - cap); } #[test] fn sendv_2() { let cap = 10; let (tx, _rx) = bounded::<&str, i32>(cap); tx.close(); assert!(tx.is_closed()); let err = tx .try_sendv(vec![("a", 0), ("b", 1)].into_iter().peekable()) .unwrap_err(); assert_eq!(err.into_inner().count(), 2); } #[test] fn sendv_3() { let (tx, _rx) = bounded::<(), i32>(2); tx.try_send((), 0).unwrap(); tx.try_send((), 0).unwrap(); }