use crate::{Actor, Addr, Context, Handler, Message, Result, Sender, Service}; use fnv::FnvHasher; use std::any::Any; use std::collections::HashMap; use std::hash::BuildHasherDefault; use std::marker::PhantomData; type SubscriptionId = u64; pub(crate) struct Subscribe> { pub(crate) id: SubscriptionId, pub(crate) sender: Sender, } impl> Message for Subscribe { type Result = (); } pub(crate) struct Unsubscribe { pub(crate) id: SubscriptionId, } impl Message for Unsubscribe { type Result = (); } struct Publish + Clone>(T); impl + Clone> Message for Publish { type Result = (); } /// Message broker is used to support publishing and subscribing to messages. /// /// # Examples /// /// ```rust /// use xactor::*; /// use std::time::Duration; /// /// #[message] /// #[derive(Clone)] /// struct MyMsg(&'static str); /// /// #[message(result = "String")] /// struct GetValue; /// /// #[derive(Default)] /// struct MyActor(String); /// /// #[async_trait::async_trait] /// impl Actor for MyActor { /// async fn started(&mut self, ctx: &mut Context) -> Result<()> { /// ctx.subscribe::().await; /// Ok(()) /// } /// } /// /// #[async_trait::async_trait] /// impl Handler for MyActor { /// async fn handle(&mut self, _ctx: &mut Context, msg: MyMsg) { /// self.0 += msg.0; /// } /// } /// /// #[async_trait::async_trait] /// impl Handler for MyActor { /// async fn handle(&mut self, _ctx: &mut Context, _msg: GetValue) -> String { /// self.0.clone() /// } /// } /// /// #[xactor::main] /// async fn main() -> Result<()> { /// let mut addr1 = MyActor::start_default().await?; /// let mut addr2 = MyActor::start_default().await?; /// /// Broker::from_registry().await?.publish(MyMsg("a")); /// Broker::from_registry().await?.publish(MyMsg("b")); /// /// sleep(Duration::from_secs(1)).await; // Wait for the messages /// /// assert_eq!(addr1.call(GetValue).await?, "ab"); /// assert_eq!(addr2.call(GetValue).await?, "ab"); /// Ok(()) /// } /// ``` pub struct Broker> { subscribes: HashMap, BuildHasherDefault>, mark: PhantomData, } impl> Default for Broker { fn default() -> Self { Self { subscribes: Default::default(), mark: PhantomData, } } } impl> Actor for Broker {} impl> Service for Broker {} #[async_trait::async_trait] impl> Handler> for Broker { async fn handle(&mut self, _ctx: &mut Context, msg: Subscribe) { self.subscribes.insert(msg.id, Box::new(msg.sender)); } } #[async_trait::async_trait] impl> Handler for Broker { async fn handle(&mut self, _ctx: &mut Context, msg: Unsubscribe) { self.subscribes.remove(&msg.id); } } #[async_trait::async_trait] impl + Clone> Handler> for Broker { async fn handle(&mut self, _ctx: &mut Context, msg: Publish) { for sender in self.subscribes.values_mut() { if let Some(sender) = sender.downcast_mut::>() { sender.send(msg.0.clone()).ok(); } } } } impl + Clone> Addr> { /// Publishes a message of the specified type. pub fn publish(&mut self, msg: T) -> Result<()> { self.send(Publish(msg)) } }