#[allow(unused)] mod hello_world_actor { use abcgen::*; #[events] #[derive(Debug, Clone)] pub enum HelloWorldActorEvent { SomeoneAskedMyName(String), Message(String), } #[doc = " Some errors that we want to return in our actor's handler methods"] #[derive(thiserror :: Error, Debug)] pub enum HelloWorldError { #[error("Actor already stopped")] AlreadyStopped, #[error("HelloWorldErrors 1")] Error1, #[error("HelloWorldErrors 2")] Error2, } #[doc = " It is useful to implement the From for the errors"] #[doc = " that can be returned by the actor's handler methods so that"] #[doc = " the proxy can return a flat Result instead"] #[doc = " of a nested Result, AbcgenError>"] impl From for HelloWorldError { fn from(_: abcgen::AbcgenError) -> Self { HelloWorldError::AlreadyStopped } } #[derive(thiserror :: Error, Debug)] pub enum SomeOtherError { #[error("SomeOtherErrors 1")] Error1, #[error("SomeOtherErrors 2")] Error2, } #[actor] pub struct HelloWorldActor { pub event_sender: Option, } impl HelloWorldActor { #[doc = " The following function *must* be implemented by the user and is called by the run function"] async fn start(&mut self, task_sender: TaskSender, event_sender: EventSender) { self.event_sender = Some(event_sender); println!("Hello, World!"); tokio::spawn(async move { tokio::time::sleep(std::time::Duration::from_secs(1)).await; send_task ! (task_sender (this) => { this . still_here () . await ; }); }); } #[doc = " The following function must be implemented by the user and is called befor termination"] async fn shutdown(&mut self) { println!("Goodbye, World!"); } #[doc = " following function is meant to handle a message because it is marked with `#[message_handler]`"] #[doc = " abcgen generates a message for it"] #[doc = " that should have the following signature:"] #[doc = " ```"] #[doc = " HelloWorldActorMessage::TellMeYourName({caller: String, respond_to: tokio::sync::oneshot::Sender>})"] #[doc = " ```"] #[doc = " A specular function is generated on the proxy that can be called to send the message and receive the response."] #[doc = " In this case the fuction of the proxy will return the same `Result` because there is"] #[doc = " a conversion `From` for HelloWorldError otherwise it would return a nested `Result, AbcgenError>`"] #[doc = ""] #[message_handler] async fn tell_me_your_name(&mut self, caller: String) -> Result { self.event_sender .as_ref() .unwrap() .send(HelloWorldActorEvent::SomeoneAskedMyName(caller.clone())) .unwrap(); println!("Hello {}, I am HelloWorldActor", caller); Ok("HelloWorldActor".to_string()) } #[doc = " The following function is meant to handle a message because it is marked with `#[message_handler]`"] #[doc = " In this case the fuction generated on the proxy will return a nested `Result, AbcgenError>`"] #[message_handler] async fn do_that(&mut self) -> Result<(), SomeOtherError> { println!("do_that called"); Ok(()) } #[doc = " The following function can be enqueued as a task to executed in the actor's task"] fn still_here(&mut self) -> PinnedFuture<()> { Box::pin(async { self.event_sender .as_ref() .unwrap() .send(HelloWorldActorEvent::Message( "Hello world again, I'm still here.".to_string(), )) .unwrap(); }) } } type EventSender = tokio::sync::broadcast::Sender; pub type TaskSender = tokio::sync::mpsc::Sender>; #[derive(Debug)] pub enum HelloWorldActorMessage { TellMeYourName { caller: String, respond_to: tokio::sync::oneshot::Sender>, }, DoThat { respond_to: tokio::sync::oneshot::Sender>, }, } impl HelloWorldActor { pub fn run(self) -> HelloWorldActorProxy { let (msg_sender, mut msg_receiver) = tokio::sync::mpsc::channel(10usize); let (event_sender, _) = tokio::sync::broadcast::channel::(10usize); let event_sender_clone = event_sender.clone(); let (stop_sender, stop_receiver) = tokio::sync::oneshot::channel::<()>(); let (task_sender, mut task_receiver) = tokio::sync::mpsc::channel::>(10usize); tokio::spawn(async move { let mut actor = self; actor.start(task_sender, event_sender_clone).await; tokio::select! { _ = actor . select_receivers (& mut msg_receiver , & mut task_receiver) => { log :: debug ! ("(abcgen) all proxies dropped") ; } _ = stop_receiver => { log :: debug ! ("(abcgen) stop signal received") ; } } actor.shutdown().await; }); let proxy = HelloWorldActorProxy { message_sender: msg_sender, stop_signal: Some(stop_sender), events: event_sender, }; proxy } async fn select_receivers( &mut self, msg_receiver: &mut tokio::sync::mpsc::Receiver, task_receiver: &mut tokio::sync::mpsc::Receiver>, ) { loop { tokio::select! { msg = msg_receiver . recv () => { match msg { Some (msg) => { self . dispatch (msg) . await ; } None => { break ; } } } , task = task_receiver . recv () => { if let Some (task) = task { task (self) . await ; } } } } } async fn dispatch(&mut self, message: HelloWorldActorMessage) { match message { HelloWorldActorMessage::TellMeYourName { caller, respond_to } => { let result = self.tell_me_your_name(caller).await; respond_to.send(result).unwrap(); } HelloWorldActorMessage::DoThat { respond_to } => { let result = self.do_that().await; respond_to.send(result).unwrap(); } } } } pub struct HelloWorldActorProxy { message_sender: tokio::sync::mpsc::Sender, events: tokio::sync::broadcast::Sender, stop_signal: std::option::Option>, } impl HelloWorldActorProxy { pub fn is_running(&self) -> bool { match self.stop_signal.as_ref() { Some(s) => !s.is_closed(), None => false, } } pub fn stop(&mut self) -> Result<(), AbcgenError> { match self.stop_signal.take() { Some(tx) => tx.send(()).map_err(|_e: ()| AbcgenError::ActorShutDown), None => Err(AbcgenError::ActorShutDown), } } pub async fn stop_and_wait(&mut self) -> Result<(), AbcgenError> { self.stop()?; while self.is_running() { tokio::time::sleep(std::time::Duration::from_millis(100)).await; } Ok(()) } pub fn get_events(&self) -> tokio::sync::broadcast::Receiver { self.events.subscribe() } pub async fn tell_me_your_name(&self, caller: String) -> Result { let (tx, rx) = tokio::sync::oneshot::channel(); let msg = HelloWorldActorMessage::TellMeYourName { caller, respond_to: tx, }; let send_res = self.message_sender.send(msg).await; match send_res { Ok(_) => rx .await .unwrap_or_else(|e| Err(AbcgenError::ActorShutDown.into())), Err(e) => Err(AbcgenError::ActorShutDown.into()), } } pub async fn do_that(&self) -> Result, AbcgenError> { let (tx, rx) = tokio::sync::oneshot::channel(); let msg = HelloWorldActorMessage::DoThat { respond_to: tx }; let send_res = self.message_sender.send(msg).await; match send_res { Ok(_) => rx.await.map_err(|e| AbcgenError::ActorShutDown), Err(e) => Err(AbcgenError::ActorShutDown), } } } } use abcgen::AbcgenError; use hello_world_actor::{HelloWorldActor, HelloWorldActorEvent, SomeOtherError}; #[tokio::main] async fn main() { let actor = HelloWorldActor { event_sender: None }; let proxy = actor.run(); let mut events_rx = proxy.get_events().resubscribe(); tokio::spawn(async move { while let Ok(event) = events_rx.recv().await { match event { HelloWorldActorEvent::SomeoneAskedMyName(name) => { println!("{} asked my name", name); } HelloWorldActorEvent::Message(msg) => { println!("Actor said: \"{}\"", msg); } } } }); let do_that_res: Result, AbcgenError> = proxy.do_that().await; let thename = proxy.tell_me_your_name("Alice".to_string()).await.unwrap(); println!("The actor replied with name: \"{}\"", thename); match do_that_res { Ok(Ok(_)) => println!("do_that succeeded"), Ok(Err(e)) => println!("do_that failed: {:?}", e), Err(e) => println!("do_that failed: {:?}", e), } tokio::time::sleep(std::time::Duration::from_secs(2)).await; }