use aktrs::actor::{self, testkit::ActorTestKit, Behaviors, Pid, SpawnOptions}; use aktrs::reexport::anyhow::bail; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, Barrier}; use std::thread; use std::time::Duration; #[aktrs::test] fn behaviors_deferred(mut t: ActorTestKit) -> aktrs::Result<()> { static OK: AtomicBool = AtomicBool::new(false); let b = Arc::new(Barrier::new(2)); let b2 = Arc::clone(&b); let actor = Behaviors::with_context(move |_: &mut actor::Context<()>| { OK.store(true, Ordering::Relaxed); b2.wait(); Ok(Behaviors::stopped()) }); t.spawn(actor, SpawnOptions::default()); b.wait(); assert!(OK.load(Ordering::Relaxed)); Ok(()) } #[aktrs::test] fn behaviors_receive_message(mut t: ActorTestKit) -> aktrs::Result<()> { static OK: AtomicBool = AtomicBool::new(false); let b = Arc::new(Barrier::new(2)); let b2 = Arc::clone(&b); let actor = Behaviors::receive_message(move |_, _: ()| { OK.store(true, Ordering::Relaxed); b2.wait(); Ok(Behaviors::stopped()) }); let mut pid = t.spawn(actor, SpawnOptions::default()); t.block_on(pid.tell(()))?; b.wait(); assert!(OK.load(Ordering::Relaxed)); Ok(()) } #[aktrs::test] fn behaviors_receive_message_once(mut t: ActorTestKit) -> aktrs::Result<()> { static OK: AtomicBool = AtomicBool::new(false); let b = Arc::new(Barrier::new(2)); let b2 = Arc::clone(&b); let actor = Behaviors::receive_message_once(move |_, _: ()| { OK.store(true, Ordering::Relaxed); b2.wait(); Ok(Behaviors::stopped()) }); let mut pid = t.spawn(actor, SpawnOptions::default()); t.block_on(pid.tell(()))?; b.wait(); assert!(OK.load(Ordering::Relaxed)); Ok(()) } #[aktrs::test] #[should_panic] #[ignore] // FIXME fn behaviors_receive_message_once_improper_usage(mut t: ActorTestKit) -> aktrs::Result<()> { let actor = Behaviors::receive_message_once(move |_, _: ()| { Ok(Behaviors::same()) }); let mut pid = t.spawn(actor, SpawnOptions::default()); t.block_on(pid.tell(()))?; t.block_on(pid.tell(()))?; Ok(()) } #[aktrs::test] fn behaviors_deferred_receive_message(mut t: ActorTestKit) -> aktrs::Result<()> { static OK: AtomicBool = AtomicBool::new(false); let b = Arc::new(Barrier::new(2)); let b2 = Arc::clone(&b); let actor = Behaviors::with_context(|_| { Ok(Behaviors::receive_message(move |_, _: ()| { OK.store(true, Ordering::Relaxed); b2.wait(); Ok(Behaviors::stopped()) }) .into()) }); let mut pid = t.spawn(actor, SpawnOptions::default()); t.block_on(pid.tell(()))?; b.wait(); assert!(OK.load(Ordering::Relaxed)); Ok(()) } #[aktrs::test] fn behaviors_same(mut t: ActorTestKit) -> aktrs::Result<()> { static COUNT: AtomicUsize = AtomicUsize::new(0); let b = Arc::new(Barrier::new(2)); let b2 = Arc::clone(&b); let actor = Behaviors::receive_message(move |_, _: ()| { COUNT.fetch_add(1, Ordering::Relaxed); b2.wait(); Ok(Behaviors::same()) }); let mut pid = t.spawn(actor, SpawnOptions::default()); t.block_on(pid.tell(()))?; b.wait(); assert_eq!(COUNT.load(Ordering::Relaxed), 1); t.block_on(pid.tell(()))?; b.wait(); assert_eq!(COUNT.load(Ordering::Relaxed), 2); Ok(()) } #[aktrs::test] fn behaviors_deferred_same(mut t: ActorTestKit) -> aktrs::Result<()> { static COUNT: AtomicUsize = AtomicUsize::new(0); let b = Arc::new(Barrier::new(2)); let b2 = Arc::clone(&b); let actor = Behaviors::receive_message(move |_, _: ()| { COUNT.fetch_add(1, Ordering::Relaxed); b2.wait(); Ok(Behaviors::with_context(|_| Ok(Behaviors::same()))) }); let mut pid = t.spawn(actor, SpawnOptions::default()); t.block_on(pid.tell(()))?; b.wait(); assert_eq!(COUNT.load(Ordering::Relaxed), 1); t.block_on(pid.tell(()))?; b.wait(); assert_eq!(COUNT.load(Ordering::Relaxed), 2); Ok(()) } #[aktrs::test] #[should_panic] fn behaviors_unhandled_child_failed(mut t: ActorTestKit) -> aktrs::Result<()> { let actor = Behaviors::<()>::with_context(|_| bail!("goodbye, world")); t.spawn(actor, SpawnOptions::default()); Ok(()) } #[aktrs::test] fn behaviors_receive_signal_child_failed(mut t: ActorTestKit) -> aktrs::Result<()> { static OK: AtomicBool = AtomicBool::new(false); let b = Arc::new(Barrier::new(2)); let b2 = Arc::clone(&b); let actor = Behaviors::with_context(|cx| { let child = Behaviors::with_context(|_: &mut actor::Context<()>| { bail!("goodbye, world"); }); cx.spawn(child, SpawnOptions::default()); Ok(Behaviors::receive_signal(move |_, s| match s { actor::Signal::ChildFailed(_, _) => { OK.store(true, Ordering::Relaxed); b2.wait(); Ok(Behaviors::stopped()) } other => panic!("received an unexpected signal: {:?}", other), }) .into()) }); let mut pid = t.spawn(actor, SpawnOptions::default()); t.block_on(pid.tell(()))?; b.wait(); assert!(OK.load(Ordering::Relaxed)); Ok(()) } #[aktrs::test] fn ask_ok(mut t: ActorTestKit) -> aktrs::Result<()> { struct Ping(&'static str, Pid); struct Pong(Option<&'static str>); let server = Behaviors::receive_message(|_, Ping(msg, mut pid): Ping| { let _ = pid.try_tell(Pong(Some(msg))); Ok(Behaviors::stopped()) }); let server_pid = t.spawn(server, SpawnOptions::default()); let actor = Behaviors::with_context(move |cx| { let timeout = Duration::from_millis(50); let create_request = |tx| Ping("hello, world!", tx); cx.ask(server_pid, timeout, create_request, |result| result.unwrap_or(Pong(None))); Ok(Behaviors::receive_message(|_, Pong(message): Pong| { assert_eq!(message, Some("hello, world!")); Ok(Behaviors::stopped()) }) .into()) }); t.spawn(actor, SpawnOptions::default()); Ok(()) } #[aktrs::test] fn ask_timeout(mut t: ActorTestKit) -> aktrs::Result<()> { struct Ping(&'static str, Pid); struct Pong(Option<&'static str>); let server = Behaviors::receive_message(|_, _msg: Ping| Ok(Behaviors::unhandled())); let server_pid = t.spawn(server, SpawnOptions::default()); let actor = Behaviors::with_context(move |cx| { let timeout = Duration::from_millis(50); let create_request = |tx| Ping("hello, world!", tx); cx.ask(server_pid, timeout, create_request, |result| result.unwrap_or(Pong(None))); Ok(Behaviors::receive_message(|_, Pong(message): Pong| { assert!(message.is_none()); Ok(Behaviors::stopped()) }) .into()) }); t.spawn(actor, SpawnOptions::default()); thread::sleep(Duration::from_millis(100)); Ok(()) }