extern crate futures; extern crate futures_cpupool; extern crate tk_http; extern crate http_file_headers; extern crate tk_listen; extern crate tokio_core; extern crate tokio_io; #[macro_use] extern crate log; #[macro_use] extern crate lazy_static; use std::time::Duration; use std::path::{Path}; use std::sync::Arc; use futures::{Future, Stream, Async}; use futures::future::{ok, FutureResult, Either, loop_fn, Loop}; use futures_cpupool::{CpuPool, CpuFuture}; use tk_listen::ListenExt; use tokio_io::AsyncWrite; use tokio_core::net::TcpListener; use tokio_core::reactor::Core; use tk_http::server; use tk_http::Status; use http_file_headers::{Input, Output, Config}; const MAX_SIMULTANEOUS_CONNECTIONS: usize = 500; const TIME_TO_WAIT_ON_ERROR: u64 = 100; lazy_static! { static ref POOL: CpuPool = CpuPool::new(8); static ref CONFIG: Arc = Config::new() .add_index_file("index.html") .done(); } type ResponseFuture = Box, Error=server::Error>>; struct Codec { fut: Option>, } struct Dispatcher { } fn common_headers(e: &mut server::Encoder) { e.format_header("Server", format_args!("serve_dir/{}", env!("CARGO_PKG_VERSION"))).unwrap(); } fn respond_error(status: Status, mut e: server::Encoder) -> FutureResult, server::Error> { let body = format!("{} {}", status.code(), status.reason()); e.status(status); e.add_length(body.as_bytes().len() as u64).unwrap(); common_headers(&mut e); if e.done_headers().unwrap() { e.write_body(body.as_bytes()); } ok(e.done()) } impl server::Codec for Codec { type ResponseFuture = ResponseFuture; fn recv_mode(&mut self) -> server::RecvMode { server::RecvMode::buffered_upfront(0) } fn data_received(&mut self, data: &[u8], end: bool) -> Result, server::Error> { debug_assert!(end && data.len() == 0); Ok(Async::Ready(0)) } fn start_response(&mut self, mut e: server::Encoder) -> Self::ResponseFuture { Box::new(self.fut.take().unwrap().then(move |result| { match result { Ok(Output::File(outf)) | Ok(Output::FileRange(outf)) => { if outf.is_partial() { e.status(Status::PartialContent); } else { e.status(Status::Ok); } e.add_length(outf.content_length()).unwrap(); common_headers(&mut e); for (name, val) in outf.headers() { e.format_header(name, val).unwrap(); } // add headers if e.done_headers().unwrap() { // start writing body Either::B(loop_fn((e, outf), |(mut e, mut outf)| { POOL.spawn_fn(move || { outf.read_chunk(&mut e).map(|b| (b, e, outf)) }).and_then(|(b, e, outf)| { e.wait_flush(4096).map(move |e| (b, e, outf)) }).map(|(b, e, outf)| { if b == 0 { Loop::Break(e.done()) } else { Loop::Continue((e, outf)) } }).map_err(|e| server::Error::custom(e)) })) } else { Either::A(ok(e.done())) } } Ok(Output::FileHead(head)) | Ok(Output::NotModified(head)) => { if head.is_not_modified() { e.status(Status::NotModified); } else if head.is_partial() { e.status(Status::PartialContent); e.add_length(head.content_length()).unwrap(); } else { e.status(Status::Ok); e.add_length(head.content_length()).unwrap(); } common_headers(&mut e); for (name, val) in head.headers() { e.format_header(name, val).unwrap(); } assert_eq!(e.done_headers().unwrap(), false); Either::A(ok(e.done())) } Ok(Output::InvalidRange) => { Either::A(respond_error( Status::RequestRangeNotSatisfiable, e)) } Ok(Output::InvalidMethod) => { Either::A(respond_error( Status::MethodNotAllowed, e)) } Ok(Output::NotFound) | Ok(Output::Directory) => { Either::A(respond_error(Status::NotFound, e)) } Err(status) => { Either::A(respond_error(status, e)) } } })) } } impl server::Dispatcher for Dispatcher { type Codec = Codec; fn headers_received(&mut self, head: &server::Head) -> Result { let inp = Input::from_headers(&*CONFIG, head.method(), head.headers()); let path = Path::new("./public").join(head.path() .expect("only static requests expected") // fails on OPTIONS * .trim_left_matches(|x| x == '/')); let fut = POOL.spawn_fn(move || { inp.probe_file(&path).map_err(|e| { error!("Error reading file {:?}: {}", path, e); Status::InternalServerError }) }); Ok(Codec { fut: Some(fut), }) } } fn main() { let addr = "127.0.0.1:8000".parse().unwrap(); let mut lp = Core::new().unwrap(); let h1 = lp.handle(); let listener = TcpListener::bind(&addr, &lp.handle()).unwrap(); let cfg = server::Config::new().done(); println!("Listening on {}", addr); lp.run( listener.incoming() .sleep_on_error(Duration::from_millis(TIME_TO_WAIT_ON_ERROR), &h1) .map(move |(socket, _addr)| { server::Proto::new(socket, &cfg, Dispatcher {}, &h1) .map_err(|e| { println!("Connection error: {}", e); }) }) .listen(MAX_SIMULTANEOUS_CONNECTIONS) ).unwrap(); // stream doesn't end in this case }