extern crate pickleback; use std::collections::HashMap; use std::collections::HashSet; use std::collections::VecDeque; use log::*; use pickleback::prelude::*; use pickleback::testing::*; /// How many messages to send during soak tests: const NUM_TEST_MSGS: usize = 100000; /// If doing reliable sending over lossy pipe, how many extra ticks to mop up straggling acks: const NUM_EXTRA_ITERATIONS: usize = 100; /// Random payload size is selected to generate up to this many fragments: const MAX_FRAGMENTS: u32 = 10; #[test] fn soak_message_transmission() { init_logger(); let channel = 0; let mut harness = MessageTestHarness::new(JitterPipeConfig::disabled()); // pre-generate test messages of varying sizes, many of which require fragmentation let test_msgs = (0..NUM_TEST_MSGS) .map(|_| random_payload_max_frags(MAX_FRAGMENTS)) .collect::>(); let mut send_msgs_iter = test_msgs.iter(); let mut in_flight_msgs = HashMap::, MessageId>::new(); let mut sent_ids = VecDeque::new(); 'sending: while !test_msgs.is_empty() { // Send up to num_to- NB: check MAX_FRAGMENTS * this is under ack limit. let num_to_send = rand::random::() % 7; let mut num_sent = 0; for _ in 0..num_to_send { let Some(msg) = send_msgs_iter.next() else { if num_sent == 0 { break 'sending; } else { break; } }; num_sent += 1; let msg_id = harness.server.send_message(channel, msg.as_ref()).unwrap(); in_flight_msgs.insert(msg.clone(), msg_id); // info!("{msg_id:?} PAYLOAD(len:{}) = {msg:?}", msg.len()); sent_ids.push_back(msg_id); } harness.advance(0.03); let client_received_messages = harness .client .drain_received_messages(channel) .collect::>(); // acks not guaranteed to be reported in same order they were sent - using hashset. let acks = harness .server .drain_message_acks(channel) .collect::>(); assert_eq!( client_received_messages.len(), num_sent, "didn't receive all the messages sent this tick" ); for recv_msg in client_received_messages.iter() { let rec = recv_msg.payload_to_owned(); if let Some(expected_id) = in_flight_msgs.get(&rec) { assert_eq!(*expected_id, recv_msg.id(), "ids don't match"); } else { panic!("payload not found in sent msgs for {recv_msg:?}, payload: {rec:?}"); } assert!( acks.contains(&sent_ids.pop_front().unwrap()), "ack mismatch" ); } } } #[test] fn soak_reliable_message_transmission_with_terrible_network() { init_logger(); let channel = 1; let mut harness = MessageTestHarness::new(JitterPipeConfig::very_very_bad()); let mut test_msgs = Vec::new(); (0..NUM_TEST_MSGS).for_each(|_| test_msgs.push(random_payload_max_frags(MAX_FRAGMENTS))); let mut unacked_sent_msg_ids = Vec::new(); let mut client_received_messages = Vec::new(); let mut i = 0; while i < NUM_TEST_MSGS + NUM_EXTRA_ITERATIONS { if let Some(msg) = test_msgs.get(i) { let size = msg.len(); match harness.server.send_message(channel, msg.as_ref()) { Ok(msg_id) => { trace!("💌💌 Sending message {i}/{NUM_TEST_MSGS}, size {size}, msg_id: {msg_id:?}"); unacked_sent_msg_ids.push(msg_id); i += 1; } Err(PicklebackError::Backpressure(Backpressure::TooManyPending)) => { warn!("Backpressure"); } Err(e) => panic!("Unhandled send error {e:?}"), } } else { i += 1; } harness.advance(0.051); let acked_ids = harness.collect_server_acks(channel); if !acked_ids.is_empty() { unacked_sent_msg_ids.retain(|id| !acked_ids.contains(id)); } client_received_messages.extend(harness.client.drain_received_messages(channel)); } assert_eq!( Vec::::new(), unacked_sent_msg_ids, "server is missing acks for these messages" ); // with enough extra iterations, resends should have ensured everything was received. assert_eq!(client_received_messages.len(), test_msgs.len()); // Testing the test harness here for good measure. // // Make sure observed loss/dupe/jittered roughly match values configured in JitterPipe let server_sp = harness.server.stats().packets_sent; let client_rp = harness.client.stats().packets_received; let observed_packet_loss = (server_sp - client_rp) as f32 / server_sp as f32; let configured_packet_loss = harness.server_jitter_pipe.config_mut().drop_chance; println!("Observed packet loss: {observed_packet_loss} configured: {configured_packet_loss}"); // ensure packetloss within some% of configured value during test assert_float_relative_eq!(observed_packet_loss, configured_packet_loss, 0.5); let observed_dupe = harness.client.stats().packets_duplicate as f32 / server_sp as f32; let configured_dupe = harness.server_jitter_pipe.config_mut().duplicate_chance; println!("Observed duplicate rate: {observed_dupe} configured: {configured_dupe}"); assert_float_relative_eq!(observed_dupe, configured_dupe, 0.5); println!("Server Stats: {:?}", harness.server.stats()); println!("Server Packet loss: {}", harness.server.packet_loss()); println!("Server RTT: {}", harness.server.rtt()); println!("Client Stats: {:?}", harness.client.stats()); println!("Client Packet loss: {}", harness.client.packet_loss()); println!("Client RTT: {}", harness.client.rtt()); println!("Test harness transmission stats: {:?}", harness.stats); }