use futures::future; use tokio::sync::mpsc::{self, error::TrySendError}; mod utils; #[tokio::test] async fn t_04() { const BUF_SIZE: usize = 128; const ITERATIONS: usize = 1_000_000; const PRODUCERS: usize = 4; let counter = utils::Counter::new(); { let (tx, mut rx) = mpsc::channel(BUF_SIZE); let producers = (0..PRODUCERS).map({ |_| { let counter = counter.clone(); let tx = tx.clone(); tokio::spawn(async move { let t0 = std::time::Instant::now(); for i in 0..ITERATIONS { let mut value = counter.add(i); loop { match tx.try_send(value) { Err(TrySendError::Full(rejected)) => { value = rejected; tokio::task::yield_now().await; } Ok(()) => break, Err(other) => panic!("tx-send: {:?}", other), } } } t0.elapsed() }) } }); let consumer = tokio::spawn(async move { let mut count = 0; let t0 = std::time::Instant::now(); while let Some(_) = rx.recv().await { count += 1; } let dt = t0.elapsed(); (count, dt) }); let producers = future::join_all(producers).await; std::mem::drop(tx); let producer_dts = producers .into_iter() .collect::, _>>() .expect("producers-join"); let (consumer_count, consumer_dt) = consumer.await.expect("consumer-join"); assert_eq!(consumer_count, ITERATIONS * PRODUCERS); eprintln!("consumer-dt: {:?}", consumer_dt); eprintln!("producer-dts: {:#?}", producer_dts); } assert_eq!(counter.count(), 0); }