use std::{collections::HashMap, future::Future, pin::Pin}; use tokactor::{ util::{ io::{DataFrameReceiver, Writer}, read::Read, }, Actor, ActorRef, Ask, AskResult, AsyncAsk, Ctx, DeadActorResult, Handler, TcpRequest, World, }; use tracing::Level; #[derive(Default)] struct Broadcaster { map: HashMap, } impl Actor for Broadcaster {} impl Handler<(usize, Writer)> for Broadcaster { fn handle(&mut self, message: (usize, Writer), _context: &mut Ctx) { self.map.insert(message.0, message.1); } } impl Handler> for Broadcaster { fn handle(&mut self, message: Vec, context: &mut Ctx) { let addresses = self.map.values().map(Clone::clone).collect::>(); context.anonymous_task(async move { for address in addresses { let _ = address.write(message.clone()).await; } }); } } impl Handler for Broadcaster { fn handle(&mut self, message: usize, _context: &mut Ctx) { self.map.remove(&message); } } struct Connection { id: usize, broadcaster: ActorRef, } impl Actor for Connection {} impl AsyncAsk for Connection { type Output = (); type Future<'a> = Pin + Send + Sync + 'a>>; fn handle<'a>(&'a mut self, Data(message): Data, _: &mut Ctx) -> Self::Future<'a> { let broadcaster = self.broadcaster.clone(); Box::pin(async move { broadcaster.send_async(message).await.unwrap(); }) } } #[derive(Default)] struct Router { counter: usize, broadcaster: Option>, } impl Actor for Router { fn on_start(&mut self, ctx: &mut Ctx) where Self: Actor, { self.broadcaster = Some(ctx.spawn(Broadcaster::default())); } } impl Ask for Router { type Result = Connection; fn handle(&mut self, message: TcpRequest, context: &mut Ctx) -> AskResult { let conn = Connection { id: self.counter, broadcaster: self.broadcaster.as_ref().unwrap().clone(), }; let counter = self.counter; let broadcaster = self.broadcaster.as_ref().unwrap().clone(); context.anonymous_task(async move { broadcaster.send_async((counter, message.0)).await.unwrap(); }); self.counter += 1; AskResult::Reply(conn) } } impl Handler> for Router { fn handle(&mut self, dead: DeadActorResult, _context: &mut Ctx) { match dead { Ok(_bcast) => { // how do we delete this? } Err(error) => { println!("Broadcast actor died unexpectingly with {:?}", error); } } } } impl Handler> for Router { fn handle(&mut self, dead: DeadActorResult, ctx: &mut Ctx) { match dead { Ok(conn) => { // how do we delete this? let broadcaster = self.broadcaster.as_ref().unwrap().clone(); ctx.anonymous_task(async move { broadcaster.send_async(conn.actor.id).await.unwrap(); }); } Err(error) => { println!("Connection actor died unexpectingly with {:?}", error); } } } } #[derive(Default, Debug)] struct Data(Vec); impl DataFrameReceiver for Data { type Request = Self; type Frame = Read<1024>; fn recv(&mut self, frame: &Self::Frame) -> Option { Some(Data(frame.to_vec())) } } fn main() { tracing_subscriber::fmt() .pretty() // all spans/events with a level higher than TRACE (e.g, info, warn, etc.) // will be written to stdout. .with_max_level(Level::TRACE) .with_writer(std::io::stdout) // sets this to be the default, global collector for this application. .init(); tracing::info!("Starting up..."); let mut world = World::new().unwrap(); let tcp_input = world .tcp_component::("127.0.0.1:8080", Router::default()) .unwrap(); world.on_input(tcp_input); world.block_until_completion(); println!("Completed! Shutting down..."); }