use futures::{ future::{self, FutureResult}, prelude::*, stream::futures_unordered::FuturesUnordered }; use holly::{Error, prelude::*}; use std::{collections::HashMap, fmt::Display}; use tokio_threadpool::ThreadPool; // A request message is sent to a broadcast actor. enum Request { // Register a broadcast receiver. Register(Addr>), // Deliver a message to all receivers. Broadcast(T) } // A message sent from `Broadcast` to `Receiver`. enum Message { // Upon registration, the registration ID is sent. Id(u32), // A broadcast item. Data(T) } // The broadcast actor. struct Broadcast { id_counter: u32, receivers: HashMap>> } impl Actor, Error> for Broadcast { type Result = Box>, Error = Error> + Send>; fn process(mut self, _ctx: &mut Context>, msg: Option>) -> Self::Result { match msg { Some(Request::Register(addr)) => { let id = self.id_counter; self.receivers.insert(id, addr.clone()); self.id_counter += 1; let f = addr.send(Message::Id(id)).then(move |r| { if r.is_err() { self.receivers.remove(&id); self.id_counter -= 1 } Ok(State::Ready(self)) }); Box::new(f) } Some(Request::Broadcast(msg)) => { let mut f = FuturesUnordered::new(); for a in self.receivers.values().cloned() { f.push(a.send(Message::Data(msg.clone()))) } let f = f.for_each(|_| Ok(())).map(move |_| State::Done); Box::new(f) } None => { Box::new(future::ok(State::Done)) } } } } // A receiver of broadcasts. struct Receiver(u32); impl Actor, ()> for Receiver { type Result = FutureResult>, ()>; fn process(mut self, _ctx: &mut Context>, msg: Option>) -> Self::Result { match msg { Some(Message::Id(id)) => { self.0 = id; future::ok(State::Ready(self)) } Some(Message::Data(x)) => { println!("Receiver {}: {}", self.0, x); future::ok(State::Done) } None => future::ok(State::Done) } } } #[test] fn broadcast() -> Result<(), Error> { let pool = ThreadPool::new(); let exec = Scheduler::new(pool.sender().clone()); let mut broadcast = exec.spawn(Broadcast { id_counter: 0, receivers: HashMap::new() })?; for _ in 0 .. 10 { broadcast.send_now(Request::Register(exec.spawn(Receiver(0))?))?; } broadcast.send_now(Request::Broadcast("Hello receivers!"))?; pool.shutdown_on_idle().wait().unwrap(); Ok(()) }