use coerce::actor::context::ActorContext; use coerce::actor::message::Handler; use coerce::actor::system::ActorSystem; use coerce::actor::{ActorRefErr, IntoActor, ToActorId}; use coerce::persistent::journal::provider::StorageProvider; use coerce::persistent::journal::storage::{JournalEntry, JournalStorage, JournalStorageRef}; use coerce::persistent::journal::types::JournalTypes; use coerce::persistent::{ PersistFailurePolicy, Persistence, PersistentActor, Recover, RecoveryFailurePolicy, Retry, }; use coerce_macros::JsonMessage; use parking_lot::Mutex; use serde::{Deserialize, Serialize}; use std::collections::VecDeque; use std::error::Error; use std::fmt::{Debug, Display, Formatter}; use std::sync::Arc; use tracing::info; #[macro_use] extern crate serde; #[macro_use] extern crate async_trait; pub mod util; #[derive(Default)] struct PersistenceState { messages: Option>, snapshot: Option, next_errors: VecDeque, } #[derive(Default)] pub struct MockPersistence { state: Mutex, } struct Provider(Arc); impl StorageProvider for Provider { fn journal_storage(&self) -> Option { Some(self.0.clone()) } } struct TestActor { recovery_policy: RecoveryFailurePolicy, persist_policy: PersistFailurePolicy, recovered_message: Option, } #[derive(JsonMessage, Serialize, Deserialize)] #[result("()")] struct Message; impl PersistentActor for TestActor { fn configure(types: &mut JournalTypes) { types.message::("Message"); } fn recovery_failure_policy(&self) -> RecoveryFailurePolicy { self.recovery_policy } fn persist_failure_policy(&self) -> PersistFailurePolicy { self.persist_policy } } #[async_trait] impl Recover for TestActor { async fn recover(&mut self, message: Message, _ctx: &mut ActorContext) { self.recovered_message = Some(message); } } #[async_trait] impl Handler for TestActor { async fn handle(&mut self, message: Message, ctx: &mut ActorContext) { if self.persist(&message, ctx).await.is_ok() { info!("received message"); } } } #[derive(Debug)] struct MockErr; impl Display for MockErr { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "Mock error") } } impl Error for MockErr {} #[tokio::test] pub async fn test_persistent_actor_recovery_failure_retry_until_success() { util::create_trace_logger(); let persistence = Arc::new(MockPersistence::default()); let provider = Provider(persistence.clone()); let system = ActorSystem::new().to_persistent(Persistence::from(provider)); persistence.set_next_err(MockErr.into()); persistence.set_next_err(MockErr.into()); persistence.set_next_err(MockErr.into()); persistence.set_next_err(MockErr.into()); persistence.set_next_err(MockErr.into()); let _actor = TestActor { recovery_policy: RecoveryFailurePolicy::Retry(Retry::UntilSuccess { delay: None }), persist_policy: Default::default(), recovered_message: None, } .into_actor(Some("TestActor".to_actor_id()), &system) .await .unwrap(); info!("actor created") } #[tokio::test] pub async fn test_persistent_actor_recovery_failure_stop_actor() { util::create_trace_logger(); let persistence = Arc::new(MockPersistence::default()); let provider = Provider(persistence.clone()); let system = ActorSystem::new().to_persistent(Persistence::from(provider)); persistence.set_next_err(MockErr.into()); let actor = TestActor { recovery_policy: RecoveryFailurePolicy::StopActor, persist_policy: Default::default(), recovered_message: None, } .into_actor(Some("TestActor".to_actor_id()), &system) .await; assert_eq!(actor.unwrap_err(), ActorRefErr::ActorStartFailed) } #[tokio::test] pub async fn test_persistent_actor_persist_failure_panic() { util::create_trace_logger(); let persistence = Arc::new(MockPersistence::default()); let provider = Provider(persistence.clone()); let system = ActorSystem::new().to_persistent(Persistence::from(provider)); let actor = TestActor { recovery_policy: Default::default(), persist_policy: PersistFailurePolicy::Panic, recovered_message: None, } .into_actor(Some("TestActor".to_actor_id()), &system) .await; let actor = actor.unwrap(); assert!(actor.status().await.is_ok()); persistence.set_next_err(MockErr.into()); let result = actor.send(Message).await; assert!(result.is_err()); } #[tokio::test] pub async fn test_persistent_actor_persist_failure_retry_until_success() { util::create_trace_logger(); let persistence = Arc::new(MockPersistence::default()); let provider = Provider(persistence.clone()); let system = ActorSystem::new().to_persistent(Persistence::from(provider)); let actor = TestActor { recovery_policy: Default::default(), persist_policy: PersistFailurePolicy::Retry(Retry::UntilSuccess { delay: None }), recovered_message: None, } .into_actor(Some("TestActor".to_actor_id()), &system) .await; let actor = actor.unwrap(); assert!(actor.status().await.is_ok()); persistence.set_next_err(MockErr.into()); persistence.set_next_err(MockErr.into()); persistence.set_next_err(MockErr.into()); persistence.set_next_err(MockErr.into()); persistence.set_next_err(MockErr.into()); let result = actor.send(Message).await; assert!(result.is_ok()); } impl MockPersistence { pub fn set_next_err(&self, err: anyhow::Error) { let mut state = self.state.lock(); state.next_errors.push_back(err); } } #[async_trait] impl JournalStorage for MockPersistence { async fn write_snapshot( &self, _persistence_id: &str, _entry: JournalEntry, ) -> anyhow::Result<()> { let mut state = self.state.lock(); if let Some(error) = state.next_errors.pop_front() { Err(error) } else { Ok(()) } } async fn write_message( &self, _persistence_id: &str, _entry: JournalEntry, ) -> anyhow::Result<()> { let mut state = self.state.lock(); if let Some(error) = state.next_errors.pop_front() { Err(error) } else { Ok(()) } } async fn write_message_batch( &self, persistent_id: &str, entries: Vec, ) -> anyhow::Result<()> { todo!() } async fn read_latest_snapshot( &self, _persistence_id: &str, ) -> anyhow::Result> { let mut state = self.state.lock(); if let Some(error) = state.next_errors.pop_front() { Err(error) } else { Ok(state.snapshot.clone()) } } async fn read_latest_messages( &self, _persistence_id: &str, _from_sequence: i64, ) -> anyhow::Result>> { let mut state = self.state.lock(); if let Some(error) = state.next_errors.pop_front() { Err(error) } else { Ok(state.messages.clone()) } } async fn read_message( &self, persistence_id: &str, sequence_id: i64, ) -> anyhow::Result> { todo!() } async fn read_messages( &self, persistence_id: &str, from_sequence: i64, to_sequence: i64, ) -> anyhow::Result>> { todo!() } async fn delete_messages_to( &self, persistence_id: &str, to_sequence: i64, ) -> anyhow::Result<()> { todo!() } async fn delete_all(&self, _persistence_id: &str) -> anyhow::Result<()> { Ok(()) } }