use retty_io::deprecated::{EventLoop, EventLoopBuilder, Handler}; use retty_io::net::{TcpListener, TcpStream}; use retty_io::*; use slab::Slab; use std::collections::LinkedList; use std::time::Duration; use std::{io, thread}; use {localhost, sleep_ms, TryRead, TryWrite}; // Don't touch the connection slab const SERVER: Token = Token(10_000_000); const CLIENT: Token = Token(10_000_001); #[cfg(windows)] const N: usize = 10_000; #[cfg(unix)] const N: usize = 1_000_000; struct EchoConn { sock: TcpStream, token: Option, count: usize, buf: Vec, } impl EchoConn { fn new(sock: TcpStream) -> EchoConn { let mut ec = EchoConn { sock: sock, token: None, buf: Vec::with_capacity(22), count: 0, }; unsafe { ec.buf.set_len(22) }; ec } fn writable(&mut self, event_loop: &mut EventLoop) -> io::Result<()> { event_loop.reregister( &self.sock, self.token.unwrap(), Ready::readable(), PollOpt::edge() | PollOpt::oneshot(), ) } fn readable(&mut self, event_loop: &mut EventLoop) -> io::Result<()> { loop { match self.sock.try_read(&mut self.buf[..]) { Ok(None) => { break; } Ok(Some(_)) => { self.count += 1; if self.count % 10000 == 0 { info!("Received {} messages", self.count); } if self.count == N { event_loop.shutdown(); } } Err(_) => { break; } }; } event_loop.reregister( &self.sock, self.token.unwrap(), Ready::readable(), PollOpt::edge() | PollOpt::oneshot(), ) } } struct EchoServer { sock: TcpListener, conns: Slab, } impl EchoServer { fn accept(&mut self, event_loop: &mut EventLoop) -> io::Result<()> { debug!("server accepting socket"); let sock = self.sock.accept().unwrap().0; let conn = EchoConn::new(sock); let tok = self.conns.insert(conn); // Register the connection self.conns[tok].token = Some(Token(tok)); event_loop .register( &self.conns[tok].sock, Token(tok), Ready::readable(), PollOpt::edge() | PollOpt::oneshot(), ) .expect("could not register socket with event loop"); Ok(()) } fn conn_readable(&mut self, event_loop: &mut EventLoop, tok: Token) -> io::Result<()> { debug!("server conn readable; tok={:?}", tok); self.conn(tok).readable(event_loop) } fn conn_writable(&mut self, event_loop: &mut EventLoop, tok: Token) -> io::Result<()> { debug!("server conn writable; tok={:?}", tok); self.conn(tok).writable(event_loop) } fn conn<'a>(&'a mut self, tok: Token) -> &'a mut EchoConn { &mut self.conns[tok.into()] } } struct EchoClient { sock: TcpStream, backlog: LinkedList, token: Token, count: u32, } // Sends a message and expects to receive the same exact message, one at a time impl EchoClient { fn new(sock: TcpStream, tok: Token) -> EchoClient { EchoClient { sock: sock, backlog: LinkedList::new(), token: tok, count: 0, } } fn readable(&mut self, _event_loop: &mut EventLoop) -> io::Result<()> { Ok(()) } fn writable(&mut self, event_loop: &mut EventLoop) -> io::Result<()> { debug!("client socket writable"); while self.backlog.len() > 0 { match self .sock .try_write(self.backlog.front().unwrap().as_bytes()) { Ok(None) => { break; } Ok(Some(_)) => { self.backlog.pop_front(); self.count += 1; if self.count % 10000 == 0 { info!("Sent {} messages", self.count); } } Err(e) => { debug!("not implemented; client err={:?}", e); break; } } } if self.backlog.len() > 0 { event_loop .reregister( &self.sock, self.token, Ready::writable(), PollOpt::edge() | PollOpt::oneshot(), ) .unwrap(); } Ok(()) } } struct Echo { server: EchoServer, client: EchoClient, } impl Echo { fn new(srv: TcpListener, client: TcpStream) -> Echo { Echo { server: EchoServer { sock: srv, conns: Slab::with_capacity(128), }, client: EchoClient::new(client, CLIENT), } } } impl Handler for Echo { type Timeout = usize; type Message = String; fn ready(&mut self, event_loop: &mut EventLoop, token: Token, events: Ready) { if events.is_readable() { match token { SERVER => self.server.accept(event_loop).unwrap(), CLIENT => self.client.readable(event_loop).unwrap(), i => self.server.conn_readable(event_loop, i).unwrap(), } } if events.is_writable() { match token { SERVER => panic!("received writable for token 0"), CLIENT => self.client.writable(event_loop).unwrap(), _ => self.server.conn_writable(event_loop, token).unwrap(), } } } fn notify(&mut self, event_loop: &mut EventLoop, msg: String) { match self.client.sock.try_write(msg.as_bytes()) { Ok(Some(n)) => { self.client.count += 1; if self.client.count % 10000 == 0 { info!("Sent {} bytes: count {}", n, self.client.count); } } _ => { self.client.backlog.push_back(msg); event_loop .reregister( &self.client.sock, self.client.token, Ready::writable(), PollOpt::edge() | PollOpt::oneshot(), ) .unwrap(); } } } } #[test] pub fn test_echo_server() { debug!("Starting TEST_ECHO_SERVER"); let mut b = EventLoopBuilder::new(); b.notify_capacity(1_048_576) .messages_per_tick(64) .timer_tick(Duration::from_millis(100)) .timer_wheel_size(1_024) .timer_capacity(65_536); let mut event_loop = b.build().unwrap(); let addr = localhost(); let srv = TcpListener::bind(&addr).unwrap(); info!("listen for connections"); event_loop .register( &srv, SERVER, Ready::readable(), PollOpt::edge() | PollOpt::oneshot(), ) .unwrap(); let sock = TcpStream::connect(&addr).unwrap(); // Connect to the server event_loop .register( &sock, CLIENT, Ready::writable(), PollOpt::edge() | PollOpt::oneshot(), ) .unwrap(); let chan = event_loop.channel(); let go = move || { let mut i = N; sleep_ms(1_000); let message = "THIS IS A TEST MESSAGE".to_string(); while i > 0 { chan.send(message.clone()).unwrap(); i -= 1; if i % 10000 == 0 { info!("Enqueued {} messages", N - i); } } }; let t = thread::spawn(go); // Start the event loop event_loop.run(&mut Echo::new(srv, sock)).unwrap(); t.join().unwrap(); }