use coerce::actor::context::ActorContext; use coerce::actor::describe::Describe; use coerce::actor::message::{Handler, Message}; use coerce::actor::system::ActorSystem; use coerce::actor::{Actor, ActorId, CoreActorRef, IntoActor, IntoActorId}; use std::time::Duration; use tokio::sync::oneshot; use tokio::time::timeout; pub mod util; #[macro_use] extern crate tracing; #[macro_use] extern crate async_trait; #[macro_use] extern crate serde; struct TestSupervisor { count: usize, max_depth: usize, all_child_actors_stopped: Option>, } struct StopAll; impl Message for StopAll { type Result = (); } #[async_trait] impl Actor for TestSupervisor { async fn started(&mut self, ctx: &mut ActorContext) { for i in 0..self.count { let _child = ctx .spawn( format!("spawned-{i}").into_actor_id(), SpawnedActor { depth: 1, max_depth: self.max_depth, }, ) .await .unwrap(); } } async fn on_child_stopped(&mut self, id: &ActorId, ctx: &mut ActorContext) { info!("child stopped (id={})", &id); if ctx.supervised_count() == 0 { if let Some(cb) = self.all_child_actors_stopped.take() { let _ = cb.send(id.clone()); } } } } #[async_trait] impl Handler for TestSupervisor { async fn handle(&mut self, _: StopAll, ctx: &mut ActorContext) { if let Some(supervised) = ctx.supervised() { for child in &supervised.children { let _ = child.1.actor_ref().stop().await; } } } } struct SpawnedActor { depth: usize, max_depth: usize, } #[async_trait] impl Actor for SpawnedActor { async fn started(&mut self, ctx: &mut ActorContext) { if self.depth > self.max_depth { return; } let _child = ctx .spawn( format!("spawned-{}", self.depth).into_actor_id(), SpawnedActor { depth: self.depth + 1, max_depth: self.max_depth, }, ) .await .unwrap(); } } #[tokio::test] pub async fn test_actor_child_spawn_and_stop() { util::create_trace_logger(); const EXPECTED_ACTOR_COUNT: usize = 10; const EXPECTED_DEPTH: usize = 10; let system = ActorSystem::new(); let actor_id = "actor".to_string(); let (all_child_actors_stopped, on_all_child_actors_stopped) = oneshot::channel(); let test_actor = TestSupervisor { count: EXPECTED_ACTOR_COUNT, max_depth: EXPECTED_DEPTH, all_child_actors_stopped: Some(all_child_actors_stopped), } .into_actor(Some(actor_id), &system) .await .expect("create actor"); let (tx, rx) = oneshot::channel(); let _ = test_actor.describe(Describe { sender: Some(tx), current_depth: 0, ..Default::default() }); let actor_description = rx.await.unwrap(); info!("{:#?}", actor_description); let actor_count = actor_description .supervised .map_or(0, |s| s.actors.iter().filter(|n| n.is_ok()).count()); assert_eq!(EXPECTED_ACTOR_COUNT, actor_count); let _ = test_actor.notify(StopAll); timeout(Duration::from_secs(5), async move { on_all_child_actors_stopped.await }) .await .expect("parent didn't receive the child-terminated notification"); let (tx, rx) = oneshot::channel(); let _ = test_actor.describe(Describe { sender: Some(tx), current_depth: 0, ..Default::default() }); let x = rx.await; info!("{:#?}", x); let actor_count = x .unwrap() .supervised .map_or(0, |s| s.actors.iter().filter(|n| n.is_ok()).count()); assert_eq!(0, actor_count); system.shutdown().await; }