/// Helpers for driving synchronous (`std::io`) readers in tests.
pub mod sync {
    use std::io::Read;

    /// Drains `read` to its end and returns every byte it produced.
    ///
    /// # Panics
    ///
    /// Panics if the reader reports an I/O error.
    pub fn to_vec(mut read: impl Read) -> Vec<u8> {
        let mut output = vec![];
        read.read_to_end(&mut output).unwrap();
        output
    }
}

/// Helpers for driving `futures::io` readers and writers in tests.
#[cfg(feature = "futures-io")]
pub mod futures {
    /// Construction of `AsyncBufRead` test inputs.
    pub mod bufread {
        pub use futures::io::AsyncBufRead;

        use crate::utils::{InputStream, TrackEof};
        use futures::stream::{StreamExt as _, TryStreamExt as _};

        /// Adapts `input` into an `AsyncBufRead`, wrapped in `TrackEof`.
        pub fn from(input: &InputStream) -> impl AsyncBufRead {
            // By using the stream here we ensure that each chunk will require a separate
            // read/poll_fill_buf call to process to help test reading multiple chunks.
            TrackEof::new(input.stream().map(Ok).into_async_read())
        }
    }

    /// Helpers that consume an `AsyncRead`.
    pub mod read {
        use crate::utils::{block_on, pin_mut};
        use futures::io::{copy_buf, AsyncRead, AsyncReadExt, BufReader, Cursor};

        /// Copies everything `read` yields into a vector, blocking until the copy finishes.
        ///
        /// The reader is wrapped in a `BufReader` with a capacity of 2 bytes before copying.
        ///
        /// # Panics
        ///
        /// Panics if the copy returns an error.
        pub fn to_vec(read: impl AsyncRead) -> Vec<u8> {
            // TODO: https://github.com/rust-lang-nursery/futures-rs/issues/1510
            // All current test cases are < 100kB
            let mut output = Cursor::new(vec![0; 102_400]);
            pin_mut!(read);
            let len = block_on(copy_buf(BufReader::with_capacity(2, read), &mut output)).unwrap();
            let mut output = output.into_inner();
            // The buffer was pre-filled with zeroes; keep only the bytes actually copied.
            output.truncate(len as usize);
            output
        }

        /// Issues a single `read` call on `reader` into `output` and blocks on its result.
        pub fn poll_read(reader: impl AsyncRead, output: &mut [u8]) -> std::io::Result<usize> {
            pin_mut!(reader);
            block_on(reader.read(output))
        }
    }

    /// Helpers that drive an `AsyncWrite` under test.
    pub mod write {
        use crate::utils::{block_on, Pin, TrackClosed};
        use futures::io::{AsyncWrite, AsyncWriteExt as _};
        use futures_test::io::AsyncWriteTestExt as _;

        /// Feeds `input` through the writer built by `create_writer` and returns what reached
        /// the underlying sink.
        ///
        /// The sink is a `Vec<u8>` wrapped with `limited_write(limit)` and
        /// `interleave_pending_write()`, then with `TrackClosed`. Each chunk is written with
        /// `write_all` followed by `flush`, and the writer is closed at the end.
        ///
        /// # Panics
        ///
        /// Panics if any write, flush or close fails, or if the tracked sink was not closed
        /// once the writer under test had been closed.
        pub fn to_vec(
            input: &[Vec<u8>],
            create_writer: impl for<'a> FnOnce(
                &'a mut (dyn AsyncWrite + Unpin),
            ) -> Pin<Box<dyn AsyncWrite + 'a>>,
            limit: usize,
        ) -> Vec<u8> {
            let mut output = Vec::new();
            {
                let mut test_writer = TrackClosed::new(
                    (&mut output)
                        .limited_write(limit)
                        .interleave_pending_write(),
                );
                {
                    let mut writer = create_writer(&mut test_writer);
                    for chunk in input {
                        block_on(writer.write_all(chunk)).unwrap();
                        block_on(writer.flush()).unwrap();
                    }
                    block_on(writer.close()).unwrap();
                }
                // The writer under test has been dropped; check that it closed the sink.
                assert!(test_writer.is_closed());
            }
            output
        }
    }
}

