use bytes::{BufMut, BytesMut}; use futures::{future::{self, Either}, prelude::*}; use holly::{actor, sink, stream, prelude::*}; use std::{io, net::SocketAddr}; use tokio::{ codec::{Encoder, Decoder, Framed}, net::{TcpListener, TcpStream}, runtime::Runtime }; // The data frame we exchange over TCP/IP between players. #[derive(Clone, Debug)] enum Frame { Ping(u8), Pong(u8), Finish } // The type which encodes and decodes `Frames` to bytes. struct Codec; impl Encoder for Codec { type Item = Frame; type Error = io::Error; fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { dst.reserve(2); match item { Frame::Ping(n) => dst.put(&[1, n][..]), Frame::Pong(n) => dst.put(&[2, n][..]), Frame::Finish => dst.put(&[3, 0][..]), } Ok(()) } } impl Decoder for Codec { type Item = Frame; type Error = io::Error; fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { if src.len() < 2 { return Ok(None) } match &src.split_to(2)[..] { [1, n] => Ok(Some(Frame::Ping(*n))), [2, n] => Ok(Some(Frame::Pong(*n))), [3, 0] => Ok(Some(Frame::Finish)), _ => Err(io::ErrorKind::InvalidData.into()) } } } // The message type a player understands. enum PlayerMessage { // The player should listen on the network and report back to // the given address when ready. Listen(SocketAddr, Addr), // The player should connect to the given socket address and report // back when connected. Connect(SocketAddr, Addr), // The player has received a data frame over the network. Data(Frame), // The player has encountered an I/O error. IoError(io::Error) } // When reading data from the network, this conversion will map the // possible items read to a player message. impl From> for PlayerMessage { fn from(x: stream::Event<(), Frame, io::Error>) -> Self { match x { stream::Event::Item(_, f) => PlayerMessage::Data(f), stream::Event::End(_) => PlayerMessage::IoError(io::ErrorKind::UnexpectedEof.into()), stream::Event::Error(_, e) => PlayerMessage::IoError(e) } } } // Errors a player can encounter. #[derive(Debug)] enum Error { Actor(holly::Error), Io(io::Error) } impl From for Error { fn from(e: io::Error) -> Self { Error::Io(e) } } impl From for Error { fn from(e: holly::Error) -> Self { Error::Actor(e) } } // The actual player actor. enum Player { // When created a player is in `Init` state. Init, // When playing, a player actor is in `Running` state. Running { input: stream::KillCord, output: Box + Send> }, // The actor is closing down. Closing { output: Box + Send> } } impl Actor, Error> for Player where T: From<()> + Send + 'static { type Result = Box>, Error = Error> + Send>; fn process(self, ctx: &mut Context>, msg: Option>) -> Self::Result { match self { Player::Init => match msg { Some(PlayerMessage::Listen(a, reply)) => { println!("Player {}: Listening at: {}", ctx.id(), a); let f = future::result(TcpListener::bind(&a)).from_err() .and_then(move |l| { reply.send(()).from_err().map(|_| l) }) .and_then(move |l| l.incoming().from_err() .into_future() .map_err(|e| e.0) .and_then(move |(conn, _)| match conn { None => Either::A(future::ok(State::Done)), Some(c) => { let (sink, stream) = Framed::new(c, Codec).split(); let (r, s) = stream::stoppable((), stream); let output = Box::new(sink); let state = Player::Running { input: r, output }; let boxed = Box::new(s.map(Into::into)); Either::B(future::ok(State::Stream(state, boxed))) } })); Box::new(f) } Some(PlayerMessage::Connect(a, reply)) => { println!("Player {}: Connecting to: {}", ctx.id(), a); let f = TcpStream::connect(&a).from_err() .and_then(move |c| { let (sink, stream) = Framed::new(c, Codec).split(); let (r, s) = stream::stoppable((), stream); let output = Box::new(sink); let state = Player::Running { input: r, output }; reply.send(()).from_err().map(|_| { State::Stream(state, Box::new(s.map(Into::into))) }) }); Box::new(f) } _ => Box::new(future::ok(State::Done)) } Player::Running { input, output } => match msg { Some(PlayerMessage::IoError(e)) => { println!("Player {}: i/o error: {}", ctx.id(), e); Box::new(future::ok(State::Done)) } Some(PlayerMessage::Data(Frame::Finish)) => { println!("Player {}: finish", ctx.id()); Box::new(future::ok(State::Close(Player::Closing { output }))) } Some(PlayerMessage::Data(Frame::Ping(0))) => { let id = ctx.id(); println!("Player {}: Ping(0)", id); let f = sink::send(output, Frame::Finish) .and_then(sink::flush) .map(move |output| State::Close(Player::Closing { output })) .map_err(move |e| { println!("Player {}: Error sending Ping(0): {}", id, e); e.into() }); Box::new(f) } Some(PlayerMessage::Data(Frame::Ping(n))) => { let id = ctx.id(); println!("Player {}: Ping({})", id, n); let f = sink::send(output, Frame::Pong(n - 1)) .and_then(sink::flush) .map(move |os| State::Ready(Player::Running { input, output: os })) .map_err(move |e| { println!("Player {}: Error sending Ping({}): {}", id, n - 1, e); e.into() }); Box::new(f) } Some(PlayerMessage::Data(Frame::Pong(0))) => { let id = ctx.id(); println!("Player {}: Pong(0)", id); let f = sink::send(output, Frame::Finish) .and_then(sink::flush) .map(move |output| State::Close(Player::Closing { output })) .map_err(move |e| { println!("Player {}: Error sending Pong(0): {}", id, e); e.into() }); Box::new(f) } Some(PlayerMessage::Data(Frame::Pong(n))) => { let id = ctx.id(); println!("Player {}: Pong({})", id, n); let f = sink::send(output, Frame::Ping(n - 1)) .and_then(sink::flush) .map(move |os| State::Ready(Player::Running { input, output: os })) .map_err(move |e| { println!("Player {}: Error sending Pong({}): {}", id, n - 1, e); e.into() }); Box::new(f) } Some(PlayerMessage::Listen(..)) | Some(PlayerMessage::Connect(..)) | None => { Box::new(future::ok(State::Done)) } } Player::Closing { output } => match msg { None => { let i = ctx.id(); println!("Player {}: closing output", i); let f = sink::close(output) .map(move |_| { println!("Player {}: done", i); State::Done }) .map_err(move |e| { println!("Player {}: Error closing output: {}", i, e); e.into() }); Box::new(f) } _ => Box::new(future::ok(State::Ready(Player::Closing { output }))) } } } } // `Root` is the actor creating and managing players. enum Root { Init, // Player 1 (the server) has been started. ServerStarted, // Player 2 (the client) has been started. ClientStarted(Addr>), } // The message, root understands. enum RootMessage { Start, Reply(()), PlayerFailed(actor::Fail) } impl From<()> for RootMessage { fn from(r: ()) -> Self { RootMessage::Reply(r) } } impl From> for RootMessage { fn from(x: actor::Fail) -> Self { RootMessage::PlayerFailed(x) } } impl Actor for Root { type Result = Box, Error = holly::Error> + Send>; fn process(self, ctx: &mut Context, msg: Option) -> Self::Result { match (self, msg) { (Root::Init, Some(RootMessage::Start)) => { println!("Root: Starting server."); let listen_addr = "127.0.0.1:12345".parse().unwrap(); let addr = ctx.address(); let opts = actor::Options::default().supervisor(ctx.address()); let f = future::result(ctx.scheduler().spawn_ext(Player::Init, opts)) .and_then(move |player| { player.send(PlayerMessage::Listen(listen_addr, addr.cast())) }) .map(|_| State::Ready(Root::ServerStarted)); Box::new(f) } (Root::ServerStarted, Some(RootMessage::Reply(()))) => { println!("Root: Starting client."); let connect_addr = "127.0.0.1:12345".parse().unwrap(); let addr = ctx.address(); let opts = actor::Options::default().supervisor(ctx.address()); let f = future::result(ctx.scheduler().spawn_ext(Player::Init, opts)) .and_then(move |player| { player.send(PlayerMessage::Connect(connect_addr, addr.cast())) }) .map(move |player| { State::Ready(Root::ClientStarted(player)) }); Box::new(f) } (Root::ClientStarted(player), Some(RootMessage::Reply(()))) => { println!("Root: Player {} - Go!", player.id()); let f = player.send(PlayerMessage::Data(Frame::Ping(32))) .map(|_| State::Done); Box::new(f) } (_, Some(RootMessage::PlayerFailed(f))) => { println!("Root: Player {} failed: {:?}", f.id(), f.error()); Box::new(future::ok(State::Done)) } _ => Box::new(future::ok(State::Done)) } } } #[test] fn ping_pong_over_tcp() -> Result<(), Box> { let rt = Runtime::new()?; let exec = Scheduler::new(rt.executor()); let mut r = exec.spawn(Root::Init)?; r.send_now(RootMessage::Start)?; rt.shutdown_on_idle().wait().unwrap(); Ok(()) }