use std::{ cmp::max, collections::BinaryHeap, convert::TryFrom, net::SocketAddr, time::{Duration, Instant}, }; use log::{error, warn}; use rand::{distributions::Bernoulli, prelude::*}; use rand_distr::Normal; use srt_protocol::{ connection::{Connection, ConnectionSettings, DuplexConnection, Input}, options::*, packet::*, protocol::handshake::Handshake, }; struct ScheduledInput(Instant, Input); impl PartialEq for ScheduledInput { fn eq(&self, other: &Self) -> bool { self.0 == other.0 } } impl Eq for ScheduledInput {} impl PartialOrd for ScheduledInput { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) } } impl Ord for ScheduledInput { fn cmp(&self, other: &Self) -> std::cmp::Ordering { self.0.cmp(&other.0).reverse() // reverse to make it a min-heap } } pub fn input_data_simulation( start: Instant, count: usize, pace: Duration, peer: &mut PeerSimulator, ) { let count = u32::try_from(count).unwrap(); for i in 1..=count { let t = start + pace * i; peer.schedule_input(t, Input::Data(Some((t, i.to_string().into())))); } peer.schedule_input(start + pace * (count + 1), Input::Data(None)); } pub struct PeerSimulator { addr: SocketAddr, input: BinaryHeap, } impl PeerSimulator { pub fn new(addr: SocketAddr) -> PeerSimulator { PeerSimulator { addr, input: BinaryHeap::new(), } } pub fn addr(&self) -> SocketAddr { self.addr } pub fn schedule_input(&mut self, release_at: Instant, input: Input) { self.input.push(ScheduledInput(release_at, input)); } pub fn select_next_input(&mut self, now: Instant, next_timer: Instant) -> (Instant, Input) { if self.has_scheduled_input(next_timer) { self.input.pop().map(|i| (max(now, i.0), i.1)).unwrap() } else { (next_timer, Input::Timer) } } fn has_scheduled_input(&self, now: Instant) -> bool { self.input .peek() .map(|i| i.0) .filter(|t| *t <= now) .is_some() } } pub struct NetworkSimulator { pub sender: PeerSimulator, pub receiver: PeerSimulator, } impl NetworkSimulator { pub fn new(sender_addr: SocketAddr, receiver_addr: SocketAddr) -> NetworkSimulator { NetworkSimulator { sender: PeerSimulator::new(sender_addr), receiver: PeerSimulator::new(receiver_addr), } } pub fn send(&mut self, release_at: Instant, (packet, to): (Packet, SocketAddr)) { if to == self.sender.addr() { self.sender.schedule_input( release_at, Input::Packet(Ok((packet, self.receiver.addr()))), ); } else if to == self.receiver.addr() { self.receiver .schedule_input(release_at, Input::Packet(Ok((packet, self.sender.addr())))); } else { error!("Dropping {:?}", packet) } } pub fn send_lossy( &mut self, sim: &mut RandomLossSimulation, now: Instant, packet: (Packet, SocketAddr), ) { self.send( match sim.next_packet_schedule(now) { Some(time) => time, None => { warn!("Dropping {:?} to {}", packet.0, packet.1); return; } }, packet, ) } } pub struct RandomLossSimulation { pub rng: StdRng, pub delay_dist: Normal, pub drop_dist: Bernoulli, } impl RandomLossSimulation { pub fn build( &mut self, start: Instant, latency: Duration, recv_buffer_size: PacketCount, ) -> (NetworkSimulator, DuplexConnection, DuplexConnection) { let sender = self.new_connection_settings(start, latency); let receiver = ConnectionSettings { remote: (sender.remote.ip(), sender.remote.port().wrapping_add(1)).into(), remote_sockid: sender.local_sockid, local_sockid: sender.remote_sockid, init_seq_num: sender.init_seq_num, recv_buffer_size, ..sender.clone() }; let network = NetworkSimulator::new(receiver.remote, sender.remote); let sender = DuplexConnection::new(Connection { settings: sender, handshake: Handshake::Connector, }); let receiver = DuplexConnection::new(Connection { settings: receiver, handshake: Handshake::Connector, }); (network, sender, receiver) } pub fn next_packet_schedule(&mut self, now: Instant) -> Option { if !self.drop_dist.sample(&mut self.rng) { Some(now + Duration::from_secs_f64(self.delay_dist.sample(&mut self.rng).abs())) } else { None } } fn new_connection_settings(&mut self, start: Instant, latency: Duration) -> ConnectionSettings { ConnectionSettings { remote: ([127, 0, 0, 1], self.rng.gen()).into(), remote_sockid: self.rng.gen(), local_sockid: self.rng.gen(), socket_start_time: start, rtt: Duration::default(), init_seq_num: self.rng.gen(), max_packet_size: PacketSize(1316), max_flow_size: PacketCount(8192), send_tsbpd_latency: latency, recv_tsbpd_latency: latency, cipher: None, stream_id: None, bandwidth: Default::default(), recv_buffer_size: PacketCount(8192), send_buffer_size: PacketCount(8192), statistics_interval: Duration::from_secs(1), peer_idle_timeout: Duration::from_secs(5), too_late_packet_drop: true, } } }