use tokio::task; use unsync::broadcast; #[cfg(not(miri))] const SIZE: u32 = 100_000; #[cfg(miri)] const SIZE: u32 = 10; #[tokio::test] async fn test_broadcast() -> Result<(), Box> { let local = task::LocalSet::new(); let mut tx = broadcast::channel(2); let (receivers, b) = local .run_until(async move { let mut receivers = Vec::new(); for _ in 0..16 { let mut rx = tx.subscribe(); let a = task::spawn_local(async move { let mut out = Vec::new(); while let Some(value) = rx.recv().await { out.push(value); if value % 3 == 0 { task::yield_now().await; } } out }); receivers.push(a); } let b = task::spawn_local(async move { for n in 0..SIZE { let _ = tx.send(n).await; if n % 5 == 0 { task::yield_now().await; } } }); let mut received = Vec::new(); for receiver in receivers { received.push(receiver.await.unwrap()); } (received, b.await) }) .await; b?; let expected = (0..SIZE).collect::>(); for actual in receivers { assert_eq!(actual, expected); } Ok(()) } #[tokio::test] async fn test_broadcast_receiver_drop() { let mut tx = broadcast::channel(1); let mut sub1 = tx.subscribe(); let mut sub2 = tx.subscribe(); let (result, s1, s2) = tokio::join!(tx.send(1), sub1.recv(), sub2.recv()); drop(sub2); assert_eq!(result, 2); assert_eq!(s1, Some(1)); assert_eq!(s2, Some(1)); let (result, s1) = tokio::join!(tx.send(2), sub1.recv()); assert_eq!(result, 1); assert_eq!(s1, Some(2)); drop(sub1); let (send,) = tokio::join!(tx.send(2)); assert_eq!(send, 0); }