#![deny(warnings)] use std::cell::Cell; use std::rc::Rc; use tokio::sync::oneshot; use hyper::body::{Bytes, HttpBody}; use hyper::header::{HeaderMap, HeaderValue}; use hyper::service::{make_service_fn, service_fn}; use hyper::{Error, Response, Server}; use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; struct Body { // Our Body type is !Send and !Sync: _marker: PhantomData<*const ()>, data: Option, } impl From for Body { fn from(a: String) -> Self { Body { _marker: PhantomData, data: Some(a.into()), } } } impl HttpBody for Body { type Data = Bytes; type Error = Error; fn poll_data( self: Pin<&mut Self>, _: &mut Context<'_>, ) -> Poll>> { Poll::Ready(self.get_mut().data.take().map(Ok)) } fn poll_trailers( self: Pin<&mut Self>, _: &mut Context<'_>, ) -> Poll>, Self::Error>> { Poll::Ready(Ok(None)) } } fn main() { pretty_env_logger::init(); // Configure a runtime that runs everything on the current thread let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .expect("build runtime"); // Combine it with a `LocalSet, which means it can spawn !Send futures... let local = tokio::task::LocalSet::new(); local.block_on(&rt, run()); } async fn run() { let addr = ([127, 0, 0, 1], 3000).into(); // Using a !Send request counter is fine on 1 thread... let counter = Rc::new(Cell::new(0)); let make_service = make_service_fn(move |_| { // For each connection, clone the counter to use in our service... let cnt = counter.clone(); async move { Ok::<_, Error>(service_fn(move |_| { let prev = cnt.get(); cnt.set(prev + 1); let value = cnt.get(); async move { Ok::<_, Error>(Response::new(Body::from(format!("Request #{}", value)))) } })) } }); let server = Server::bind(&addr).executor(LocalExec).serve(make_service); // Just shows that with_graceful_shutdown compiles with !Send, // !Sync HttpBody. let (_tx, rx) = oneshot::channel::<()>(); let server = server.with_graceful_shutdown(async move { rx.await.ok(); }); println!("Listening on http://{}", addr); // The server would block on current thread to await !Send futures. if let Err(e) = server.await { eprintln!("server error: {}", e); } } // Since the Server needs to spawn some background tasks, we needed // to configure an Executor that can spawn !Send futures... #[derive(Clone, Copy, Debug)] struct LocalExec; impl hyper::rt::Executor for LocalExec where F: std::future::Future + 'static, // not requiring `Send` { fn execute(&self, fut: F) { // This will spawn into the currently running `LocalSet`. tokio::task::spawn_local(fut); } }