pub mod util; #[macro_use] extern crate serde; #[macro_use] extern crate async_trait; #[macro_use] extern crate coerce_macros; use coerce::actor::system::ActorSystem; use coerce::remote::system::RemoteActorSystem; use coerce::actor::{ActorCreationErr, ActorFactory, ActorRecipe}; use util::*; #[derive(Serialize, Deserialize)] pub struct TestActorRecipe { name: String, } impl ActorRecipe for TestActorRecipe { fn read_from_bytes(bytes: &Vec) -> Option { serde_json::from_slice(bytes).unwrap() } fn write_to_bytes(&self) -> Option> { serde_json::to_vec(&self).ok() } } #[derive(Clone)] pub struct TestActorFactory; #[async_trait] impl ActorFactory for TestActorFactory { type Actor = TestActor; type Recipe = TestActorRecipe; async fn create(&self, _recipe: Self::Recipe) -> Result { tracing::trace!("recipe create :D"); // could do some mad shit like look in the db for the user data etc, if fails - fail the actor creation Ok(TestActor { status: None, counter: 0, }) } } #[derive(Serialize, Deserialize)] pub struct EchoActorRecipe {} impl ActorRecipe for EchoActorRecipe { fn read_from_bytes(bytes: &Vec) -> Option { serde_json::from_slice(bytes).unwrap() } fn write_to_bytes(&self) -> Option> { serde_json::to_vec(&self).ok() } } #[derive(Clone)] pub struct EchoActorFactory; #[async_trait] impl ActorFactory for EchoActorFactory { type Actor = EchoActor; type Recipe = EchoActorRecipe; async fn create(&self, _recipe: Self::Recipe) -> Result { tracing::trace!("recipe create :D"); // could do some mad shit like look in the db for the user data etc, if fails - fail the actor creation Ok(EchoActor {}) } } #[tokio::test(flavor = "multi_thread", worker_threads = 6)] pub async fn test_remote_cluster_workers() { util::create_trace_logger(); let system = ActorSystem::new(); let _actor = system.new_tracked_actor(TestActor::new()).await.unwrap(); let remote = RemoteActorSystem::builder() .with_tag("remote-1") .with_id(1) .with_actor_system(system) .client_auth_jwt("token2", None) .build() .await; let remote_c = remote.clone(); let remote_2 = RemoteActorSystem::builder() .with_tag("remote-2") .with_id(2) .with_actor_system(ActorSystem::new()) .client_auth_jwt("token2", None) .build() .await; let remote_2_c = remote_2.clone(); let remote_3 = RemoteActorSystem::builder() .with_tag("remote-3") .with_id(3) .with_actor_system(ActorSystem::new()) .client_auth_jwt("token2", None) .build() .await; let remote_3_c = remote_3.clone(); remote .cluster_worker() .listen_addr("localhost:30101") .start() .await; remote_2 .cluster_worker() .listen_addr("localhost:30102") .with_seed_addr("localhost:30101") .start() .await; remote_3 .cluster_worker() .listen_addr("localhost:30103") .with_seed_addr("localhost:30101") .start() .await; let nodes_a = remote_c.get_nodes().await; let nodes_b = remote_2_c.get_nodes().await; let nodes_c = remote_3_c.get_nodes().await; tracing::info!("a: {:?}", &nodes_a); tracing::info!("b: {:?}", &nodes_b); tracing::info!("c: {:?}", &nodes_c); assert_eq!(nodes_a.len(), 3); assert_eq!(nodes_b.len(), 3); assert_eq!(nodes_c.len(), 3); let nodes_a_in_b = nodes_a.iter().filter(|n| nodes_b.contains(n)).count(); let nodes_a_in_c = nodes_a.iter().filter(|n| nodes_c.contains(n)).count(); let nodes_b_in_a = nodes_b.iter().filter(|n| nodes_a.contains(n)).count(); let nodes_b_in_c = nodes_b.iter().filter(|n| nodes_c.contains(n)).count(); let nodes_c_in_a = nodes_c.iter().filter(|n| nodes_a.contains(n)).count(); let nodes_c_in_b = nodes_c.iter().filter(|n| nodes_b.contains(n)).count(); assert_eq!(nodes_a_in_b, nodes_a.len()); assert_eq!(nodes_a_in_c, nodes_a.len()); assert_eq!(nodes_b_in_a, nodes_b.len()); assert_eq!(nodes_b_in_c, nodes_b.len()); assert_eq!(nodes_c_in_a, nodes_c.len()); assert_eq!(nodes_c_in_b, nodes_c.len()); }