pub use rstest::rstest; pub use rstest_reuse::{self, *}; // attributes template for the tests #[template] #[rstest] #[case::data_len_16(16)] #[case::data_len_32(32)] #[case::data_len_64(64)] #[case::data_len_256(256)] #[case::data_len_512(512)] #[case::data_len_1024(1024)] #[case::data_len_4096(4096)] #[tokio::test] pub async fn data_transfer_template( #[case] data_len: usize, ) {} #[cfg(test)] mod channel { use cs_utils::{random_number, random_str}; use tokio::io::{AsyncWriteExt, AsyncReadExt, ReadHalf, WriteHalf}; use tokio::try_join; use connection_utils::Channel; pub use super::*; fn create_data_to_transfer( messages_count: usize, data_len: usize, ) -> Vec { let mut test_data = vec![]; for _ in 0..messages_count { test_data.push(random_str(data_len)); } return test_data; } pub async fn test_channel_data_transfer( channel: (ReadHalf>, WriteHalf>), data_to_send: Vec, data_to_receive: Vec, data_len: usize, ) -> Result<(), Box> { let (mut source, mut sink) = channel; try_join!( // sending channel tokio::spawn(async move { let mut iter = data_to_send.iter(); while let Some(message) = iter.next() { let data = message.as_bytes(); sink .write(&data[..]) .await .expect("Cannot send message."); } }), // receiving channel tokio::spawn(async move { let mut received_messages: Vec = vec![]; // buffer is at least `data_len` plus, maybe, a little bit more let mut buf = vec![0; data_len + random_number(0..=100)]; loop { let bytes_read = { source .read(&mut buf) .await .expect("Cannot read message.") }; let message_string = std::str::from_utf8(&buf[..bytes_read]) .expect("Cannot parse message UTF8 string.") .to_string(); received_messages.push(message_string); let received_string = received_messages.join(""); let test_data_string = data_to_receive.join(""); assert!( received_string.len() <= test_data_string.len(), "Received string is larger than original." ); if received_string.len() == test_data_string.len() { assert_eq!( received_string, test_data_string, "Received and test strings must be equal.", ); return; } } }), ).unwrap(); return Ok(()); } mod unidirectional { use futures::future::try_join_all; use tokio::io::split; use webrtc_connection::utils::test_helpers::create_channels; pub use super::*; #[apply(data_transfer_template)] async fn must_send_data_thru_channels_forward( #[case] data_len: usize, ) { multiple_channels_data_transfer_unidirectional( data_len, || { return true; }, ).await; } #[apply(data_transfer_template)] async fn must_send_data_thru_channels_backward( #[case] data_len: usize, ) { multiple_channels_data_transfer_unidirectional( data_len, || { return false; }, ).await; } #[apply(data_transfer_template)] async fn must_send_data_thru_channels_random( #[case] data_len: usize, ) { use cs_utils::random_bool; multiple_channels_data_transfer_unidirectional( data_len, || { return random_bool(); }, ).await; } // -- Helpers /// Function to test, potentially multiple, simultaneous channels data transfer /// in a single direction, defined by the `is_forward_direction()` closure. pub async fn multiple_channels_data_transfer_unidirectional( data_len: usize, is_forward_direction: fn() -> bool, ) { let channels = create_channels( random_number(1..10), ).await; let mut promises = vec![]; for (channel1, channel2) in channels { let (source1, sink1) = split(channel1); let (source2, sink2) = split(channel2); let test_data = create_data_to_transfer(random_number(5..15), data_len); let channel = if is_forward_direction() { (source2, sink1) } else { (source1, sink2) }; promises.push( test_channel_data_transfer( channel, test_data.clone(), test_data.clone(), data_len, ), ); } try_join_all(promises) .await .unwrap(); } } // -- Bidirectional data transfer tests. mod bidirectional { use futures::future::try_join_all; use tokio::io::split; use webrtc_connection::utils::test_helpers::create_channels; pub use super::*; #[apply(data_transfer_template)] async fn must_send_data_thru_channels_forward( #[case] data_len: usize, ) { multiple_channels_data_transfer_bidirectional( data_len, || { return true; }, ).await; } #[apply(data_transfer_template)] async fn must_send_data_thru_channels_backward( #[case] data_len: usize, ) { multiple_channels_data_transfer_bidirectional( data_len, || { return false; }, ).await; } #[apply(data_transfer_template)] async fn must_send_data_thru_channels_random( #[case] data_len: usize, ) { use cs_utils::random_bool; multiple_channels_data_transfer_bidirectional( data_len, || { return random_bool(); }, ).await; } // -- Helpers. /// Function that tests data transfer between 2 channels in a both directions at the same time. async fn test_channel_pair_data_transfer_bidirectional( data_len: usize, messages_count: usize, channel1: Box, channel2: Box, ) -> Result<(), Box> { let test_data1 = create_data_to_transfer(messages_count, data_len); let test_data2 = create_data_to_transfer(messages_count, data_len); try_join!( test_channel_data_transfer(split(channel1), test_data1.clone(), test_data2.clone(), data_len), test_channel_data_transfer(split(channel2), test_data2.clone(), test_data1.clone(), data_len), ).unwrap(); return Ok(()); } /// Function to test, potentially multiple, simultaneous channels data transfer /// in both directions at the same time. The `is_forward_direction()` should not /// matter much in this case. async fn multiple_channels_data_transfer_bidirectional( data_len: usize, is_forward_direction: fn() -> bool, ) { let channels = create_channels( random_number(1..10), ).await; let mut promises = vec![]; for (channel1, channel2) in channels { if is_forward_direction() { promises.push( test_channel_pair_data_transfer_bidirectional( data_len, random_number(5..15), channel1, channel2, ), ); continue; } promises.push( test_channel_pair_data_transfer_bidirectional( data_len, random_number(5..15), channel2, channel1, ), ); } try_join_all(promises) .await .unwrap(); } } }