#![allow(dead_code)] //! Various runtimes for hyper use std::{ future::Future, pin::Pin, task::{Context, Poll}, time::{Duration, Instant}, }; use hyper::rt::{Sleep, Timer}; use pin_project_lite::pin_project; #[derive(Clone)] /// An Executor that uses the tokio runtime. pub struct TokioExecutor; impl hyper::rt::Executor for TokioExecutor where F: std::future::Future + Send + 'static, F::Output: Send + 'static, { fn execute(&self, fut: F) { tokio::task::spawn(fut); } } /// A Timer that uses the tokio runtime. #[derive(Clone, Debug)] pub struct TokioTimer; impl Timer for TokioTimer { fn sleep(&self, duration: Duration) -> Pin> { Box::pin(TokioSleep { inner: tokio::time::sleep(duration), }) } fn sleep_until(&self, deadline: Instant) -> Pin> { Box::pin(TokioSleep { inner: tokio::time::sleep_until(deadline.into()), }) } fn reset(&self, sleep: &mut Pin>, new_deadline: Instant) { if let Some(sleep) = sleep.as_mut().downcast_mut_pin::() { sleep.reset(new_deadline.into()) } } } struct TokioTimeout { inner: Pin>>, } impl Future for TokioTimeout where T: Future, { type Output = Result; fn poll( mut self: Pin<&mut Self>, context: &mut Context<'_>, ) -> Poll { self.inner.as_mut().poll(context) } } // Use TokioSleep to get tokio::time::Sleep to implement Unpin. // see https://docs.rs/tokio/latest/tokio/time/struct.Sleep.html pin_project! { pub(crate) struct TokioSleep { #[pin] pub(crate) inner: tokio::time::Sleep, } } impl Future for TokioSleep { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { self.project().inner.poll(cx) } } impl Sleep for TokioSleep {} impl TokioSleep { pub fn reset(self: Pin<&mut Self>, deadline: Instant) { self.project().inner.as_mut().reset(deadline.into()); } } pin_project! { #[derive(Debug)] pub struct TokioIo { #[pin] inner: T, } } impl TokioIo { pub fn new(inner: T) -> Self { Self { inner } } pub fn inner(self) -> T { self.inner } } impl hyper::rt::Read for TokioIo where T: tokio::io::AsyncRead, { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, mut buf: hyper::rt::ReadBufCursor<'_>, ) -> Poll> { let n = unsafe { let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut()); match tokio::io::AsyncRead::poll_read( self.project().inner, cx, &mut tbuf, ) { Poll::Ready(Ok(())) => tbuf.filled().len(), other => return other, } }; unsafe { buf.advance(n); } Poll::Ready(Ok(())) } } impl hyper::rt::Write for TokioIo where T: tokio::io::AsyncWrite, { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf) } fn poll_flush( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { tokio::io::AsyncWrite::poll_flush(self.project().inner, cx) } fn poll_shutdown( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx) } fn is_write_vectored(&self) -> bool { tokio::io::AsyncWrite::is_write_vectored(&self.inner) } fn poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[std::io::IoSlice<'_>], ) -> Poll> { tokio::io::AsyncWrite::poll_write_vectored(self.project().inner, cx, bufs) } } impl tokio::io::AsyncRead for TokioIo where T: hyper::rt::Read, { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, tbuf: &mut tokio::io::ReadBuf<'_>, ) -> Poll> { //let init = tbuf.initialized().len(); let filled = tbuf.filled().len(); let sub_filled = unsafe { let mut buf = hyper::rt::ReadBuf::uninit(tbuf.unfilled_mut()); match hyper::rt::Read::poll_read( self.project().inner, cx, buf.unfilled(), ) { Poll::Ready(Ok(())) => buf.filled().len(), other => return other, } }; let n_filled = filled + sub_filled; // At least sub_filled bytes had to have been initialized. let n_init = sub_filled; unsafe { tbuf.assume_init(n_init); tbuf.set_filled(n_filled); } Poll::Ready(Ok(())) } } impl tokio::io::AsyncWrite for TokioIo where T: hyper::rt::Write, { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { hyper::rt::Write::poll_write(self.project().inner, cx, buf) } fn poll_flush( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { hyper::rt::Write::poll_flush(self.project().inner, cx) } fn poll_shutdown( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { hyper::rt::Write::poll_shutdown(self.project().inner, cx) } fn is_write_vectored(&self) -> bool { hyper::rt::Write::is_write_vectored(&self.inner) } fn poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[std::io::IoSlice<'_>], ) -> Poll> { hyper::rt::Write::poll_write_vectored(self.project().inner, cx, bufs) } }