use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::{ net::SocketAddr, time::{Duration, Instant}, }; use sans_io_runtime::backend::{BackendIncoming, BackendOutgoing}; use sans_io_runtime::collections::DynamicDeque; use sans_io_runtime::{ backend::PollBackend, Controller, WorkerInner, WorkerInnerInput, WorkerInnerOutput, }; type ExtIn = (); type ExtOut = (); type ChannelId = (); type Event = (); type ICfg = EchoWorkerCfg; type SCfg = (); type OwnerType = (); struct EchoWorkerCfg { bind: SocketAddr, } struct EchoWorker { worker: u16, backend_slot: usize, output: DynamicDeque, 16>, shutdown: bool, } impl WorkerInner for EchoWorker { fn build(worker: u16, cfg: EchoWorkerCfg) -> Self { log::info!("Create new echo task in addr {}", cfg.bind); Self { worker, backend_slot: 0, output: DynamicDeque::from([WorkerInnerOutput::Net( (), BackendOutgoing::UdpListen { addr: cfg.bind, reuse: true, }, )]), shutdown: false, } } fn worker_index(&self) -> u16 { self.worker } fn tasks(&self) -> usize { if self.shutdown { 0 } else { 1 } } fn is_empty(&self) -> bool { self.shutdown && self.output.is_empty() } fn spawn(&mut self, _now: Instant, _cfg: SCfg) {} fn on_tick(&mut self, _now: Instant) {} fn on_event( &mut self, _now: Instant, event: WorkerInnerInput, ) { match event { WorkerInnerInput::Net(_owner, BackendIncoming::UdpListenResult { bind, result }) => { log::info!("UdpListenResult: {} {:?}", bind, result); self.backend_slot = result.expect("Should bind success").1; } WorkerInnerInput::Net(_owner, BackendIncoming::UdpPacket { from, slot, data }) => { assert!(data.len() <= 1500, "data too large"); self.output.push_back(WorkerInnerOutput::Net( (), BackendOutgoing::UdpPacket { slot, to: from, data, }, )); } _ => {} } } fn pop_output( &mut self, _now: Instant, ) -> Option> { self.output.pop_front() } fn on_shutdown(&mut self, _now: Instant) { if self.shutdown { return; } log::info!("EchoServer {} shutdown", self.worker); self.shutdown = true; self.output.push_back(WorkerInnerOutput::Net( (), BackendOutgoing::UdpUnlisten { slot: self.backend_slot, }, )); } } fn main() { env_logger::init(); let mut controller = Controller::::default(); controller.add_worker::>( Duration::from_secs(1), EchoWorkerCfg { bind: SocketAddr::from(([127, 0, 0, 1], 10001)), }, None, ); controller.add_worker::>( Duration::from_secs(1), EchoWorkerCfg { bind: SocketAddr::from(([127, 0, 0, 1], 10002)), }, None, ); let term = Arc::new(AtomicBool::new(false)); signal_hook::flag::register(signal_hook::consts::SIGINT, Arc::clone(&term)) .expect("Should register hook"); while controller.process().is_some() { if term.load(Ordering::Relaxed) { controller.shutdown(); } std::thread::sleep(Duration::from_millis(10)); } log::info!("Server shutdown"); }