use dialectic::prelude::*; use dialectic_tokio_serde::codec::LengthDelimitedCodec; use dialectic_tokio_serde_bincode::Bincode; use colored::*; use std::{error::Error, fmt::Debug, future::Future, io, process}; use structopt::StructOpt; use tokio::{ io::{AsyncBufRead, AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader, Stdin, Stdout}, net::{ tcp::{OwnedReadHalf, OwnedWriteHalf}, TcpListener, TcpStream, }, }; /// Prompt the user for a line of input, looping until they enter something that parses. #[allow(unused)] pub async fn prompt( prompt: &str, input: &mut R, output: &mut W, mut parse: impl FnMut(&str) -> Result, ) -> io::Result where R: AsyncBufRead + Unpin, W: AsyncWrite + Unpin, { loop { output.write_all(prompt.as_bytes()).await?; output.flush().await?; let mut line = String::new(); if 0 == input.read_line(&mut line).await? { break Err(io::Error::new( io::ErrorKind::UnexpectedEof, "unexpected end of input", )); } match parse(line.trim()) { Ok(t) => break Ok(t), Err(_) => continue, } } } /// A session-typed channel over TCP using length-delimited bincode encoding for serialization. pub type TcpChan = dialectic_tokio_serde::SymmetricalChan< S, Bincode, LengthDelimitedCodec, OwnedWriteHalf, OwnedReadHalf, >; /// Wrap a raw TCP socket in a given session type, using the length delimited bincode transport /// format/encoding. fn wrap_socket

(socket: TcpStream, max_length: usize) -> TcpChan

where P: Session, { let (rx, tx) = socket.into_split(); let (tx, rx) = dialectic_tokio_serde_bincode::length_delimited(tx, rx, 4, max_length); P::wrap(tx, rx) } /// Options for a TCP app. #[derive(Debug, Clone, StructOpt)] struct TcpAppOptions { #[structopt(subcommand)] party: Party, } /// A session-typed client/server application using TCP. #[derive(Debug, Clone, StructOpt)] enum Party { /// Run the client for this session Client { /// Connect to a server running on this port #[structopt(short, long, default_value = "5000")] port: u16, /// Connect to this IP address #[structopt(short, long, default_value = "127.0.0.1")] address: String, }, /// Run the server for this session Server { /// Serve on this port #[structopt(short, long, default_value = "5000")] port: u16, /// Serve on this IP address #[structopt(short, long, default_value = "127.0.0.1")] address: String, }, } /// Given client and server functions for a given session type `P`, construct a simple app that /// behaves as either a client or a server, depending on its command line arguments, and /// communicates over TCP. pub async fn demo( server: &'static Server, client: &'static Client, max_length: usize, ) where P: Session, P::Dual: Session, Server: Fn(TcpChan

) -> ServerFuture + Sync + 'static, Client: Fn(BufReader, Stdout, TcpChan) -> ClientFuture + Sync + 'static, ServerFuture: Future>> + std::marker::Send + 'static, ClientFuture: Future>> + std::marker::Send + 'static, ServerResult: Debug, ClientResult: Debug, { use Party::*; let options = TcpAppOptions::from_args(); if let Err(e) = match options.party { Server { port, ref address } => { listen_on::(address, port, max_length, server).await } Client { port, ref address } => { connect_to::(address, port, max_length, client).await } } { let party_name = match options.party { Server { .. } => "server", Client { .. } => "client", }; eprintln!("{}", format!("[{}] Error: {}", party_name, e).red()); process::exit(1); } } async fn listen_on( address: &str, port: u16, max_length: usize, interaction: &'static F, ) -> Result<(), Box> where F: Fn(TcpChan

) -> Fut + Sync + 'static, Fut: Future>> + std::marker::Send, P: Session, T: Debug, { let listener = TcpListener::bind((address, port)).await?; println!( "{}", format!("[server] Listening on {:?}", listener.local_addr().unwrap()).blue() ); loop { let (socket, addr) = listener.accept().await?; tokio::spawn(async move { let _ = interaction(wrap_socket::

(socket, max_length)) .await .map(|result| { println!( "{}", format!("[server] {} - Result: {:?}", addr, result).green() ) }) .map_err(|err| { eprintln!("{}", format!("[server] {} - Error: {}", addr, err).red()) }); }); } } async fn connect_to( address: &str, port: u16, max_length: usize, interaction: &'static F, ) -> Result<(), Box> where F: Fn(BufReader, Stdout, TcpChan

) -> Fut + Sync + 'static, Fut: Future>> + std::marker::Send, P: Session, T: Debug, { let socket = TcpStream::connect((address, port)).await?; println!( "{}", format!( "[client] Connected to {:?} from {:?}", socket.peer_addr().unwrap(), socket.local_addr().unwrap() ) .blue() ); let stdin = BufReader::new(tokio::io::stdin()); let stdout = tokio::io::stdout(); let result = interaction(stdin, stdout, wrap_socket::

(socket, max_length)).await?; println!("{}", format!("[client] Result: {:?}", result).green()); Ok(()) }