use std::time::Duration; use std::sync::Arc; use futures::future; use tokio::time; use holes::bus::Slot; use holes::spsc::{Link, Rx, Tx}; mod utils; #[tokio::test] async fn t_01() { type Item = utils::Item; let counter = utils::Counter::new(); { let slots = slots::<3, Item>(); let link = Link::::new(&slots); let mut tx = Tx::::new(&link); let mut rx = Rx::::new(&link); assert!(time::timeout(Duration::from_millis(10), rx.recv()) .await .is_err()); assert!(tx.send(counter.add(1)).await.is_ok()); assert!(tx.send(counter.add(2)).await.is_ok()); assert!( time::timeout(Duration::from_millis(10), tx.send(counter.add(3))) .await .is_err() ); assert_eq!(rx.recv().await.unwrap(), 1); assert!(tx.send(counter.add(4)).await.is_ok()); assert!( time::timeout(Duration::from_millis(10), tx.send(counter.add(5))) .await .is_err() ); assert_eq!(rx.recv().await.unwrap(), 2); assert_eq!(rx.recv().await.unwrap(), 4); assert!(time::timeout(Duration::from_millis(10), rx.recv()) .await .is_err()); } assert_eq!(counter.count(), 0); } #[tokio::test] async fn t_02() { type Item = utils::Item; let counter = utils::Counter::new(); { let slots = slots::<3, Item>(); let link = Link::::new(&slots); let mut tx = Tx::::new(&link); let mut rx = Rx::::new(&link); assert!(time::timeout(Duration::from_millis(10), rx.recv()) .await .is_err()); assert!(tx.send(counter.add(1)).await.is_ok()); assert!(tx.send(counter.add(2)).await.is_ok()); assert!( time::timeout(Duration::from_millis(10), tx.send(counter.add(3))) .await .is_err() ); assert_eq!(rx.recv().await.unwrap(), 1); assert!(tx.send(counter.add(4)).await.is_ok()); assert!( time::timeout(Duration::from_millis(10), tx.send(counter.add(5))) .await .is_err() ); assert_eq!(rx.recv().await.unwrap(), 2); // assert_eq!(rx.recv().await.unwrap(), 4); // assert!(time::timeout(Duration::from_millis(10), rx.recv()).await.is_err()); } assert_eq!(counter.count(), 0); } #[tokio::test] async fn t_03() { const BUF_SIZE: usize = 64; const ITERATIONS: usize = 1_000_000; type Item = utils::Item; let counter = utils::Counter::new(); { let slots = slots::(); let link = Link::::new(&slots); let mut tx = Tx::::new(&link); let mut rx = Rx::::new(&link); let producer = async { let t0 = std::time::Instant::now(); for i in 0..ITERATIONS { tx.send(counter.add(i)).await.expect("tx-send"); } let dt = t0.elapsed(); tx.close(); dt }; let consumer = async { let t0 = std::time::Instant::now(); let mut count = 0; while let Some(_) = rx.recv().await { count += 1; } let dt = t0.elapsed(); (count, dt) }; let (producer_dt, (count, consumer_dt)) = future::join(producer, consumer).await; eprintln!("producer: {:?}", producer_dt); eprintln!("consumer: {:?}", consumer_dt); assert_eq!(count, ITERATIONS); } assert_eq!(counter.count(), 0); } #[tokio::test] async fn t_04() { const BUF_SIZE: usize = 64; const ITERATIONS: usize = 1_000_000; type Item = utils::Item; let counter = utils::Counter::new(); { let slots = slots::(); let link = Link::::new(slots); let link = Arc::new(link); let mut tx = Tx::::new(Arc::clone(&link)); let mut rx = Rx::::new(Arc::clone(&link)); let producer = { let counter = counter.clone(); async move { let t0 = std::time::Instant::now(); for i in 0..ITERATIONS { tx.send(counter.add(i)).await.expect("tx-send"); } let dt = t0.elapsed(); tx.close(); dt } }; let consumer = async move { let t0 = std::time::Instant::now(); let mut count = 0; while let Some(_) = rx.recv().await { count += 1; } let dt = t0.elapsed(); (count, dt) }; let consumer = tokio::spawn(consumer); let producer = tokio::spawn(producer); let producer_dt = producer.await.expect("producer join"); let (count, consumer_dt) = consumer.await.expect("consumer join"); eprintln!("producer: {:?}", producer_dt); eprintln!("consumer: {:?}", consumer_dt); assert_eq!(count, ITERATIONS); } assert_eq!(counter.count(), 0); } fn slots() -> [Slot; SIZE] { std::array::from_fn(|_| Default::default()) }