// lossy tests based on protocol to be fully deterministic use std::cmp::min; use std::collections::VecDeque; use std::{ str, time::{Duration, Instant}, }; use log::{info, trace}; use rand::{distributions::Bernoulli, prelude::StdRng, SeedableRng}; use rand_distr::Normal; use srt_protocol::{connection::Input, options::*}; pub mod simulator; use simulator::*; #[test] fn lossy_deterministic() { let _ = pretty_env_logger::try_init(); let once_failing_seeds = [ (13858442656353620955, 10_000), (3330590297113083014, 10_000), (11174431011217123256, 10_000), (7843866891970470107, 10_000), (940980453060602806, 10_000), (10550053401338237831, 10_000), (9602806002654919948, 10_000), (11134687271549837280, 10_000), (10210281456068034833, 10_000), ]; for &(s, size) in &once_failing_seeds { do_lossy_test(s, size); } for _ in 0..10 { let seed = rand::random(); do_lossy_test(seed, 10_000); } } fn do_lossy_test(seed: u64, count: usize) { info!("Seed is: {}, count is: {}", seed, count); const PACKET_SPACING: Duration = Duration::from_millis(1); const DROP_RATE: f64 = 0.06; let delay_mean = Duration::from_secs_f64(20e-3); let delay_stdev = Duration::from_secs_f64(4e-3); let start = Instant::now(); let mut simulation = RandomLossSimulation { rng: StdRng::seed_from_u64(seed), delay_dist: Normal::new(delay_mean.as_secs_f64(), delay_stdev.as_secs_f64()).unwrap(), drop_dist: Bernoulli::new(DROP_RATE).unwrap(), }; let (mut network, mut sender, mut receiver) = simulation.build(start, Duration::from_secs(1), PacketCount(8192)); input_data_simulation(start, count, PACKET_SPACING, &mut network.sender); let mut now = start; let mut next_data = 0i32; let mut dropped = 0i32; let mut received = 0i32; loop { let sender_next_time = if sender.is_open() { assert_eq!(sender.next_data(now), None); while let Some(packet) = sender.next_packet(now) { match simulation.next_packet_schedule(now) { Some(release_at) => network.send(release_at, packet), None => trace!("Dropping {:?}", packet), } } let next_timer = sender.check_timers(now); let (next_time, input) = network.sender.select_next_input(now, next_timer); match input { Input::Data(data) => sender.handle_data_input(next_time, data), Input::Packet(packet) => sender.handle_packet_input(next_time, packet), _ => {} }; Some(next_time) } else { None }; let receiver_next_time = if receiver.is_open() { while let Some((ts, payload)) = receiver.next_data(now) { let diff_ms = (now - ts).as_millis(); assert!( 700 < diff_ms && diff_ms < 1300, "Latency not in tolerance zone: {diff_ms}ms" ); let actual: i32 = str::from_utf8(&payload[..]).unwrap().parse().unwrap(); dropped += actual - next_data; next_data = actual + 1; received += 1; } while let Some(packet) = receiver.next_packet(now) { match simulation.next_packet_schedule(now) { Some(release_at) => network.send(release_at, packet), None => trace!("Dropping {:?}", packet), } } let next_timer = receiver.check_timers(now); let (next_time, input) = network.receiver.select_next_input(now, next_timer); match input { Input::Data(data) => receiver.handle_data_input(now, data), Input::Packet(packet) => receiver.handle_packet_input(now, packet), _ => {} }; Some(next_time) } else { None }; let next_time = match (sender_next_time, receiver_next_time) { (Some(s), Some(r)) => min(s, r), (Some(s), None) => s, (None, Some(r)) => r, _ => break, }; let delta = next_time - now; trace!("Delta = {:?}", delta); now = next_time; } info!("Received: {}", received); assert_ne!(received, 0); assert!(dropped < 15, "Expected less than 15 drops, got {dropped}"); } #[test] fn high_bandwidth_deterministic() { let _ = pretty_env_logger::try_init(); // once failing do_high_bandwidth_deterministic(13087270514753106960, 100_000); for _ in 0..5 { do_high_bandwidth_deterministic(rand::random(), 100_000); } } fn do_high_bandwidth_deterministic(seed: u64, count: usize) { println!("Seed is: {seed}, count is: {count}"); let start = Instant::now(); let delay_mean = Duration::from_millis(10); let delay_stdev = Duration::from_millis(1); let drop_rate = 0.005; let bandwidth = DataRate(50_000_000); // bytes/second let bandwidth_mbps = bandwidth.as_mbps_f64(); // MB/second let packet_size = PacketSize(1316); // bytes/packet. Note that packets are not actually this large, but packet size does not affect non-realtime tests like this let packet_spacing = PacketPeriod::try_from(bandwidth, packet_size).unwrap(); // s/packet let latency = Duration::from_secs(1); let latency_packet_count: PacketCount = PacketCount::for_time_window(latency, packet_spacing); // double to be safe let recv_buffer_size = (latency_packet_count * packet_size) * 2; let mut simulation = RandomLossSimulation { rng: StdRng::seed_from_u64(seed), delay_dist: Normal::new(delay_mean.as_secs_f64(), delay_stdev.as_secs_f64()).unwrap(), drop_dist: Bernoulli::new(drop_rate).unwrap(), }; let (mut network, mut sender, mut receiver) = simulation.build(start, latency, recv_buffer_size / packet_size); input_data_simulation(start, count, packet_spacing, &mut network.sender); let mut now = start; let window_size = Duration::from_secs(1); let startup_packets = 50_000; let mut window = VecDeque::new(); let mut bytes_received = 0; let mut packets_received = 0; let mut last_packet = None; loop { let sender_next_time = if sender.is_open() { assert_eq!(sender.next_data(now), None); while let Some(packet) = sender.next_packet(now) { match simulation.next_packet_schedule(now) { Some(release_at) => network.send(release_at, packet), None => trace!("Dropping {:?}", packet), } } let next_timer = sender.check_timers(now); let (next_time, input) = network.sender.select_next_input(now, next_timer); match input { Input::Data(data) => sender.handle_data_input(next_time, data), Input::Packet(packet) => sender.handle_packet_input(next_time, packet), _ => {} }; Some(next_time) } else { None }; let receiver_next_time = if receiver.is_open() { while let Some((ts, payload)) = receiver.next_data(now) { bytes_received += packet_size.0; window.push_back((ts, packet_size.0)); packets_received += 1; while let Some((a, bytes)) = window.front() { if ts - *a > window_size { bytes_received -= *bytes; window.pop_front(); } else { break; } } let rate_mbps = bytes_received as f64 / 1024. / 1024.; if packets_received >= startup_packets { assert!( rate_mbps > bandwidth_mbps * 0.9, "Rate was {}, expected at least {}", rate_mbps, bandwidth_mbps * 0.9 ); assert!( rate_mbps < bandwidth_mbps * 1.1, "Rate was {}, expected less than {}", rate_mbps, bandwidth_mbps * 1.1 ); } // make sure no loss let actual: i32 = str::from_utf8(&payload[..]).unwrap().parse().unwrap(); if let Some(last_packet) = last_packet { assert_eq!(last_packet + 1, actual); } last_packet = Some(actual); // print!("Received {:10.3}MB/s\r", rate_mbps); } while let Some(packet) = receiver.next_packet(now) { match simulation.next_packet_schedule(now) { Some(release_at) => network.send(release_at, packet), None => trace!("Dropping {:?}", packet), } } let next_timer = receiver.check_timers(now); let (next_time, input) = network.receiver.select_next_input(now, next_timer); match input { Input::Data(data) => receiver.handle_data_input(now, data), Input::Packet(packet) => receiver.handle_packet_input(now, packet), _ => {} }; Some(next_time) } else { None }; let next_time = match (sender_next_time, receiver_next_time) { (Some(s), Some(r)) => min(s, r), (Some(s), None) => s, (None, Some(r)) => r, _ => break, }; let delta = next_time - now; trace!("Delta = {:?}", delta); now = next_time; } assert!( packets_received > count * 2 / 3, "Expected at least {} packets, got {}", count * 2 / 3, packets_received, ); }