//! Tests exercising `holes::mpsc::{Link, Tx, Rx}` backed by an array of `holes::bus::Slot`s.

// NOTE(review): several generic argument lists in this file look like they were lost
// before it reached this form: the empty `::::` path segments on `Link`/`Tx`/`Rx`,
// `slots::()`, `collect::, _>>()`, and `SIZE` being used by `slots` without a visible
// declaration. The code is left exactly as found; restore the arguments from the
// original source rather than guessing them.
use std::time::Duration;
use std::sync::Arc;
use futures::future;
use holes::error::SendError;
use tokio::time;
use holes::bus::Slot;
use holes::mpsc::{Link, Rx, Tx};

// Shared test helpers; this file uses `utils::Item` and `utils::Counter` from it.
mod utils;

/// Send/receive ordering over a three-slot buffer, with everything received before drop.
///
/// Checks that `recv` does not complete within 10 ms while nothing is queued, that a
/// third outstanding `send` is rejected, that values come out in the order they were
/// accepted (1, 2, 4), and that `counter.count()` is 0 once the link, sender and
/// receiver have gone out of scope.
#[tokio::test]
async fn t_01() {
    type Item = utils::Item;
    let counter = utils::Counter::new();
    {
        let slots = slots::<3, Item>();
        let link = Link::::new(&slots);
        let tx = Tx::::new(&link);
        let mut rx = Rx::::new(&link);

        // Nothing has been sent yet, so `recv` must still be pending when the timeout fires.
        assert!(time::timeout(Duration::from_millis(10), rx.recv())
            .await
            .is_err());

        // With three slots only two items are accepted at a time; presumably one slot is
        // always kept free. TODO confirm against `Link`.
        assert!(tx.send(counter.add(1)).is_ok());
        assert!(tx.send(counter.add(2)).is_ok());
        assert!(tx.send(counter.add(3)).is_err());

        // Receiving one item makes room for exactly one more.
        assert_eq!(rx.recv().await.unwrap(), 1);
        assert!(tx.send(counter.add(4)).is_ok());
        assert!(tx.send(counter.add(5)).is_err());

        // The rejected values (3 and 5) never show up on the receiving side.
        assert_eq!(rx.recv().await.unwrap(), 2);
        assert_eq!(rx.recv().await.unwrap(), 4);

        // Everything accepted has been received, so `recv` is pending again.
        assert!(time::timeout(Duration::from_millis(10), rx.recv())
            .await
            .is_err());
    }
    // Presumably `count()` reports items that are still alive, so 0 means none leaked;
    // confirm in `utils`.
    assert_eq!(counter.count(), 0);
}

/// Same sequence as `t_01` up to the rejected `send` of 5, but the scope ends while two
/// accepted items (2 and 4) have not been received.
///
/// The final assertion then requires `counter.count()` to be 0 even though those items
/// were never handed to the receiver.
#[tokio::test]
async fn t_02() {
    type Item = utils::Item;
    let counter = utils::Counter::new();
    {
        let slots = slots::<3, Item>();
        let link = Link::::new(&slots);
        let tx = Tx::::new(&link);
        let mut rx = Rx::::new(&link);

        // Nothing has been sent yet, so `recv` must still be pending when the timeout fires.
        assert!(time::timeout(Duration::from_millis(10), rx.recv())
            .await
            .is_err());

        assert!(tx.send(counter.add(1)).is_ok());
        assert!(tx.send(counter.add(2)).is_ok());
        assert!(tx.send(counter.add(3)).is_err());

        assert_eq!(rx.recv().await.unwrap(), 1);
        assert!(tx.send(counter.add(4)).is_ok());
        assert!(tx.send(counter.add(5)).is_err());
        // Items 2 and 4 are still queued when the block ends here.
    }
    // Presumably checks that queued items are released together with the link/slots;
    // confirm what `Counter::count` measures in `utils`.
    assert_eq!(counter.count(), 0);
}

