#![allow(dead_code)] use std::{ io, marker::Unpin, pin::Pin, task::{self, Poll}, }; use {futures_03_dep::ready, partial_io::PartialOp}; pub struct PartialAsyncRead { inner: R, ops: Box + Send>, } impl PartialAsyncRead where R: Unpin, { pub fn new(inner: R, ops: I) -> Self where I: IntoIterator, I::IntoIter: Send + 'static, { PartialAsyncRead { inner, ops: Box::new(ops.into_iter()), } } } impl tokio_02_dep::io::AsyncRead for PartialAsyncRead where R: tokio_02_dep::io::AsyncRead + Unpin, { fn poll_read( mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut [u8], ) -> Poll> { match self.ops.next() { Some(PartialOp::Limited(n)) => { let len = std::cmp::min(n, buf.len()); Pin::new(&mut self.inner).poll_read(cx, &mut buf[..len]) } Some(PartialOp::Err(err)) => { if err == io::ErrorKind::WouldBlock { cx.waker().wake_by_ref(); Poll::Pending } else { Err(io::Error::new( err, "error during read, generated by partial-io", )) .into() } } Some(PartialOp::Unlimited) | None => Pin::new(&mut self.inner).poll_read(cx, buf), } } } impl tokio_03_dep::io::AsyncRead for PartialAsyncRead where R: tokio_03_dep::io::AsyncRead + Unpin, { fn poll_read( mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut tokio_03_dep::io::ReadBuf<'_>, ) -> Poll> { match self.ops.next() { Some(PartialOp::Limited(n)) => { let len = std::cmp::min(n, buf.remaining()); buf.initialize_unfilled(); let mut sub_buf = buf.take(len); ready!(Pin::new(&mut self.inner).poll_read(cx, &mut sub_buf))?; let filled = sub_buf.filled().len(); buf.advance(filled); Poll::Ready(Ok(())) } Some(PartialOp::Err(err)) => { if err == io::ErrorKind::WouldBlock { cx.waker().wake_by_ref(); Poll::Pending } else { Err(io::Error::new( err, "error during read, generated by partial-io", )) .into() } } Some(PartialOp::Unlimited) | None => Pin::new(&mut self.inner).poll_read(cx, buf), } } } impl tokio_dep::io::AsyncRead for PartialAsyncRead where R: tokio_dep::io::AsyncRead + Unpin, { fn poll_read( mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut tokio_dep::io::ReadBuf<'_>, ) -> Poll> { match self.ops.next() { Some(PartialOp::Limited(n)) => { let len = std::cmp::min(n, buf.remaining()); buf.initialize_unfilled(); let mut sub_buf = buf.take(len); ready!(Pin::new(&mut self.inner).poll_read(cx, &mut sub_buf))?; let filled = sub_buf.filled().len(); buf.advance(filled); Poll::Ready(Ok(())) } Some(PartialOp::Err(err)) => { if err == io::ErrorKind::WouldBlock { cx.waker().wake_by_ref(); Poll::Pending } else { Err(io::Error::new( err, "error during read, generated by partial-io", )) .into() } } Some(PartialOp::Unlimited) | None => Pin::new(&mut self.inner).poll_read(cx, buf), } } } pub struct FuturesPartialAsyncRead { inner: R, ops: Box + Send>, } impl FuturesPartialAsyncRead where R: crate::futures::io::AsyncRead + Unpin, { pub fn new(inner: R, ops: I) -> Self where I: IntoIterator, I::IntoIter: Send + 'static, { FuturesPartialAsyncRead { inner, ops: Box::new(ops.into_iter()), } } } impl crate::futures::io::AsyncRead for FuturesPartialAsyncRead where R: crate::futures::io::AsyncRead + Unpin, { fn poll_read( mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut [u8], ) -> Poll> { match self.ops.next() { Some(PartialOp::Limited(n)) => { let len = std::cmp::min(n, buf.len()); Pin::new(&mut self.inner).poll_read(cx, &mut buf[..len]) } Some(PartialOp::Err(err)) => { if err == io::ErrorKind::WouldBlock { cx.waker().wake_by_ref(); Poll::Pending } else { Err(io::Error::new( err, "error during read, generated by partial-io", )) .into() } } Some(PartialOp::Unlimited) | None => Pin::new(&mut self.inner).poll_read(cx, buf), } } }