use coerce::actor::context::ActorContext; use coerce::actor::{Actor, ActorCreationErr, ActorFactory, ActorRecipe}; use std::collections::HashMap; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering::Relaxed; use std::sync::Arc; #[derive(Clone)] pub struct ShardedActor { id: String, actor_started: Arc, actor_stopped: Arc, actor_dropped: Arc, } #[async_trait] impl Actor for ShardedActor { async fn stopped(&mut self, _ctx: &mut ActorContext) { self.actor_stopped.store(true, Relaxed); } } pub struct ShardedActorRecipe { id: String, } impl ActorRecipe for ShardedActorRecipe { fn read_from_bytes(bytes: &Vec) -> Option { Some(Self { id: String::from_utf8(bytes.clone()).unwrap(), }) } fn write_to_bytes(&self) -> Option> { Some(self.id.clone().into_bytes()) } } pub struct ShardedActorFactory { state_store: Arc>, } impl Clone for ShardedActorFactory { fn clone(&self) -> Self { Self { state_store: self.state_store.clone(), } } } impl ShardedActorFactory { pub fn new(actors: Vec) -> Self { Self { state_store: Arc::new(actors.into_iter().map(|a| (a.id.clone(), a)).collect()), } } pub fn assert_actor_started(&self, id: &str) { let actor_started = self .state_store .get(id) .unwrap() .actor_started .load(Relaxed); assert!(actor_started) } pub fn assert_actor_stopped(&self, id: &str) { let actor_stopped = self .state_store .get(id) .unwrap() .actor_stopped .load(Relaxed); assert!(actor_stopped) } pub fn assert_actor_dropped(&self, id: &str) { let actor_dropped = self .state_store .get(id) .unwrap() .actor_dropped .load(Relaxed); assert!(actor_dropped) } } #[async_trait] impl ActorFactory for ShardedActorFactory { type Actor = ShardedActor; type Recipe = ShardedActorRecipe; async fn create(&self, recipe: ShardedActorRecipe) -> Result { self.state_store.get(&recipe.id).cloned().ok_or_else(|| { ActorCreationErr::InvalidRecipe(format!( "no actor with id `{}` in state_store", &recipe.id )) }) } } impl Drop for ShardedActor { fn drop(&mut self) { self.actor_dropped.store(true, Relaxed); } }