use std::net::SocketAddr; use futures::io::Error; use futures::{ io, io::{AsyncRead, AsyncReadExt, Result as FutIoResult}, AsyncWriteExt, StreamExt, }; use mirai::tcp::{TcpConnection, TcpListenerStream, TcpRecvStream, TcpSendStream}; use stage::actors::{ Actor, ActorCtx, ActorError, ActorRef, ActorResult, BuildActorOp, Message, MsgError, }; pub struct TcpRecvActor { conn: TcpRecvStream, handler: Actor, } pub struct TcpSendActor { conn: TcpSendStream, recv: ActorRef, } pub struct TcpListenerActor { listener: TcpListenerStream, } #[derive(Default)] pub struct Echo { barrier: (), rx: Option, tx: Option, } impl TcpRecvActor { pub async fn handle(mut self, ctx: ActorCtx, _: Message) -> ActorResult { let mut buf = Vec::new(); // Read from the stream. Mirai will wake this when there are bytes to be read. let msg = match self.conn.read_to_end(&mut buf).await { Ok(_) => TcpRecvMsg(Ok(buf)), Err(err) => TcpRecvMsg(Err(err)), }; // Make sure we keep working let _ = ctx.this.send(Continue); // Pass the bytes to the handler for it to deserialize and process. match self.handler.send(msg) { Ok(_) => Ok(self), Err(_) => Err(ActorError::Stopping), } } } impl TcpSendActor { pub async fn handle(mut self, ctx: ActorCtx, msg: Message) -> ActorResult { // This Actor runs on internal input, not external networking, so we are waiting for the // appropriate message. if let Some(cmd) = msg.try_cast::() { match cmd { TcpSendCmd::Send(bytes) => { // Write and flush. Flushing should be done after whole messages. self.conn.write(bytes.as_slice()).await; self.conn.flush().await; Ok(self) } TcpSendCmd::Close => { // For our example, the Sender also acts as the control point for the // connection. You may want something more elaborate, but for this, we just shut // both down. ctx.sys.stop_actor(self.recv); Err(ActorError::Stopping) } } } else { Ok(self) } } } impl TcpListenerActor { pub async fn handle(mut self, ctx: ActorCtx, msg: Message) -> ActorResult { // Await each incoming connection. The Actor will use no CPU time until the system gives the // event letting us know we have a connection. let incoming = match self.listener.next().await.unwrap() { Ok(inc) => inc, Err(err) => return Err(ActorError::StdError(err.into())), }; // Now we split the connection, so we can have parallel read/write. let (tx, rx) = incoming.split().unwrap(); let build_echo = BuildActorOp::new(None, Echo::default(), Echo::handle); // Unwrap is safe, for now, because the only error for creating an Actor is if there let echo_actor = ctx.sys.new_actor(build_echo).await.unwrap(); let rx_actor_state = TcpRecvActor { conn: rx, handler: echo_actor.clone() }; } } impl Echo { pub async fn handle(mut self, ctx: ActorCtx, msg: Message) -> ActorResult { if let Some(msg) = msg.try_cast::() { let msg = String::from_utf8(msg.0.unwrap().clone()).unwrap(); self.tx.unwrap().send(TcpSendCmd::Send(msg.into_bytes())).unwrap(); Ok(self) } else if let Some(msg) = msg.try_cast::() { self.rx = Some(msg.0.clone()); Ok(self) } else if let Some(msg) = msg.try_cast::() { self.tx = Some(msg.0.clone()); Ok(self) } else { Ok(self) } } } pub struct TcpRecvMsg(Result, io::Error>); pub enum TcpSendCmd { Send(Vec), Close, } pub struct Continue; // Informs the Actor that the enclosed Actor is sending them inbound TCP messages pub struct ReadingTcp(pub Actor); // Informs the Actor that the enclosed Actor is prepared to send outbound TCP messages pub struct WritingTcp(pub Actor); actor_msg!(Continue, TcpRecvMsg, TcpSendCmd, ReadingTcp, WritingTcp);