use std::{ net::SocketAddr, sync::{ atomic::{AtomicBool, Ordering}, Arc, }, time::{Duration, Instant}, }; use sans_io_runtime::{ backend::{BackendIncoming, BackendOutgoing, PollBackend}, collections::DynamicDeque, group_owner_type, Buffer, Controller, Task, TaskGroup, TaskGroupOutput, TaskSwitcherChild, WorkerInner, WorkerInnerInput, WorkerInnerOutput, }; type ExtIn = (); type ExtOut = (); type ICfg = (); type SCfg = EchoTaskCfg; type ChannelId = (); type Event = (); enum EchoTaskOutput { Net(BackendOutgoing), OnResourceEmpty, } #[derive(Debug, Clone)] struct EchoTaskCfg { count: usize, dest: SocketAddr, brust_size: usize, } struct EchoTask { count: usize, cfg: EchoTaskCfg, local_addr: SocketAddr, local_backend_slot: usize, output: DynamicDeque, shutdown: bool, } impl EchoTask { pub fn new(cfg: EchoTaskCfg) -> Self { log::info!("Create new echo client task in addr {}", cfg.dest); Self { count: 0, cfg, local_addr: SocketAddr::from(([127, 0, 0, 1], 0)), local_backend_slot: 0, output: DynamicDeque::from([EchoTaskOutput::Net(BackendOutgoing::UdpListen { addr: SocketAddr::from(([127, 0, 0, 1], 0)), reuse: false, })]), shutdown: false, } } } impl Task for EchoTask { fn on_tick(&mut self, _now: Instant) {} fn on_event(&mut self, _now: Instant, input: BackendIncoming) { match input { BackendIncoming::UdpListenResult { bind, result } => { log::info!("UdpListenResult: {} {:?}", bind, result); if let Ok((addr, slot)) = result { self.local_addr = addr; self.local_backend_slot = slot; for _ in 0..self.cfg.brust_size { self.output .push_back(EchoTaskOutput::Net(BackendOutgoing::UdpPacket { slot: self.local_backend_slot, to: self.cfg.dest, data: Buffer::from([0; 1000].to_vec()), })); } } } BackendIncoming::UdpPacket { from, slot, data } => { self.count += 1; if self.count >= self.cfg.count { log::info!("EchoTask done"); self.shutdown = true; } else { self.output .push_back(EchoTaskOutput::Net(BackendOutgoing::UdpPacket { slot, to: from, data, })); } } } } fn on_shutdown(&mut self, _now: Instant) { log::info!("EchoTask shutdown"); self.output .push_back(EchoTaskOutput::Net(BackendOutgoing::UdpUnlisten { slot: self.local_backend_slot, })); self.shutdown = true; } } impl TaskSwitcherChild for EchoTask { type Time = Instant; fn empty_event(&self) -> EchoTaskOutput { EchoTaskOutput::OnResourceEmpty } fn is_empty(&self) -> bool { self.shutdown && self.output.is_empty() } fn pop_output(&mut self, _now: Instant) -> Option { self.output.pop_front() } } group_owner_type!(OwnerType); struct EchoWorkerInner { worker: u16, tasks: TaskGroup, } impl WorkerInner for EchoWorkerInner { fn tasks(&self) -> usize { self.tasks.tasks() } fn is_empty(&self) -> bool { self.tasks.is_empty() } fn worker_index(&self) -> u16 { self.worker } fn build(worker: u16, _cfg: ICfg) -> Self { Self { worker, tasks: TaskGroup::default(), } } fn spawn(&mut self, _now: Instant, cfg: SCfg) { self.tasks.add_task(EchoTask::new(cfg)); } fn on_tick(&mut self, now: Instant) { self.tasks.on_tick(now); } fn on_event( &mut self, now: Instant, event: WorkerInnerInput, ) { match event { WorkerInnerInput::Net(owner, event) => { self.tasks.on_event(now, owner.index(), event); } _ => unreachable!(), } } fn pop_output( &mut self, now: Instant, ) -> Option> { loop { if let Some(TaskGroupOutput::TaskOutput(index, out)) = self.tasks.pop_output(now) { if let Some(out) = self.convert_output(OwnerType(index), out) { return Some(out); } } } } fn on_shutdown(&mut self, now: Instant) { self.tasks.on_shutdown(now); } } impl EchoWorkerInner { fn convert_output( &mut self, owner: OwnerType, out: EchoTaskOutput, ) -> Option> { match out { EchoTaskOutput::Net(out) => Some(WorkerInnerOutput::Net(owner, out)), EchoTaskOutput::OnResourceEmpty => { self.tasks.remove_task(owner.index()); Some(WorkerInnerOutput::Continue) } } } } fn main() { env_logger::init(); println!("{}", std::mem::size_of::()); let mut controller = Controller::::default(); controller.add_worker::>( Duration::from_secs(1), (), None, ); controller.add_worker::>( Duration::from_secs(1), (), None, ); for _i in 0..2 { controller.spawn(EchoTaskCfg { count: 1000, brust_size: 1, dest: SocketAddr::from(([127, 0, 0, 1], 10001)), }); controller.spawn(EchoTaskCfg { count: 10000, brust_size: 1, dest: SocketAddr::from(([127, 0, 0, 1], 10002)), }); } let term = Arc::new(AtomicBool::new(false)); signal_hook::flag::register(signal_hook::consts::SIGINT, Arc::clone(&term)) .expect("Should register hook"); while controller.process().is_some() { if term.load(Ordering::Relaxed) { controller.shutdown(); } std::thread::sleep(Duration::from_millis(10)); } log::info!("Server shutdown"); }