use aktrs::actor::{testkit::ActorTestKit, SpawnOptions}; use aktrs::util::event::{self, EventBus}; #[derive(Debug, Copy, Clone, Eq, PartialEq)] struct Message(&'static str, &'static [u8]); fn hello() -> Message { Message("hello", b"world") } fn goodbye() -> Message { Message("goodbye", b"world") } fn classifier() -> event::Classifier { event::Classifier::new(|Message(key, _): &Message| *key == "hello") } fn retainer() -> impl event::Retainer { struct HelloRetainer(Option); impl event::Retainer for HelloRetainer { type Event = Message; fn hint(&mut self, Message(key, payload): &Message) { if *key == "hello" { self.0 = Some(Message(key, payload)); } } fn get_retained(&self) -> Vec { self.0.iter().copied().collect() } } HelloRetainer(None) } #[aktrs::test] fn event_bus_subscribe_with_classifier(mut t: ActorTestKit) -> aktrs::Result<()> { let mut event_bus = t.spawn(EventBus::actor(), SpawnOptions::default()); t.block_on(event_bus.tell(event::Command::Publish(hello())))?; let mut probe = t.create_anonymous_test_probe(); let pid = probe.pid(); t.block_on(event_bus.tell(event::Command::Subscribe(pid.clone(), classifier())))?; probe.expect_no_message(); t.block_on(event_bus.tell(event::Command::Publish(hello())))?; probe.expect_message(hello()); t.block_on(event_bus.tell(event::Command::Publish(goodbye())))?; probe.expect_no_message(); t.block_on(event_bus.tell(event::Command::Unsubscribe(pid.into_unspecified())))?; t.block_on(event_bus.tell(event::Command::Publish(hello())))?; probe.expect_no_message(); Ok(()) } #[aktrs::test] fn event_bus_with_retention_subscribe_with_classifier(mut t: ActorTestKit) -> aktrs::Result<()> { let mut event_bus = t.spawn(EventBus::actor_with_retention(retainer()), SpawnOptions::default()); t.block_on(event_bus.tell(event::Command::Publish(hello())))?; let mut probe = t.create_anonymous_test_probe(); let pid = probe.pid(); t.block_on(event_bus.tell(event::Command::Subscribe(pid.clone(), classifier())))?; probe.expect_message(hello()); t.block_on(event_bus.tell(event::Command::Publish(hello())))?; probe.expect_message(hello()); t.block_on(event_bus.tell(event::Command::Publish(goodbye())))?; probe.expect_no_message(); t.block_on(event_bus.tell(event::Command::Unsubscribe(pid.into_unspecified())))?; t.block_on(event_bus.tell(event::Command::Publish(hello())))?; probe.expect_no_message(); Ok(()) } #[aktrs::test] fn event_bus_subscribe_to_all(mut t: ActorTestKit) -> aktrs::Result<()> { let mut event_bus = t.spawn(EventBus::actor(), SpawnOptions::default()); t.block_on(event_bus.tell(event::Command::Publish(hello())))?; let mut probe = t.create_anonymous_test_probe(); let pid = probe.pid(); t.block_on(event_bus.tell(event::Command::SubscribeToAll(pid.clone())))?; probe.expect_no_message(); t.block_on(event_bus.tell(event::Command::Publish(hello())))?; probe.expect_message(hello()); t.block_on(event_bus.tell(event::Command::Publish(goodbye())))?; probe.expect_message(goodbye()); t.block_on(event_bus.tell(event::Command::Unsubscribe(pid.into_unspecified())))?; t.block_on(event_bus.tell(event::Command::Publish(hello())))?; probe.expect_no_message(); Ok(()) } #[aktrs::test] fn event_bus_with_retention_subscribe_to_all(mut t: ActorTestKit) -> aktrs::Result<()> { let mut event_bus = t.spawn(EventBus::actor_with_retention(retainer()), SpawnOptions::default()); t.block_on(event_bus.tell(event::Command::Publish(hello())))?; let mut probe = t.create_anonymous_test_probe(); let pid = probe.pid(); t.block_on(event_bus.tell(event::Command::SubscribeToAll(pid.clone())))?; probe.expect_message(hello()); t.block_on(event_bus.tell(event::Command::Publish(goodbye())))?; probe.expect_message(goodbye()); t.block_on(event_bus.tell(event::Command::Unsubscribe(pid.into_unspecified())))?; t.block_on(event_bus.tell(event::Command::Publish(hello())))?; probe.expect_no_message(); Ok(()) } #[aktrs::test] fn event_bus_unsubscribe_does_not_affect_other_actors(mut t: ActorTestKit) -> aktrs::Result<()> { let mut event_bus = t.spawn(EventBus::actor(), SpawnOptions::default()); let mut probe1 = t.create_anonymous_test_probe(); let mut probe2 = t.create_anonymous_test_probe(); t.block_on(event_bus.tell(event::Command::SubscribeToAll(probe1.pid())))?; t.block_on(event_bus.tell(event::Command::SubscribeToAll(probe2.pid())))?; t.block_on(event_bus.tell(event::Command::Publish(())))?; probe1.expect_message(()); probe2.expect_message(()); t.block_on(event_bus.tell(event::Command::Unsubscribe(probe1.pid().into_unspecified())))?; t.block_on(event_bus.tell(event::Command::Publish(())))?; probe1.expect_no_message(); probe2.expect_message(()); Ok(()) }