use crate::{Actor, ActorId, Caller, Context, Error, Handler, Message, Result, Sender}; use futures::channel::{mpsc, oneshot}; use futures::future::Shared; use futures::Future; use std::hash::{Hash, Hasher}; use std::pin::Pin; use std::sync::{Arc, Mutex, Weak}; type ExecFuture<'a> = Pin + Send + 'a>>; pub(crate) type ExecFn = Box FnOnce(&'a mut A, &'a mut Context) -> ExecFuture<'a> + Send + 'static>; pub(crate) enum ActorEvent { Exec(ExecFn), Stop(Option), RemoveStream(usize), } /// The address of an actor. /// /// When all references to `Addr` are dropped, the actor ends. /// You can use `Clone` trait to create multiple copies of `Addr`. pub struct Addr { pub(crate) actor_id: ActorId, pub(crate) tx: Arc>>, pub(crate) rx_exit: Option>>, } impl Clone for Addr { fn clone(&self) -> Self { Self { actor_id: self.actor_id, tx: self.tx.clone(), rx_exit: self.rx_exit.clone(), } } } impl Addr { pub fn downgrade(&self) -> WeakAddr { WeakAddr { actor_id: self.actor_id, tx: Arc::downgrade(&self.tx), rx_exit: self.rx_exit.clone(), } } } impl PartialEq for Addr { fn eq(&self, other: &Self) -> bool { self.actor_id == other.actor_id } } impl Hash for Addr { fn hash(&self, state: &mut H) { self.actor_id.hash(state) } } impl Addr { /// Returns the id of the actor. pub fn actor_id(&self) -> ActorId { self.actor_id } /// Stop the actor. pub fn stop(&mut self, err: Option) -> Result<()> { mpsc::UnboundedSender::clone(&*self.tx).start_send(ActorEvent::Stop(err))?; Ok(()) } /// Send a message `msg` to the actor and wait for the return value. pub async fn call(&self, msg: T) -> Result where A: Handler, { let (tx, rx) = oneshot::channel(); mpsc::UnboundedSender::clone(&*self.tx).start_send(ActorEvent::Exec(Box::new( move |actor, ctx| { Box::pin(async move { let res = Handler::handle(actor, ctx, msg).await; let _ = tx.send(res); }) }, )))?; Ok(rx.await?) } /// Send a message `msg` to the actor without waiting for the return value. 
pub fn send>(&self, msg: T) -> Result<()> where A: Handler, { mpsc::UnboundedSender::clone(&*self.tx).start_send(ActorEvent::Exec(Box::new( move |actor, ctx| { Box::pin(async move { Handler::handle(actor, ctx, msg).await; }) }, )))?; Ok(()) } /// Create a `Caller` for a specific message type pub fn caller(&self) -> Caller where A: Handler, { let weak_tx = Arc::downgrade(&self.tx); Caller { actor_id: self.actor_id.clone(), caller_fn: Mutex::new(Box::new(move |msg| { let weak_tx_option = weak_tx.upgrade(); Box::pin(async move { match weak_tx_option { Some(tx) => { let (oneshot_tx, oneshot_rx) = oneshot::channel(); mpsc::UnboundedSender::clone(&tx).start_send(ActorEvent::Exec( Box::new(move |actor, ctx| { Box::pin(async move { let res = Handler::handle(&mut *actor, ctx, msg).await; let _ = oneshot_tx.send(res); }) }), ))?; Ok(oneshot_rx.await?) } None => Err(anyhow::anyhow!("Actor Dropped")), } }) })), } } /// Create a `Sender` for a specific message type pub fn sender>(&self) -> Sender where A: Handler, { let weak_tx = Arc::downgrade(&self.tx); Sender { actor_id: self.actor_id.clone(), sender_fn: Box::new(move |msg| match weak_tx.upgrade() { Some(tx) => { mpsc::UnboundedSender::clone(&tx).start_send(ActorEvent::Exec(Box::new( move |actor, ctx| { Box::pin(async move { Handler::handle(&mut *actor, ctx, msg).await; }) }, )))?; Ok(()) } None => Ok(()), }), } } /// Wait for an actor to finish, and if the actor has finished, the function returns immediately. 
pub async fn wait_for_stop(self) {
    if let Some(rx_exit) = self.rx_exit {
        // Only completion matters; the received value or error is discarded.
        rx_exit.await.ok();
    } else {
        // No exit receiver is attached, so this future never resolves.
        futures::future::pending::<()>().await;
    }
}
}

/// A weak counterpart of `Addr<A>`.
///
/// Holds a `Weak` reference to the channel sender, so it does not by itself
/// keep the sender alive. Use [`WeakAddr::upgrade`] to obtain an `Addr<A>`.
pub struct WeakAddr<A> {
    /// Identifier of the actor this address points at.
    pub(crate) actor_id: ActorId,
    /// Weak handle to the sending half of the actor's event channel.
    pub(crate) tx: Weak<mpsc::UnboundedSender<ActorEvent<A>>>,
    /// Shared exit receiver, copied into any `Addr` produced by `upgrade`.
    pub(crate) rx_exit: Option<Shared<oneshot::Receiver<()>>>,
}

/// Two weak addresses are equal when they carry the same actor id.
impl<A> PartialEq for WeakAddr<A> {
    fn eq(&self, other: &Self) -> bool {
        self.actor_id == other.actor_id
    }
}

/// Hashes only the actor id, matching the `PartialEq` impl.
impl<A> Hash for WeakAddr<A> {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.actor_id.hash(state)
    }
}

impl<A> WeakAddr<A> {
    /// Attempts to turn this weak address back into an `Addr<A>`.
    ///
    /// Returns `None` when the channel sender has already been dropped.
    pub fn upgrade(&self) -> Option<Addr<A>> {
        self.tx.upgrade().map(|tx| Addr {
            actor_id: self.actor_id,
            tx,
            rx_exit: self.rx_exit.clone(),
        })
    }
}

// Written by hand: `#[derive(Clone)]` would add an unwanted `A: Clone` bound.
impl<A> Clone for WeakAddr<A> {
    fn clone(&self) -> Self {
        Self {
            actor_id: self.actor_id,
            tx: self.tx.clone(),
            rx_exit: self.rx_exit.clone(),
        }
    }
}

impl<A: Actor> WeakAddr<A> {
    /// Returns the id of the actor.
    pub fn actor_id(&self) -> ActorId {
        self.actor_id
    }
}