use clap::Parser; use std::collections::VecDeque; use std::{io::Write, str::FromStr, time::Instant}; use retty::bootstrap::BootstrapUdpServer; use retty::channel::{Context, Handler, Pipeline}; use retty::codec::{ byte_to_message_decoder::{LineBasedFrameDecoder, TaggedByteToMessageCodec, TerminatorType}, string_codec::TaggedStringCodec, }; use retty::executor::LocalExecutorBuilder; use retty::transport::{TaggedBytesMut, TaggedString}; //////////////////////////////////////////////////////////////////////////////////////////////////// struct EchoHandler { transmits: VecDeque, } impl EchoHandler { fn new() -> Self { Self { transmits: VecDeque::new(), } } } impl Handler for EchoHandler { type Rin = TaggedString; type Rout = Self::Rin; type Win = TaggedString; type Wout = Self::Win; fn name(&self) -> &str { "EchoHandler" } fn handle_read( &mut self, _ctx: &Context, msg: Self::Rin, ) { println!( "handling {} from {:?}", msg.message, msg.transport.peer_addr ); if msg.message != "bye" { self.transmits.push_back(TaggedString { now: Instant::now(), transport: msg.transport, message: format!("{}\r\n", msg.message), }); } } fn poll_timeout( &mut self, _ctx: &Context, _eto: &mut Instant, ) { //last handler, no need to fire_poll_timeout } fn poll_write( &mut self, ctx: &Context, ) -> Option { if let Some(msg) = ctx.fire_poll_write() { self.transmits.push_back(msg); } self.transmits.pop_front() } } #[derive(Parser)] #[command(name = "Echo Server UDP")] #[command(author = "Rusty Rain ")] #[command(version = "0.1.0")] #[command(about = "An example of echo server udp", long_about = None)] struct Cli { #[arg(short, long)] debug: bool, #[arg(long, default_value_t = format!("0.0.0.0"))] host: String, #[arg(long, default_value_t = 8080)] port: u16, #[arg(long, default_value_t = format!("INFO"))] log_level: String, } fn main() -> anyhow::Result<()> { let cli = Cli::parse(); let host = cli.host; let port = cli.port; let log_level = log::LevelFilter::from_str(&cli.log_level)?; if cli.debug { env_logger::Builder::new() .format(|buf, record| { writeln!( buf, "{}:{} [{}] {} - {}", record.file().unwrap_or("unknown"), record.line().unwrap_or(0), record.level(), chrono::Local::now().format("%H:%M:%S.%6f"), record.args() ) }) .filter(None, log_level) .init(); } println!("listening {}:{}...", host, port); LocalExecutorBuilder::default().run(async move { let mut bootstrap = BootstrapUdpServer::new(); bootstrap.pipeline(Box::new(move || { let pipeline: Pipeline = Pipeline::new(); let line_based_frame_decoder_handler = TaggedByteToMessageCodec::new(Box::new( LineBasedFrameDecoder::new(8192, true, TerminatorType::BOTH), )); let string_codec_handler = TaggedStringCodec::new(); let echo_handler = EchoHandler::new(); pipeline.add_back(line_based_frame_decoder_handler); pipeline.add_back(string_codec_handler); pipeline.add_back(echo_handler); pipeline.finalize() })); bootstrap.bind(format!("{}:{}", host, port)).await.unwrap(); println!("Press ctrl-c to stop"); println!("try `nc -u {} {}` in another shell", host, port); let (tx, rx) = futures::channel::oneshot::channel(); std::thread::spawn(move || { let mut tx = Some(tx); ctrlc::set_handler(move || { if let Some(tx) = tx.take() { let _ = tx.send(()); } }) .expect("Error setting Ctrl-C handler"); }); let _ = rx.await; bootstrap.graceful_stop().await; }); Ok(()) }