use crate::addr::ActorEvent; use crate::broker::{Subscribe, Unsubscribe}; use crate::runtime::{sleep, spawn}; use crate::{ActorId, Addr, Broker, Error, Handler, Message, Result, Service, StreamHandler}; use futures::{ channel::mpsc, future::{AbortHandle, Abortable}, Stream, StreamExt, }; use once_cell::sync::OnceCell; use slab::Slab; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Weak}; use std::time::Duration; pub type RunningFuture = futures::future::Shared>; ///An actor execution context. pub struct Context { actor_id: ActorId, tx: Weak>>, pub(crate) rx_exit: Option, pub(crate) streams: Slab, pub(crate) intervals: Slab, } impl Context { pub(crate) fn new( rx_exit: Option, ) -> ( Self, mpsc::UnboundedReceiver>, Arc>>, ) { static ACTOR_ID: OnceCell = OnceCell::new(); // Get an actor id let actor_id = ACTOR_ID .get_or_init(Default::default) .fetch_add(1, Ordering::Relaxed); let (tx, rx) = mpsc::unbounded::>(); let tx = Arc::new(tx); let weak_tx = Arc::downgrade(&tx); ( Self { actor_id, tx: weak_tx, rx_exit, streams: Default::default(), intervals: Default::default(), }, rx, tx, ) } /// Returns the address of the actor. pub fn address(&self) -> Addr { Addr { actor_id: self.actor_id, // This getting unwrap panics tx: self.tx.upgrade().unwrap(), rx_exit: self.rx_exit.clone(), } } /// Returns the id of the actor. pub fn actor_id(&self) -> ActorId { self.actor_id } /// Stop the actor. pub fn stop(&self, err: Option) { if let Some(tx) = self.tx.upgrade() { mpsc::UnboundedSender::clone(&*tx) .start_send(ActorEvent::Stop(err)) .ok(); } } /// Stop the supervisor. /// /// this is ignored by normal actors pub fn stop_supervisor(&self, err: Option) { if let Some(tx) = self.tx.upgrade() { mpsc::UnboundedSender::clone(&*tx) .start_send(ActorEvent::StopSupervisor(err)) .ok(); } } pub fn stopped(&self) -> bool { self.rx_exit .as_ref() .map(|x| x.peek().is_some()) .unwrap_or(true) } pub fn abort_intervals(&mut self) { for handle in self.intervals.drain() { handle.abort() } } pub fn abort_streams(&mut self) { for handle in self.streams.drain() { handle.abort(); } } /// Create a stream handler for the actor. /// /// # Examples /// ```rust /// use hannibal::*; /// use futures::stream; /// use std::time::Duration; /// /// #[message(result = i32)] /// struct GetSum; /// /// #[derive(Default)] /// struct MyActor(i32); /// /// impl StreamHandler for MyActor { /// async fn handle(&mut self, _ctx: &mut Context, msg: i32) { /// self.0 += msg; /// } /// /// async fn started(&mut self, _ctx: &mut Context) { /// println!("stream started"); /// } /// /// async fn finished(&mut self, _ctx: &mut Context) { /// println!("stream finished"); /// } /// } /// /// impl Handler for MyActor { /// async fn handle(&mut self, _ctx: &mut Context, _msg: GetSum) -> i32 { /// self.0 /// } /// } /// /// impl Actor for MyActor { /// async fn started(&mut self, ctx: &mut Context) -> Result<()> { /// let values = (0..100).collect::>(); /// ctx.add_stream(stream::iter(values)); /// Ok(()) /// } /// } /// /// #[hannibal::main] /// async fn main() -> Result<()> { /// let mut addr = MyActor::start_default().await?; /// sleep(Duration::from_secs(1)).await; // Wait for the stream to complete /// let res = addr.call(GetSum).await?; /// assert_eq!(res, (0..100).sum::()); /// Ok(()) /// } /// ``` pub fn add_stream(&mut self, mut stream: S) where S: Stream + Unpin + Send + 'static, S::Item: 'static + Send, A: StreamHandler, { let tx = self.tx.clone(); let entry = self.streams.vacant_entry(); let id = entry.key(); let (handle, registration) = futures::future::AbortHandle::new_pair(); entry.insert(handle); let fut = async move { if let Some(tx) = tx.upgrade() { mpsc::UnboundedSender::clone(&*tx) .start_send(ActorEvent::Exec(Box::new(move |actor, ctx| { Box::pin(async move { StreamHandler::started(actor, ctx).await; }) }))) .ok(); } else { return; } while let Some(msg) = stream.next().await { if let Some(tx) = tx.upgrade() { let res = mpsc::UnboundedSender::clone(&*tx).start_send(ActorEvent::Exec( Box::new(move |actor, ctx| { Box::pin(async move { StreamHandler::handle(actor, ctx, msg).await; }) }), )); if res.is_err() { return; } } else { return; } } if let Some(tx) = tx.upgrade() { mpsc::UnboundedSender::clone(&*tx) .start_send(ActorEvent::Exec(Box::new(move |actor, ctx| { Box::pin(async move { StreamHandler::finished(actor, ctx).await; }) }))) .ok(); } if let Some(tx) = tx.upgrade() { mpsc::UnboundedSender::clone(&*tx) .start_send(ActorEvent::RemoveStream(id)) .ok(); } }; spawn(Abortable::new(fut, registration)); } /// Sends the message `msg` to self after a specified period of time. /// /// We use `Sender` instead of `Addr` so that the interval doesn't keep reference to address and prevent the actor from being dropped and stopped pub fn send_later(&mut self, msg: T, after: Duration) where A: Handler, T: Message, { let sender = self.address().sender(); let entry = self.intervals.vacant_entry(); let (handle, registration) = futures::future::AbortHandle::new_pair(); entry.insert(handle); spawn(Abortable::new( async move { sleep(after).await; sender.send(msg).ok(); }, registration, )); } /// Sends the message to self, at a specified fixed interval. /// The message is created each time using a closure `f`. pub fn send_interval_with(&mut self, f: F, dur: Duration) where A: Handler, F: Fn() -> T + Sync + Send + 'static, T: Message, { let sender = self.address().sender(); let entry = self.intervals.vacant_entry(); let (handle, registration) = futures::future::AbortHandle::new_pair(); entry.insert(handle); spawn(Abortable::new( async move { loop { sleep(dur).await; if sender.send(f()).is_err() { break; } } }, registration, )); } /// Sends the message `msg` to self, at a specified fixed interval. pub fn send_interval(&mut self, msg: T, dur: Duration) where A: Handler, T: Message + Clone + Sync, { self.send_interval_with(move || msg.clone(), dur); } /// Subscribes to a message of a specified type. pub async fn subscribe>(&self) -> Result<()> where A: Handler, { let broker = Broker::::from_registry().await?; let sender = self.address().sender(); broker .send(Subscribe { id: self.actor_id, sender, }) .ok(); Ok(()) } /// Unsubscribe to a message of a specified type. pub async fn unsubscribe>(&self) -> Result<()> { let broker = Broker::::from_registry().await?; broker.send(Unsubscribe { id: self.actor_id }) } }