use portus::ipc::{chan::Socket, BackendBuilder, Blocking, Ipc}; use portus::lang::Scope; use portus::{CongAlg, Datapath, DatapathInfo, DatapathTrait, Flow, Report}; use std::collections::HashMap; use std::marker::PhantomData; use std::sync::mpsc; use std::thread; pub const ACKED_PRIMITIVE: u32 = 5; // libccp uses this same value for acked_bytes mod mock_datapath; pub trait IntegrationTest: Sized { fn new() -> Self; fn datapath_programs() -> HashMap<&'static str, String>; fn install_test(&self, dp: &mut D) -> Option; fn check_test(&mut self, sc: &Scope, t: std::time::Instant, sock_id: u32, m: &Report) -> bool; } pub struct TestBaseConfig(mpsc::Sender>, PhantomData); impl CongAlg for TestBaseConfig { type Flow = TestBase; fn name() -> &'static str { "integration-test" } fn datapath_programs(&self) -> HashMap<&'static str, String> { T::datapath_programs() } fn new_flow(&self, control: Datapath, _info: DatapathInfo) -> Self::Flow { let mut tb = TestBase { control_channel: control, sc: Default::default(), test_start: std::time::Instant::now(), sender: self.0.clone(), t: T::new(), }; tb.sc = tb.t.install_test(&mut tb.control_channel); tb } } pub struct TestBase { pub control_channel: Datapath, pub sc: Option, pub test_start: std::time::Instant, pub sender: mpsc::Sender>, t: T, } impl Flow for TestBase { fn on_report(&mut self, sock_id: u32, m: Report) { let sc = self.sc.as_ref().unwrap(); let done = self.t.check_test(sc, self.test_start, sock_id, &m); if done { self.sender.send(Ok(())).unwrap(); } } } // Spawn userspace ccp fn start_ccp( sk: Socket, tx: mpsc::Sender>, ) -> portus::CCPHandle { portus::RunBuilder::new(BackendBuilder { sock: sk }) .default_alg(TestBaseConfig(tx, PhantomData::)) .spawn_thread() .run() .unwrap() } // Runs a specific intergration test pub fn run_test(num_flows: usize) { tracing_subscriber::fmt::try_init().unwrap_or_else(|_| ()); let (tx, rx) = std::sync::mpsc::channel(); // Channel for IPC let (s1, r1) = crossbeam::channel::unbounded(); let (s2, r2) = crossbeam::channel::unbounded(); // spawn libccp let (dp_handle, conn_handles) = mock_datapath::start(num_flows, s2, r1); let sk = Socket::::new(s1, r2); let ccp_handle = start_ccp::(sk, tx); // wait for program to finish let wait_for_done = thread::spawn(move || { rx.recv_timeout(std::time::Duration::from_secs(20)) .unwrap() .unwrap(); ccp_handle.kill(); // causes backend to stop iterating ccp_handle.wait().unwrap(); for h in conn_handles { h.cancel(); h.wait().unwrap(); } dp_handle.cancel(); dp_handle.wait().unwrap_or_else(|_| ()); }); wait_for_done.join().unwrap(); }