/// Helpers for driving `tokio::io` readers and writers in tests.
#[cfg(feature = "tokio")]
pub mod tokio {
    /// Construction of `AsyncBufRead` test inputs.
    pub mod bufread {
        use crate::utils::{InputStream, TrackEof};
        use bytes::Bytes;
        use futures::stream::StreamExt;
        pub use tokio::io::AsyncBufRead;
        use tokio_util::io::StreamReader;

        /// Adapts `input` into an `AsyncBufRead`, wrapped in `TrackEof`.
        pub fn from(input: &InputStream) -> impl AsyncBufRead {
            // By using the stream here we ensure that each chunk will require a separate
            // read/poll_fill_buf call to process to help test reading multiple chunks.
            TrackEof::new(StreamReader::new(
                input.stream().map(Bytes::from).map(std::io::Result::Ok),
            ))
        }
    }

    /// Helpers that consume an `AsyncRead`.
    pub mod read {
        use crate::utils::{block_on, pin_mut, tokio_ext::copy_buf};
        use std::io::Cursor;
        use tokio::io::{AsyncRead, AsyncReadExt, BufReader};

        /// Copies everything `read` yields into a vector, blocking until the copy finishes.
        ///
        /// The reader is wrapped in a `BufReader` with a capacity of 2 bytes before copying,
        /// and the destination is a cursor over a 102 400-byte zero-filled buffer.
        ///
        /// # Panics
        ///
        /// Panics if the copy returns an error.
        pub fn to_vec(read: impl AsyncRead) -> Vec<u8> {
            let mut output = Cursor::new(vec![0; 102_400]);
            pin_mut!(read);
            let len = block_on(copy_buf(BufReader::with_capacity(2, read), &mut output)).unwrap();
            let mut output = output.into_inner();
            // The buffer was pre-filled with zeroes; keep only the bytes actually copied.
            output.truncate(len as usize);
            output
        }

        /// Issues a single `read` call on `reader` into `output` and blocks on its result.
        pub fn poll_read(reader: impl AsyncRead, output: &mut [u8]) -> std::io::Result<usize> {
            pin_mut!(reader);
            block_on(reader.read(output))
        }
    }

    /// Helpers that drive an `AsyncWrite` under test.
    pub mod write {
        use crate::utils::{
            block_on, tokio_ext::AsyncWriteTestExt as _, track_closed::TrackClosed, Pin,
        };
        use std::io::Cursor;
        use tokio::io::{AsyncWrite, AsyncWriteExt as _};

        /// Feeds `input` through the writer built by `create_writer` and returns what reached
        /// the underlying sink.
        ///
        /// The sink is a `Cursor<Vec<u8>>` wrapped with `limited_write(limit)` and
        /// `interleave_pending_write()`, then with `TrackClosed`. Each chunk is written with
        /// `write_all` followed by `flush`, and the writer is shut down at the end.
        ///
        /// # Panics
        ///
        /// Panics if any write, flush or shutdown fails, or if the tracked sink was not closed
        /// once the writer under test had been shut down.
        pub fn to_vec(
            input: &[Vec<u8>],
            create_writer: impl for<'a> FnOnce(
                &'a mut (dyn AsyncWrite + Unpin),
            ) -> Pin<Box<dyn AsyncWrite + 'a>>,
            limit: usize,
        ) -> Vec<u8> {
            let mut output = Cursor::new(Vec::new());
            {
                let mut test_writer = TrackClosed::new(
                    (&mut output)
                        .limited_write(limit)
                        .interleave_pending_write(),
                );
                {
                    let mut writer = create_writer(&mut test_writer);
                    for chunk in input {
                        block_on(writer.write_all(chunk)).unwrap();
                        block_on(writer.flush()).unwrap();
                    }
                    block_on(writer.shutdown()).unwrap();
                }
                // The writer under test has been dropped; check that it closed the sink.
                assert!(test_writer.is_closed());
            }
            output.into_inner()
        }
    }
}