extern crate bytes;
extern crate futures;
extern crate tokio;
extern crate tokio_simplified;
use bytes::BytesMut;
use futures::Stream;
use std::net::{IpAddr, Ipv4Addr};
use tokio::codec::{Decoder, Encoder};
use tokio::net::{TcpListener, TcpStream};
use tokio_simplified::IoManagerBuilder;
/// Stateless codec that frames a byte stream as text lines.
///
/// Incoming bytes are split on `\n`; outgoing frames are terminated with
/// `\r\n`. The type carries no data, so it is trivially `Copy` and `Default`.
#[derive(Debug, Clone, Copy, Default)]
struct LineCodec;
impl Decoder for LineCodec {
type Item = String;
type Error = std::io::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> {
let line_end_index = src.iter().position(|x| x.clone() == '\n' as u8);
Ok(match line_end_index {
None => None,
Some(index) => {
let line = src.split_to(index);
src.split_to(1);
Some(String::from_utf8(line.to_vec()).unwrap())
}
})
}
fn decode_eof(&mut self, src: &mut BytesMut) -> Result , Self::Error> {
if src.len() > 0 {
return Ok(Some(String::from_utf8(src.to_vec()).unwrap()));
}
Err(std::io::Error::from(std::io::ErrorKind::ConnectionAborted))
}
}
/// Serialises outgoing text frames as CRLF-terminated lines.
impl Encoder for LineCodec {
    type Item = String;
    type Error = std::io::Error;

    /// Appends the bytes of `item` followed by `\r\n` to `dst`.
    ///
    /// Always returns `Ok(())`.
    fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
        let payload = item.into_bytes();
        dst.extend_from_slice(&payload);
        dst.extend_from_slice(b"\r\n");
        Ok(())
    }
}
/// Wires a freshly accepted connection into an IO manager.
///
/// The socket is framed with `LineCodec` and split into its sink and stream
/// halves, which are handed to `IoManagerBuilder`. Three callbacks are set:
///
/// * a filter that answers any frame containing "hello there" (compared
///   case-insensitively) with "General Kenobi!" and returns `None` for it;
///   every other frame is passed on as `Some(frame)`.
///   NOTE(review): `None` presumably keeps the frame away from the receive
///   callback; confirm against the `tokio_simplified` documentation.
/// * an error handler that prints the error to stdout;
/// * a receive callback that prints the frame and writes "Hi there" back.
fn process_socket(socket: TcpStream) {
    println!("New Client");
    let framed = LineCodec.framed(socket);
    let (sink, stream) = framed.split();
    let manager = IoManagerBuilder::new(sink, stream)
        .with_filter(|line, reply| {
            let is_greeting = line.to_lowercase().contains("hello there");
            if is_greeting {
                reply.write("General Kenobi!".to_string());
                None
            } else {
                Some(line)
            }
        })
        .with_error_handler(move |err| {
            println!("{}", err);
        })
        .build();
    // The receive callback owns its own writer handle.
    let mut writer = manager.get_writer();
    manager.on_receive(move |line| {
        println!("Got frame: {}", line);
        writer.write("Hi there".to_string());
        Ok(())
    });
}
/// Starts the line server on `0.0.0.0:6000`.
///
/// Every accepted connection is passed to `process_socket`. If the listener
/// cannot be bound, the error is printed to stderr and the process exits
/// with status 1 instead of terminating silently.
fn main() {
    println!("Hello Tokio");
    let addr = std::net::SocketAddr::new(IpAddr::from(Ipv4Addr::new(0, 0, 0, 0)), 6000);
    let listener = match TcpListener::bind(&addr) {
        Ok(listener) => listener,
        Err(error) => {
            // Without a listener there is nothing left to run; make the
            // failure visible to the operator and to the calling shell.
            eprintln!("failed to bind {}; error = {:?}", addr, error);
            std::process::exit(1);
        }
    };
    let server = listener
        .incoming()
        .map_err(|e| eprintln!("failed to accept socket; error = {:?}", e))
        .for_each(|socket| {
            process_socket(socket);
            Ok(())
        });
    tokio::run(server);
}