extern crate futures; extern crate tokio_io; extern crate tokio_core; extern crate tk_bufstream; extern crate tokio_service; use std::io; use std::str; use std::io::Write; use std::net::SocketAddr; use std::env; use futures::{Future, Poll, Async}; use futures::future::{FutureResult, ok}; use futures::stream::Stream; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_core::net::TcpListener; use tokio_core::reactor::Core; use tokio_service::Service; use tk_bufstream::IoBuf; struct LineProto where T: Service, { io: IoBuf, service: T, in_flight: Option, } struct LineService; impl LineProto where T: Service, { fn new(socket: S, service: T) -> LineProto { LineProto { io: IoBuf::new(socket), service: service, in_flight: None, } } } impl Future for LineProto where T: Service, { type Item = (); type Error = io::Error; fn poll(&mut self) -> Poll<(), io::Error> { self.io.flush()?; loop { if let Some(mut future) = self.in_flight.take() { match future.poll()? { Async::Ready(value) => { // This is how we emulate a protocol serializer writeln!(&mut self.io.out_buf, "Echo: {}", value) .expect("buffer write never fails"); self.io.flush()?; } Async::NotReady => { self.in_flight = Some(future); return Ok(Async::NotReady); } } } let endline = self.io.in_buf[..].iter().position(|&x| x == b'\n'); if let Some(pos) = endline { let chunk = self.io.in_buf[..pos].to_vec(); self.io.in_buf.consume(pos+1); // consume together with '\n' // Only process valid utf-8 "requests" if let Ok(line) = String::from_utf8(chunk) { self.in_flight = Some(self.service.call(line)); continue; } } else { let nbytes = self.io.read()?; if nbytes == 0 { if self.io.done() { return Ok(Async::Ready(())); } else { return Ok(Async::NotReady); } } } } } } impl Service for LineService { type Request = String; type Response = String; type Error = io::Error; type Future = FutureResult; fn call(&self, input: String) -> Self::Future { // To emulate some useful service we trim and replace // all spaces into pluses ok(input.trim().replace(" ", "+")) } } fn main() { let addr = env::args().nth(1).unwrap_or("127.0.0.1:7777".to_string()); let addr = addr.parse::().unwrap(); let mut lp = Core::new().unwrap(); let handle = lp.handle(); let socket = TcpListener::bind(&addr, &handle).unwrap(); println!("Listening on: {}", addr); let done = socket.incoming().for_each(|(socket, _addr)| { handle.spawn( LineProto::new(socket, LineService) .map_err(|e| println!("Connection error: {}", e)) ); Ok(()) }); lp.run(done).unwrap(); }