use std::net::{Ipv4Addr, SocketAddrV4}; use pallas_network::multiplexer::{Bearer, Plexer}; use rand::{distributions::Uniform, Rng}; use tokio::net::TcpListener; async fn setup_passive_muxer() -> Plexer { let server = TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, P)) .await .unwrap(); println!("listening for connections on port {P}"); let (bearer, _) = Bearer::accept_tcp(&server).await.unwrap(); Plexer::new(bearer) } async fn setup_active_muxer() -> Plexer { let bearer = Bearer::connect_tcp(SocketAddrV4::new(Ipv4Addr::LOCALHOST, P)) .await .unwrap(); println!("active plexer connected"); Plexer::new(bearer) } fn random_payload(size: usize) -> Vec { let range = Uniform::from(0..255); rand::thread_rng().sample_iter(&range).take(size).collect() } #[tokio::test] async fn one_way_small_sequence_of_payloads() { let passive = tokio::task::spawn(setup_passive_muxer::<50301>()); // HACK: a small sleep seems to be required for Github actions runner to // formally expose the port tokio::time::sleep(std::time::Duration::from_secs(1)).await; let mut active = setup_active_muxer::<50301>().await; let mut passive = passive.await.unwrap(); let mut sender_channel = active.subscribe_client(3); let mut receiver_channel = passive.subscribe_server(3); let passive = passive.spawn(); let active = active.spawn(); for _ in 0..100 { let payload = random_payload(50); println!("sending chunk"); sender_channel.enqueue_chunk(payload.clone()).await.unwrap(); let received_payload = receiver_channel.dequeue_chunk().await.unwrap(); assert_eq!(payload, received_payload); } passive.abort().await; active.abort().await; }