/// Throughput-style test: `PRODUCERS` producers each send `ITERATIONS` items while one
/// consumer counts what it receives.
///
/// Nothing is spawned here: the producer and consumer futures borrow the link and are
/// driven together through `future::join_all` / `future::join`. The test asserts that
/// the consumer saw exactly `ITERATIONS * PRODUCERS` items, prints the elapsed times to
/// stderr, and finally requires `counter.count()` to be 0.
#[tokio::test]
async fn t_03() {
    // NOTE(review): `BUF_SIZE` is not referenced in the visible text; it was presumably
    // the size argument of the `slots::()` call below before its generics were lost.
    const BUF_SIZE: usize = 128;
    const ITERATIONS: usize = 1_000_000;
    const PRODUCERS: usize = 4;
    type Item = utils::Item;
    let counter = utils::Counter::new();
    {
        let slots = slots::();
        let link = Link::::new(&slots);
        let mut rx = Rx::::new(&link);
        let tx = Tx::::new(&link);
        // One lazy future per producer, each with its own clone of the counter and sender.
        let producers = (0..PRODUCERS).map({
            |_| {
                let counter = counter.clone();
                let tx = tx.clone();
                async move {
                    let t0 = std::time::Instant::now();
                    for i in 0..ITERATIONS {
                        let mut value = counter.add(i);
                        // Retry the same value until it is accepted.
                        loop {
                            match tx.send(value) {
                                // `Full` hands the value back: keep it, yield so other
                                // futures can make progress, then try again.
                                Err(SendError::Full(rejected)) => {
                                    value = rejected;
                                    tokio::task::yield_now().await;
                                }
                                Ok(()) => break,
                                Err(other) => panic!("tx-send: {:?}", other),
                            }
                        }
                    }
                    // Each producer reports how long its whole send loop took.
                    t0.elapsed()
                }
            }
        });
        let producers = {
            // Shadow `tx` with a reference so the `async move` block captures a borrow
            // instead of taking ownership of the sender.
            let tx = &tx;
            async move {
                let dts = future::join_all(producers).await;
                // Close only after every producer has finished; presumably this is what
                // lets the consumer's `recv` stop returning `Some`. TODO confirm.
                tx.close();
                dts
            }
        };
        let consumer = async {
            let mut count = 0;
            let t0 = std::time::Instant::now();
            // Count items until `recv` yields something other than `Some`.
            while let Some(_) = rx.recv().await {
                count += 1;
            }
            let dt = t0.elapsed();
            (count, dt)
        };
        let (producer_dts, (consumer_count, consumer_dt)) =
            future::join(producers, consumer).await;
        assert_eq!(consumer_count, ITERATIONS * PRODUCERS);
        eprintln!("consumer-dt: {:?}", consumer_dt);
        eprintln!("producer-dts: {:#?}", producer_dts);
    }
    // Presumably `count()` reports items that are still alive; confirm in `utils`.
    assert_eq!(counter.count(), 0);
}

/// Variant of `t_03` in which the link is shared through an `Arc` and every producer,
/// as well as the consumer, runs as its own `tokio::spawn`ed task.
///
/// Join failures of the spawned tasks abort the test through `expect`. The assertions
/// are the same as in `t_03`: the consumer must have counted `ITERATIONS * PRODUCERS`
/// items and `counter.count()` must end at 0.
#[tokio::test]
async fn t_04() {
    // NOTE(review): `BUF_SIZE` is not referenced in the visible text; it was presumably
    // the size argument of the `slots::()` call below before its generics were lost.
    const BUF_SIZE: usize = 128;
    const ITERATIONS: usize = 1_000_000;
    const PRODUCERS: usize = 4;
    type Item = utils::Item;
    let counter = utils::Counter::new();
    {
        let slots = slots::();
        // Here the link takes the slot array by value and is shared via `Arc` clones.
        let link = Arc::new(Link::::new(slots));
        let mut rx = Rx::::new(Arc::clone(&link));
        let tx = Tx::::new(Arc::clone(&link));
        // The iterator yields one spawned task handle per producer; `map` is lazy, so the
        // tasks are only spawned once `join_all` consumes the iterator below.
        let producers = (0..PRODUCERS).map({
            |_| {
                let counter = counter.clone();
                let tx = tx.clone();
                tokio::spawn(async move {
                    let t0 = std::time::Instant::now();
                    for i in 0..ITERATIONS {
                        let mut value = counter.add(i);
                        // Retry the same value until it is accepted.
                        loop {
                            match tx.send(value) {
                                // `Full` hands the value back: keep it, yield so other
                                // tasks can make progress, then try again.
                                Err(SendError::Full(rejected)) => {
                                    value = rejected;
                                    tokio::task::yield_now().await;
                                }
                                Ok(()) => break,
                                Err(other) => panic!("tx-send: {:?}", other),
                            }
                        }
                    }
                    // Each producer reports how long its whole send loop took.
                    t0.elapsed()
                })
            }
        });
        let producers = {
            // Shadow `tx` with a reference so the `async move` block captures a borrow
            // instead of taking ownership of the sender.
            let tx = &tx;
            async move {
                let dts = future::join_all(producers).await;
                // Close only after every producer task has finished; presumably this is
                // what lets the consumer's `recv` stop returning `Some`. TODO confirm.
                tx.close();
                dts
            }
        };
        let consumer = tokio::spawn(async move {
            let mut count = 0;
            let t0 = std::time::Instant::now();
            // Count items until `recv` yields something other than `Some`.
            while let Some(_) = rx.recv().await {
                count += 1;
            }
            let dt = t0.elapsed();
            (count, dt)
        });
        // NOTE(review): the turbofish below is garbled; it presumably collected the
        // per-task join results into a single `Result` of a `Vec`. TODO restore.
        let producer_dts = producers
            .await
            .into_iter()
            .collect::, _>>()
            .expect("producers-join");
        let (consumer_count, consumer_dt) = consumer.await.expect("consumer-join");
        assert_eq!(consumer_count, ITERATIONS * PRODUCERS);
        eprintln!("consumer-dt: {:?}", consumer_dt);
        eprintln!("producer-dts: {:#?}", producer_dts);
    }
    // Presumably `count()` reports items that are still alive; confirm in `utils`.
    assert_eq!(counter.count(), 0);
}

/// Builds the slot array used by the tests, filling every element with
/// `Default::default()` via `std::array::from_fn`.
// NOTE(review): `SIZE` (and presumably the element type of `Slot`) must have been generic
// parameters of this function; callers use it as `slots::<3, Item>()`. The parameter
// list is missing from the visible signature.
fn slots() -> [Slot; SIZE] {
    std::array::from_fn(|_| Default::default())
}