extern crate env_logger; extern crate robots; use std::any::Any; use std::sync::{Arc, Mutex}; use std::sync::mpsc::{channel, Sender}; use std::time::Duration; use robots::actors::{Actor, ActorSystem, ActorCell, ActorContext, ActorRef, Props}; #[derive(Debug, PartialEq)] enum Res { Ok, Err, } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// #[derive(Copy, Clone)] enum InternalStateMessage { Set(u32), Get, Panic, } struct InternalState { last: Mutex, sender: Arc>>, } impl Actor for InternalState { fn receive(&self, message: Box, context: ActorCell) { if let Ok(message) = Box::::downcast::(message) { match *message { InternalStateMessage::Get => { context.complete(context.sender(), *self.last.lock().unwrap()) } InternalStateMessage::Set(message) => { // Here mixing the test actor for the two tests might seem a bit weird, // but we would get two very similar actors otherwise. let mut last = self.last.lock().unwrap(); if message <= *last { let _ = self.sender.lock().unwrap().send(Res::Err); } else { *last = message; } if *last == 1000 { let _ = self.sender.lock().unwrap().send(Res::Ok); } } InternalStateMessage::Panic => { panic!("The actor panicked as it was asked to.") }, } } } } impl InternalState { fn new(sender: Arc>>) -> InternalState { InternalState { last: Mutex::new(0), sender: sender, } } } #[test] fn read_messages_in_order() { let actor_system = ActorSystem::new("test".to_owned()); actor_system.spawn_threads(9); let (tx, rx) = channel(); let tx = Arc::new(Mutex::new(tx)); let props = Props::new(Arc::new(InternalState::new), tx); let actor_ref_1 = actor_system.actor_of(props.clone(), "sender".to_owned()); let actor_ref_2 = actor_system.actor_of(props.clone(), "receiver".to_owned()); for i in 1..1001 { actor_ref_1.tell_to(actor_ref_2.clone(), InternalStateMessage::Set(i as u32)); } let res = rx.recv(); assert_eq!(Ok(Res::Ok), res); actor_system.shutdown(); } #[test] fn recover_from_panic() { let actor_system = ActorSystem::new("test".to_owned()); let (tx, _rx) = channel(); let tx = Arc::new(Mutex::new(tx)); let props = Props::new(Arc::new(InternalState::new), tx); let requester = actor_system.actor_of(props.clone(), "sender".to_owned()); let answerer = actor_system.actor_of(props.clone(), "receiver".to_owned()); requester.tell_to(answerer.clone(), InternalStateMessage::Set(10)); let res = actor_system.ask(answerer.clone(), InternalStateMessage::Get, "future_1".to_owned()); let res: u32 = actor_system.extract_result(res); assert_eq!(10u32, res); requester.tell_to(answerer.clone(), InternalStateMessage::Panic); let res = actor_system.ask(answerer, InternalStateMessage::Get, "future_2".to_owned()); let res: u32 = actor_system.extract_result(res); assert_eq!(0u32, res); actor_system.shutdown(); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// struct Resolver; impl Actor for Resolver { fn receive(&self, message: Box, context: ActorCell) { if let Ok(message) = Box::::downcast::(message) { let future = context.identify_actor(*message, "resolver_request".to_owned()); context.forward_result_to_future::>(future, context.sender()); } } } impl Resolver { fn new(_dummy: ()) -> Resolver { Resolver } } #[test] fn resolve_name_real_path() { let actor_system = ActorSystem::new("test".to_owned()); let props = Props::new(Arc::new(Resolver::new), ()); let answerer = actor_system.actor_of(props.clone(), "answerer".to_owned()); let requester = actor_system.actor_of(props.clone(), "sender".to_owned()); // We wait to be sure that the actors will be registered to the name resolver. std::thread::sleep(Duration::from_millis(100)); let res = actor_system.ask(answerer, "/user/sender".to_owned(), "future".to_owned()); let res: Option = actor_system.extract_result(res); assert_eq!(requester.path(), res.unwrap().path()); actor_system.shutdown(); } #[test] fn resolve_name_fake_path() { let actor_system = ActorSystem::new("test".to_owned()); let props = Props::new(Arc::new(Resolver::new), ()); let answerer = actor_system.actor_of(props.clone(), "answerer".to_owned()); // We wait to be sure that the actors will be registered to the name resolver. std::thread::sleep(Duration::from_millis(100)); let res = actor_system.ask(answerer, "/foo/bar".to_owned(), "future".to_owned()); let res: Option = actor_system.extract_result(res); match res { None => {} Some(_) => panic!("The name resolver gave an ActorRef when he should not."), }; actor_system.shutdown(); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// struct SimpleActor1 { sender: Arc>>, } impl Actor for SimpleActor1 { fn receive(&self, _message: Box, _context: ActorCell) { let _ = self.sender.lock().unwrap().send(Res::Ok); } } impl SimpleActor1 { fn new(sender: Arc>>) -> SimpleActor1 { SimpleActor1 { sender: sender, } } } #[test] fn receive_message () { env_logger::init().unwrap(); let actor_system = ActorSystem::new("test".to_owned()); let (tx, rx) = channel(); let tx = Arc::new(Mutex::new(tx)); let props = Props::new(Arc::new(SimpleActor1::new), tx); let actor_ref = actor_system.actor_of(props.clone(), "actor".to_owned()); actor_system.tell(actor_ref, ()); let res = rx.recv(); assert_eq!(Ok(Res::Ok), res); actor_system.shutdown(); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// struct SimpleActor2 { sender: Arc>>, state: i32, } impl Actor for SimpleActor2 { fn receive(&self, message: Box, _context: ActorCell) { if let Ok(message) = Box::::downcast::(message) { if *message == self.state { let _ = self.sender.lock().unwrap().send(Res::Ok); } else { let _ = self.sender.lock().unwrap().send(Res::Err); } } } } impl SimpleActor2 { fn new(args: (Arc>>, i32)) -> SimpleActor2 { SimpleActor2 { sender: args.0, state: args.1, } } } #[test] fn receive_correct_message () { let actor_system = ActorSystem::new("test".to_owned()); let (tx, rx) = channel(); let tx = Arc::new(Mutex::new(tx)); let value = 42; let props = Props::new(Arc::new(SimpleActor2::new), (tx, value)); let actor_ref = actor_system.actor_of(props.clone(), "actor".to_owned()); actor_system.tell(actor_ref, value); let res = rx.recv(); assert_eq!(Ok(Res::Ok), res); actor_system.shutdown(); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// struct SimpleActor3; impl Actor for SimpleActor3 { fn receive(&self, _message: Box, _context: ActorCell) { panic!("Panic as planned, should start an failure handler."); } } impl SimpleActor3 { fn new(_args: ()) -> SimpleActor3 { SimpleActor3 } } #[derive(Clone)] enum SimpleActor4Messages { RegisterMe, ActorFailure, } struct SimpleActor4 { sender: Arc>>, } impl Actor for SimpleActor4 { fn receive(&self, message: Box, context: ActorCell) { if let Ok(message) = Box::::downcast::(message) { match *message { SimpleActor4Messages::RegisterMe => { context.monitor(context.sender(), Arc::new(|_failure, context| { // When the monitored actor failed, the monitorer pipes a message to itself. context.tell(context.actor_ref(), SimpleActor4Messages::ActorFailure); })); context.tell(context.sender(), ()); }, SimpleActor4Messages::ActorFailure => { let _ = self.sender.lock().unwrap().send(Res::Ok); } } } } } impl SimpleActor4 { fn new(args: Arc>>) -> SimpleActor4 { SimpleActor4 { sender: args } } } #[test] fn receive_failure_notifications () { let actor_system = ActorSystem::new("test".to_owned()); let (tx, rx) = channel(); let tx = Arc::new(Mutex::new(tx)); let props_panicker = Props::new(Arc::new(SimpleActor3::new), ()); let props_handler = Props::new(Arc::new(SimpleActor4::new), tx); let panicker = actor_system.actor_of(props_panicker.clone(), "panicker".to_owned()); let handler = actor_system.actor_of(props_handler.clone(), "handler".to_owned()); panicker.tell_to(handler.clone(), SimpleActor4Messages::RegisterMe); let res = rx.recv(); assert_eq!(Ok(Res::Ok), res); actor_system.shutdown(); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// struct SimpleActor5; impl Actor for SimpleActor5 { fn receive(&self, _message: Box, context: ActorCell) { context.fail("failure"); } } impl SimpleActor5 { fn new(_args: ()) -> SimpleActor5 { SimpleActor5 } } #[derive(Clone)] enum SimpleActor6Messages { RegisterMe, ActorFailureOk, ActorFailureErr, } struct SimpleActor6 { sender: Arc>>, } impl Actor for SimpleActor6 { fn receive(&self, message: Box, context: ActorCell) { if let Ok(message) = Box::::downcast::(message) { match *message { SimpleActor6Messages::RegisterMe => { context.monitor(context.sender(), Arc::new(|failure, context| { // When the monitored actor failed, the monitorer pipes a message to itself. match failure.reason() { "failure" => context.tell(context.actor_ref(), SimpleActor6Messages::ActorFailureOk), _ => context.tell(context.actor_ref(), SimpleActor6Messages::ActorFailureErr), }; })); context.tell(context.sender(), ()); }, SimpleActor6Messages::ActorFailureOk => { let _ = self.sender.lock().unwrap().send(Res::Ok); }, SimpleActor6Messages::ActorFailureErr => { let _ = self.sender.lock().unwrap().send(Res::Err); }, } } } } impl SimpleActor6 { fn new(args: Arc>>) -> SimpleActor6 { SimpleActor6 { sender: args } } } #[test] fn receive_failure_reason_ok () { let actor_system = ActorSystem::new("test".to_owned()); let (tx, rx) = channel(); let tx = Arc::new(Mutex::new(tx)); let props_panicker = Props::new(Arc::new(SimpleActor5::new), ()); let props_handler = Props::new(Arc::new(SimpleActor6::new), tx); let panicker = actor_system.actor_of(props_panicker.clone(), "panicker".to_owned()); let handler = actor_system.actor_of(props_handler.clone(), "handler".to_owned()); panicker.tell_to(handler.clone(), SimpleActor6Messages::RegisterMe); let res = rx.recv(); assert_eq!(Ok(Res::Ok), res); actor_system.shutdown(); } #[test] fn receive_failure_reason_err () { let actor_system = ActorSystem::new("test".to_owned()); let (tx, rx) = channel(); let tx = Arc::new(Mutex::new(tx)); // This actors does not fail with the "failure" reason, so the reason will not be correct let props_panicker = Props::new(Arc::new(SimpleActor3::new), ()); let props_handler = Props::new(Arc::new(SimpleActor6::new), tx); let panicker = actor_system.actor_of(props_panicker.clone(), "panicker".to_owned()); let handler = actor_system.actor_of(props_handler.clone(), "handler".to_owned()); panicker.tell_to(handler.clone(), SimpleActor6Messages::RegisterMe); let res = rx.recv(); assert_eq!(Ok(Res::Err), res); actor_system.shutdown(); }