// Ping-pong test for the actor scheduler: two `Player` actors bounce a
// countdown between each other until it reaches zero, then shut down.
//
// NOTE(review): this copy of the file appears to have lost its generic
// arguments (see `type Result` below, and the bare `Addr`, `Context`,
// `Option` and `Actor`). Restore them from the original before compiling.
use futures::{future, prelude::*};
use holly::{Error, prelude::*};
use tokio_threadpool::ThreadPool;

/// Messages exchanged between `Player` actors.
enum Message {
    /// A ping carrying the remaining countdown and the address to reply to.
    Ping(usize, Addr),
    /// A pong carrying the remaining countdown and the address to reply to.
    Pong(usize, Addr),
    /// Tells the receiving player to stop.
    Fin
}

/// A stateless actor that answers every `Ping` with a `Pong` (and vice versa),
/// decrementing the countdown on each reply.
struct Player;

impl Actor for Player {
    // NOTE(review): this type looks truncated; presumably a boxed `Future`
    // whose item is a `State` and whose error is `Error` — confirm.
    type Result = Box, Error = Error> + Send>;

    /// Handles one incoming message (or `None`) and yields the actor's next
    /// state.
    ///
    /// Every branch prints a trace line prefixed with `ctx.id()`.
    fn process(self, ctx: &mut Context, msg: Option) -> Self::Result {
        match msg {
            // Countdown exhausted: tell the peer to finish, then finish too.
            Some(Message::Ping(0, p)) => {
                println!("{}: Ping(0)", ctx.id());
                Box::new(p.send(Message::Fin).map(move |_| State::Done))
            }
            // Same as above for the pong side.
            Some(Message::Pong(0, p)) => {
                println!("{}: Pong(0)", ctx.id());
                Box::new(p.send(Message::Fin).map(move |_| State::Done))
            }
            // Reply with a pong carrying our own address. `n - 1` cannot
            // underflow here because `n == 0` is matched by an earlier arm.
            Some(Message::Ping(n, p)) => {
                println!("{}: Ping({})", ctx.id(), n);
                let a = ctx.address();
                // `State::Ready(self)` presumably keeps this actor running
                // for further messages — confirm against the crate docs.
                Box::new(p.send(Message::Pong(n - 1, a)).map(move |_| State::Ready(self)))
            }
            // Mirror image of the ping arm: reply with a ping.
            Some(Message::Pong(n, p)) => {
                println!("{}: Pong({})", ctx.id(), n);
                let a = ctx.address();
                Box::new(p.send(Message::Ping(n - 1, a)).map(move |_| State::Ready(self)))
            }
            // The peer finished first; nothing to send back.
            Some(Message::Fin) => {
                println!("{}: Fin", ctx.id());
                Box::new(future::ok(State::Done))
            }
            // No message; the "EOS" trace suggests this signals that the
            // message stream has ended — confirm.
            None => {
                println!("{}: EOS", ctx.id());
                Box::new(future::ok(State::Done))
            }
        }
    }
}

/// Runs the exchange on a thread pool: spawns two players, sends the first a
/// `Ping(9, ..)` addressed to the second, then waits for the pool to go idle.
///
/// There are no assertions; the test fails only on an `Err` or a panic.
#[test]
fn ping_pong() -> Result<(), Error> {
    let pool = ThreadPool::new();
    let exec = Scheduler::new(pool.sender().clone());
    let mut a = exec.spawn(Player)?;
    let b = exec.spawn(Player)?;
    a.send_now(Message::Ping(9, b))?;
    // Blocks until the pool has shut down after becoming idle.
    pool.shutdown_on_idle().wait().unwrap();
    Ok(())
}

/// Same scenario as `ping_pong`, but driven by a `current_thread` runtime
/// through `Scheduler::single_threaded`.
#[test]
fn ping_pong_on_current_thread() -> Result<(), Error> {
    let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap();
    let scheduler = Scheduler::single_threaded(runtime.handle());
    let mut a = scheduler.spawn(Player)?;
    let b = scheduler.spawn(Player)?;
    a.send_now(Message::Ping(9, b))?;
    // Drives the runtime on this thread until `run` returns.
    runtime.run().unwrap();
    Ok(())
}