use cs_utils::constants::time::{DAY_MS, SECOND_MS}; use cs_utils::futures::wait; use cs_utils::random_str; use multiplexed_connection::{MultiplexedConnection, Connected, Channel}; use tokio::io::{duplex, split, AsyncReadExt, AsyncWriteExt}; use tokio::try_join; async fn handle_channel( connection_name: &'static str, channel: Box, should_send_data: bool, ) { let channel_label = channel.label().clone(); let (mut source, mut sink) = split(channel); let channel_label1 = channel_label.clone(); let channel_label2 = channel_label.clone(); try_join!( tokio::spawn(async move { let mut buf = [0; 1024]; while let Ok(bytes_read) = source.read(&mut buf).await { let data_str = std::str::from_utf8(&buf[..bytes_read]) .expect("Cannot parse UTF8 string."); println!("<< {:?} received {:?} on channel {:?}", &connection_name, &data_str, &channel_label1); } }), tokio::spawn(async move { loop { if !should_send_data { break; } let data_str = random_str(8); println!(">> {:?} sending {:?} on channel {:?}", &connection_name, &data_str, &channel_label2); sink .write(data_str.as_bytes()).await .expect("Cannot send data."); wait(1 * SECOND_MS).await; } }), ).unwrap(); } async fn handle_connection( connection_name: &'static str, mut connection: Box, ) { let mut on_remote_channel = connection.on_remote_channel() .expect("No `on_remote_channel` listener found."); try_join!( tokio::spawn(async move { while let Some(remote_channel) = on_remote_channel.recv().await { tokio::spawn(handle_channel(&connection_name, remote_channel, false)); } }), tokio::spawn(async move { loop { let channel_label = format!("channel-{}", random_str(4)); let channel = connection .channel(channel_label).await .expect("Failed to open data channel."); println!("🚏 {:?} created data channel {:?}", &connection_name, channel.label()); tokio::spawn(handle_channel(&connection_name, channel, true)); wait(15 * SECOND_MS).await; } }), ).unwrap(); } #[tokio::main] async fn main() { let (duplex1, duplex2) = duplex(4096); let connection1 = MultiplexedConnection::new(duplex1); let connection2 = MultiplexedConnection::new(duplex2); try_join!( tokio::spawn(async move { wait(10).await; let connected = connection1 .listen().await .expect("Failed to receive incoming connection."); println!("🙌 got incoming connection"); handle_connection( "connection1", connected, ).await }), tokio::spawn(async move { let connected = connection2 .connect().await .expect("Failed to connect."); println!("👌 connected"); handle_connection( "connection2", connected, ).await }), ).unwrap(); wait(DAY_MS).await; }