#[macro_use] extern crate riker_testkit; use riker::actors::*; use riker_testkit::probe::channel::{probe, ChannelProbe}; use riker_testkit::probe::{Probe, ProbeReceive}; #[derive(Clone, Debug)] pub struct Panic; #[derive(Clone, Debug)] pub struct TestProbe(ChannelProbe<(), ()>); #[derive(Default)] struct DumbActor; impl Actor for DumbActor { type Msg = (); fn recv(&mut self, _: &Context, _: Self::Msg, _: Sender) {} } #[actor(TestProbe, Panic)] #[derive(Default)] struct PanicActor; impl Actor for PanicActor { type Msg = PanicActorMsg; fn pre_start(&mut self, ctx: &Context) { ctx.actor_of::("child_a").unwrap(); ctx.actor_of::("child_b").unwrap(); ctx.actor_of::("child_c").unwrap(); ctx.actor_of::("child_d").unwrap(); } fn recv(&mut self, ctx: &Context, msg: Self::Msg, sender: Sender) { self.receive(ctx, msg, sender); } } impl Receive for PanicActor { type Msg = PanicActorMsg; fn receive(&mut self, _ctx: &Context, msg: TestProbe, _sender: Sender) { msg.0.event(()); } } impl Receive for PanicActor { type Msg = PanicActorMsg; fn receive(&mut self, _ctx: &Context, _msg: Panic, _sender: Sender) { panic!("// TEST PANIC // TEST PANIC // TEST PANIC //"); } } // Test Restart Strategy #[actor(TestProbe, Panic)] #[derive(Default)] struct RestartSup { actor_to_fail: Option>, } impl Actor for RestartSup { type Msg = RestartSupMsg; fn pre_start(&mut self, ctx: &Context) { self.actor_to_fail = ctx.actor_of::("actor-to-fail").ok(); } fn supervisor_strategy(&self) -> Strategy { Strategy::Restart } fn recv(&mut self, ctx: &Context, msg: Self::Msg, sender: Sender) { self.receive(ctx, msg, sender) } } impl Receive for RestartSup { type Msg = RestartSupMsg; fn receive(&mut self, _ctx: &Context, msg: TestProbe, sender: Sender) { self.actor_to_fail.as_ref().unwrap().tell(msg, sender); } } impl Receive for RestartSup { type Msg = RestartSupMsg; fn receive(&mut self, _ctx: &Context, _msg: Panic, _sender: Sender) { self.actor_to_fail.as_ref().unwrap().tell(Panic, None); } } #[test] fn supervision_restart_failed_actor() { let sys = ActorSystem::new().unwrap(); for i in 0..100 { let sup = sys .actor_of::(&format!("supervisor_{}", i)) .unwrap(); // Make the test actor panic sup.tell(Panic, None); let (probe, listen) = probe::<()>(); sup.tell(TestProbe(probe), None); p_assert_eq!(listen, ()); } } // Test Escalate Strategy #[actor(TestProbe, Panic)] #[derive(Default)] struct EscalateSup { actor_to_fail: Option>, } impl Actor for EscalateSup { type Msg = EscalateSupMsg; fn pre_start(&mut self, ctx: &Context) { self.actor_to_fail = ctx.actor_of::("actor-to-fail").ok(); } fn supervisor_strategy(&self) -> Strategy { Strategy::Escalate } fn recv(&mut self, ctx: &Context, msg: Self::Msg, sender: Sender) { self.receive(ctx, msg, sender); // match msg { // // We just resend the messages to the actor that we're concerned about testing // TestMsg::Panic => self.actor_to_fail.try_tell(msg, None).unwrap(), // TestMsg::Probe(_) => self.actor_to_fail.try_tell(msg, None).unwrap(), // }; } } impl Receive for EscalateSup { type Msg = EscalateSupMsg; fn receive(&mut self, _ctx: &Context, msg: TestProbe, sender: Sender) { self.actor_to_fail.as_ref().unwrap().tell(msg, sender); } } impl Receive for EscalateSup { type Msg = EscalateSupMsg; fn receive(&mut self, _ctx: &Context, _msg: Panic, _sender: Sender) { self.actor_to_fail.as_ref().unwrap().tell(Panic, None); } } #[actor(TestProbe, Panic)] #[derive(Default)] struct EscRestartSup { escalator: Option>, } impl Actor for EscRestartSup { type Msg = EscRestartSupMsg; fn pre_start(&mut self, ctx: &Context) { self.escalator = ctx.actor_of::("escalate-supervisor").ok(); } fn recv(&mut self, ctx: &Context, msg: Self::Msg, sender: Sender) { self.receive(ctx, msg, sender); // match msg { // // We resend the messages to the parent of the actor that is/has panicked // TestMsg::Panic => self.escalator.try_tell(msg, None).unwrap(), // TestMsg::Probe(_) => self.escalator.try_tell(msg, None).unwrap(), // }; } fn supervisor_strategy(&self) -> Strategy { Strategy::Restart } } impl Receive for EscRestartSup { type Msg = EscRestartSupMsg; fn receive(&mut self, _ctx: &Context, msg: TestProbe, sender: Sender) { self.escalator.as_ref().unwrap().tell(msg, sender); } } impl Receive for EscRestartSup { type Msg = EscRestartSupMsg; fn receive(&mut self, _ctx: &Context, _msg: Panic, _sender: Sender) { self.escalator.as_ref().unwrap().tell(Panic, None); } } #[test] fn supervision_escalate_failed_actor() { let sys = ActorSystem::new().unwrap(); let sup = sys.actor_of::("supervisor").unwrap(); // Make the test actor panic sup.tell(Panic, None); let (probe, listen) = probe::<()>(); std::thread::sleep(std::time::Duration::from_millis(2000)); sup.tell(TestProbe(probe), None); p_assert_eq!(listen, ()); sys.print_tree(); }