#![cfg(feature = "rustls")] use std::{ convert::TryInto, error::Error, str::FromStr, sync::{Arc, Mutex}, time::Duration, }; use crc::Crc; use quinn::{ConnectionError, ReadError, TransportConfig, WriteError}; use rand::{self, RngCore}; use scionnet::SocketAddr; // use std::sync::mpsc::{Sender, Receiver}; use clap::Parser; use futures::future::join_all; use futures::future::TryFutureExt; use futures::try_join; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tracing::{trace, warn}; #[derive(Parser, Debug)] #[clap(name = "server")] struct Opt { #[clap(long = "count")] count: u16, #[clap(long = "listen")] listen: Option, #[clap(long = "remote")] remote: Option, } async fn run_client(opts: Opt) -> Result<(), Box> { let mut endpoint = quinn::Endpoint::scion_client( None, None, Some(SocketAddr::from_str(&opts.listen.unwrap_or("0.0.0.0:0".to_owned())).unwrap()), )?; endpoint.set_default_client_config(configure_connector()); let connecting = endpoint .connect( SocketAddr::from_str(&opts.remote.unwrap()).unwrap(), "localhost", ) .map_err(|e| Box::new(e))?; let mut c = connecting.await?; let mut conn2 = c.clone(); let client_task = async move { let mut conn = conn2; for _ in 0..opts.count { //----------------------------------------------------------------------- // extract this loop body into its own async block and runn all iterations of the loop concurrently let data = random_data(1024 * 1024); let (mut ss, mut rs) = conn.open_bi().await?; let write_handle = tokio::spawn(async move { let mut ss = chunky_send(data, ss).await?; // Suppress finish errors, since the peer may close before ACKing match ss.finish().await { Ok(()) => return Ok(ss), Err(WriteError::ConnectionLost(ConnectionError::ApplicationClosed { .. })) => return Ok(ss), Err(e) => return Err(Box::new(e) as Box), }; //Result::>::Ok(ss) }); let read_handle = tokio::spawn(async move { chunky_read(rs).await }); // wait for the echo-ing to finish let res = try_join!(read_handle, write_handle) .map(|res| (res.0.unwrap(), res.1.unwrap())) .map_err(|e| Box::new(e) as Box); // then close the stream if res.is_ok() { res.unwrap() .1 .finish() .await .map_err(|e| Box::new(e) as Box)? } /* else{ res.map(|_|()) } */ //----------------------------------------------------------------------------- } Result::<(), Box>::Ok(()) }; client_task .and_then(|_| async move { c.close(0u32.into(), &[]); Ok(()) }) .await } // writes data to send stream async fn chunky_send( data: Vec, mut s: quinn::SendStream, ) -> Result> { for (_i, chunk) in data.chunks(1024).enumerate() { s.write_chunk( //bytes::Bytes::from_static( chunk ) bytes::Bytes::copy_from_slice(chunk), ) .await .map_err(|e| Box::new(e) as Box)? } Ok(s) } async fn chunky_read( mut s: quinn::RecvStream, ) -> Result> { while let Some(chunk) = s .read_chunk(1024, true) .await .map_err(|e| Box::new(e) as Box)? { //trace!("read chunk: {:?}", chunk); println!("read chunk: {:?}", chunk); } Ok(s) } async fn run_server(opts: Opt) -> Result<(), Box> { let cfg = configure_listener(); let endpoint = quinn::Endpoint::scion_server( None, cfg, SocketAddr::from_str(&opts.listen.unwrap_or("0.0.0.0:0".to_owned()))?, None, ) .map_err(|e| Box::new(e))?; let endpoint2 = endpoint.clone(); let accept_incoming = async move { loop { let conn = endpoint2.accept().await.unwrap().await.unwrap(); let conn2 = conn.clone(); let handle_echo_conn = async move { //while let Ok(stream) = conn.accept_bi().await.map_err(|e|Box::new(e)) loop { match conn .accept_bi() .await .map_err(|e| Box::new(e) as Box) { Ok((mut ss, mut rs)) => { let (tx, rx): (Sender, Receiver) = channel(1024); let handle_read = tokio::spawn(chunky_stream_read(rs, tx)); let handle_write = tokio::spawn(chunky_stream_write(rx, ss)); // wait for the echo-ing to finish let res = try_join!(handle_read, handle_write) .map(|res| (res.0.unwrap(), res.1.unwrap())) .map_err(|e| Box::new(e) as Box); // then close the stream if res.is_ok() { res.unwrap() .1 .finish() .await .map_err(|e| Box::new(e) as Box)? } else { // if we get here, res is an Err variant return res.map(|_| ()); } } Err(e) => { // did remote close its end of the conn ?! return Result::<(), Box>::Err(e); } } } // Ok(()) as Result::<(),Box> }; let _handle = tokio::spawn(async move { if let Err(e) = handle_echo_conn .and_then(|_| async move { conn2.close(0u32.into(), &[]); Ok(()) }) .await { warn!("ERROR: {:?}", e); } }); } }; tokio::spawn(accept_incoming); Ok(()) } // for the server: it reads from recv-stream and echos back through channel // (from where chunky_stream_write will send it back to client ) async fn chunky_stream_read( mut rstream: quinn::RecvStream, chan: Sender, ) -> Result> { while let Some(chunk) = rstream .read_chunk(1024, true) .await .map_err(|e| Box::new(e) as Box)? { trace!("read chunk: {:?}", chunk); chan.send(chunk) .await .map_err(|e| Box::new(e) as Box)?; } Ok(rstream) } // for server: it reads from channel and writes it back to client via SendStream async fn chunky_stream_write( mut chan: Receiver, mut sstream: quinn::SendStream, ) -> Result> { while let Some(chunk) = chan.recv().await { trace!("send chunk: {:?}", chunk); sstream .write_chunk(chunk.bytes) .await .map_err(|e| Box::new(e) as Box)?; } Ok(sstream) } /* async fn read_from_stream( mut stream: quinn::RecvStream, ) -> Result, quinn::ConnectionError> { let crc = crc::Crc::::new(&crc::CRC_32_ISO_HDLC); match stream.read_to_end(1024 * 1024 * 5).await { Ok(data) => { assert!(hash_correct(&data, &crc)); Ok(data) } Err(e) => { use quinn::ReadToEndError::*; use ReadError::*; match e { TooLong | Read(UnknownStream) | Read(ZeroRttRejected) | Read(IllegalOrderedRead) => unreachable!(), Read(Reset(error_code)) => panic!("unexpected stream reset: {error_code}"), Read(ConnectionLost(e)) => Err(e), } } } } async fn write_to_peer(conn: quinn::Connection, data: Vec) -> Result<(), WriteError> { let mut s = conn.open_bi().await.map_err(WriteError::ConnectionLost)?; s.write_all(&data).await?; // Suppress finish errors, since the peer may close before ACKing match s.finish().await { Ok(()) => Ok(()), Err(WriteError::ConnectionLost(ConnectionError::ApplicationClosed { .. })) => Ok(()), Err(e) => Err(e), } } async fn write_to_stream(s: quinn::SendStream, data: Vec) -> Result<(), WriteError> { s.write_all(&data).await?; // Suppress finish errors, since the peer may close before ACKing match s.finish().await { Ok(()) => Ok(()), Err(WriteError::ConnectionLost(ConnectionError::ApplicationClosed { .. })) => Ok(()), Err(e) => Err(e), } } */ #[tokio::main] async fn main() { tracing::subscriber::set_global_default( tracing_subscriber::FmtSubscriber::builder() .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) .finish(), ) .unwrap(); let opt = Opt::parse(); let code = { if opt.remote.is_some() { if let Err(e) = run_server(opt).await { eprintln!("ERROR: {e}"); 1 } else { 0 } } else { if let Err(e) = run_client(opt).await { eprintln!("ERROR: {e}"); 1 } else { 0 } } }; ::std::process::exit(code); } /// Builds client configuration. Trusts given node certificate. fn configure_connector() -> quinn::ClientConfig { let mut transport_config = TransportConfig::default(); transport_config.max_idle_timeout(Some(Duration::from_secs(20).try_into().unwrap())); let mut peer_cfg: quinn::ClientConfig = configure_insecure_client(); peer_cfg.transport_config(Arc::new(transport_config)); peer_cfg } /// Dummy certificate verifier that treats any certificate as valid. /// NOTE, such verification is vulnerable to MITM attacks, but convenient for testing. struct SkipServerVerification; impl SkipServerVerification { fn new() -> Arc { Arc::new(Self) } } impl rustls::client::ServerCertVerifier for SkipServerVerification { fn verify_server_cert( &self, _end_entity: &rustls::Certificate, _intermediates: &[rustls::Certificate], _server_name: &rustls::ServerName, _scts: &mut dyn Iterator, _ocsp_response: &[u8], _now: std::time::SystemTime, ) -> Result { Ok(rustls::client::ServerCertVerified::assertion()) } } fn configure_insecure_client() -> quinn::ClientConfig { let crypto = rustls::ClientConfig::builder() .with_safe_defaults() .with_custom_certificate_verifier(SkipServerVerification::new()) .with_no_client_auth(); quinn::ClientConfig::new(Arc::new(crypto)) } /// Builds listener configuration along with its certificate. fn configure_listener() -> quinn::ServerConfig { let (our_cert, our_priv_key) = gen_cert(); let mut our_cfg = quinn::ServerConfig::with_single_cert(vec![our_cert.clone()], our_priv_key).unwrap(); let transport_config = Arc::get_mut(&mut our_cfg.transport).unwrap(); transport_config.max_idle_timeout(Some(Duration::from_secs(20).try_into().unwrap())); our_cfg } fn gen_cert() -> (rustls::Certificate, rustls::PrivateKey) { let cert = rcgen::generate_simple_self_signed(vec!["localhost".to_string()]).unwrap(); let key = rustls::PrivateKey(cert.serialize_private_key_der()); (rustls::Certificate(cert.serialize_der().unwrap()), key) } /// Constructs a buffer with random bytes of given size prefixed with a hash of this data. fn random_data_with_hash(size: usize, crc: &Crc) -> Vec { let mut data = random_vec(size + 4); let hash = crc.checksum(&data[4..]); // write hash in big endian data[0] = (hash >> 24) as u8; data[1] = ((hash >> 16) & 0xff) as u8; data[2] = ((hash >> 8) & 0xff) as u8; data[3] = (hash & 0xff) as u8; data } fn random_data(size: usize) -> Vec { random_vec(size) } /// Checks if given data buffer hash is correct. Hash itself is a 4 byte prefix in the data. fn hash_correct(data: &[u8], crc: &Crc) -> bool { let encoded_hash = ((data[0] as u32) << 24) | ((data[1] as u32) << 16) | ((data[2] as u32) << 8) | data[3] as u32; let actual_hash = crc.checksum(&data[4..]); encoded_hash == actual_hash } #[allow(unsafe_code)] fn random_vec(size: usize) -> Vec { let mut ret = vec![0; size]; rand::thread_rng().fill_bytes(&mut ret[..]); ret }