use bytes::Bytes; use chokepoint::{ ChokeSettings, ChokeSettingsOrder, ChokeStream, }; use futures::stream::StreamExt; use std::time::Duration; use tokio::sync::mpsc; use tokio_stream::wrappers::UnboundedReceiverStream; #[tokio::test] async fn delivery_without_modifications() { let (tx, rx) = mpsc::unbounded_channel(); let traffic_shaper = ChokeStream::new(Box::new(UnboundedReceiverStream::new(rx)), Default::default()); tokio::spawn(async move { for i in 0..10usize { tx.send(Bytes::from(i.to_le_bytes().to_vec())).unwrap(); } }); let output = traffic_shaper .map(|packet| usize::from_le_bytes(packet[0..8].try_into().unwrap())) .collect::>() .await; assert_eq!(output, (0..10).collect::>()); } #[yare::parameterized( unordered = { ChokeSettingsOrder::Unordered, vec![2, 3, 1] }, ordered = { ChokeSettingsOrder::Ordered, vec![1, 2, 3] }, // backpressure = { ChokeSettingsOrder::Backpressure, vec![1, 2, 3] } )] #[test_macro(tokio::test)] async fn ordering(ordering: ChokeSettingsOrder, expected: Vec) { let (tx, rx) = mpsc::unbounded_channel(); let stream = ChokeStream::new( Box::new(UnboundedReceiverStream::new(rx)), ChokeSettings::default() .set_ordering(Some(ordering)) .set_latency_distribution(Some({ let mut n = 0; move || { n += 1; match n { 1 => Some(Duration::from_millis(150)), 2 => Some(Duration::from_millis(50)), 3 => Some(Duration::from_millis(100)), _ => None, } } })), ); tx.send(Bytes::from(1usize.to_le_bytes().to_vec())).unwrap(); tx.send(Bytes::from(2usize.to_le_bytes().to_vec())).unwrap(); tx.send(Bytes::from(3usize.to_le_bytes().to_vec())).unwrap(); drop(tx); let output = stream .map(|packet| usize::from_le_bytes(packet[0..8].try_into().unwrap())) .collect::>() .await; assert_eq!(output, expected); }