#![cfg(feature = "futures")] use easy_hex::Hex; use futures::{AsyncReadExt as _, AsyncSeekExt as _, AsyncWriteExt as _}; use pretty_assertions::assert_eq; use proptest::prelude::*; use test_utils::ReaderAction; use zstd_framed::{ async_reader::AsyncZstdReader, async_writer::AsyncZstdWriter, table::futures::read_seek_table, }; mod test_utils; proptest! { #[test] fn test_async_reader_futures_decode( data in test_utils::arb_data(), level in test_utils::arb_zstd_level(), ) { let encoded = zstd::encode_all(&data[..], level).unwrap(); futures::executor::block_on(async move { let mut decoder = AsyncZstdReader::builder_futures(&encoded[..]).build().unwrap(); let mut decoded = vec![]; decoder.read_to_end(&mut decoded).await.unwrap(); assert_eq!(Hex(decoded), data); }); } #[test] fn test_async_reader_futures_decode_from_writer( data in test_utils::arb_data(), level in test_utils::arb_zstd_level(), frame_size in prop::option::of(test_utils::arb_frame_size()), ) { futures::executor::block_on(async move { let mut encoded = vec![]; let mut encoder = AsyncZstdWriter::builder(&mut encoded).with_compression_level(level); if let Some(frame_size) = frame_size { encoder = encoder.with_seek_table(frame_size); } let mut encoder = encoder.build().unwrap(); encoder.write_all(&data[..]).await.unwrap(); encoder.close().await.unwrap(); let mut decoder = AsyncZstdReader::builder_futures(&encoded[..]).build().unwrap(); let mut decoded = vec![]; decoder.read_to_end(&mut decoded).await.unwrap(); assert_eq!(Hex(decoded), data); }); } #[test] fn test_async_reader_futures_decode_framed( frames in test_utils::arb_data_framed(), level in test_utils::arb_zstd_level(), ) { futures::executor::block_on(async move { let mut encoded = vec![]; let mut encoder = AsyncZstdWriter::builder(&mut encoded).with_compression_level(level).build().unwrap(); for frame in &frames { encoder.write_all(&frame[..]).await.unwrap(); encoder.finish_frame().unwrap(); } encoder.close().await.unwrap(); let mut decoder = AsyncZstdReader::builder_futures(&encoded[..]).build().unwrap(); let mut decoded = vec![]; decoder.read_to_end(&mut decoded).await.unwrap(); let data = frames.iter().flat_map(|frame| frame.iter()).copied().collect::>(); assert_eq!(Hex(decoded), Hex(data)); }); } #[test] fn test_async_reader_futures_seek_from_start_then_decode( (data, pos) in test_utils::arb_data_with_pos(), level in test_utils::arb_zstd_level(), frame_size in prop::option::of(test_utils::arb_frame_size()), ) { futures::executor::block_on(async move { let mut encoded = vec![]; let mut encoder = AsyncZstdWriter::builder(&mut encoded).with_compression_level(level); if let Some(frame_size) = frame_size { encoder = encoder.with_seek_table(frame_size); } let mut encoder = encoder.build().unwrap(); encoder.write_all(&data[..]).await.unwrap(); encoder.close().await.unwrap(); let mut decoded = vec![]; let mut decoder = AsyncZstdReader::builder_futures(futures::io::Cursor::new(&encoded[..])).build().unwrap().seekable(); decoder.seek(std::io::SeekFrom::Start(pos.try_into().unwrap())).await.unwrap(); decoder.read_to_end(&mut decoded).await.unwrap(); assert_eq!(Hex(&decoded[..]), Hex(&data[pos..])); }); } #[test] fn test_async_reader_futures_seek_from_end_then_decode( (data, pos) in test_utils::arb_data_with_pos(), level in test_utils::arb_zstd_level(), frame_size in prop::option::of(test_utils::arb_frame_size()), ) { futures::executor::block_on(async move { let mut encoded = vec![]; let mut encoder = AsyncZstdWriter::builder(&mut encoded).with_compression_level(level); if let Some(frame_size) = frame_size { encoder = encoder.with_seek_table(frame_size); } let mut encoder = encoder.build().unwrap(); encoder.write_all(&data[..]).await.unwrap(); encoder.close().await.unwrap(); let mut decoder = AsyncZstdReader::builder_futures(futures::io::Cursor::new(&encoded[..])).build().unwrap().seekable(); let mut decoded = vec![]; decoder.seek(std::io::SeekFrom::End(-i64::try_from(pos).unwrap())).await.unwrap(); decoder.read_to_end(&mut decoded).await.unwrap(); assert_eq!(Hex(&decoded[..]), Hex(&data[data.len() - pos..])); }); } #[test] fn test_async_reader_futures_seek_then_decode( (data, [pos_1, pos_2]) in test_utils::arb_data_with_positions(), level in test_utils::arb_zstd_level(), frame_size in prop::option::of(test_utils::arb_frame_size()), seek_type in test_utils::arb_seek_type(), ) { futures::executor::block_on(async move { let mut encoded = vec![]; let mut encoder = AsyncZstdWriter::builder(&mut encoded).with_compression_level(level); if let Some(frame_size) = frame_size { encoder = encoder.with_seek_table(frame_size); } let mut encoder = encoder.build().unwrap(); encoder.write_all(&data[..]).await.unwrap(); encoder.close().await.unwrap(); let mut decoder = AsyncZstdReader::builder_futures(futures::io::Cursor::new(&encoded[..])).build().unwrap().seekable(); let mut decoded_1 = vec![0; pos_1]; decoder.read_exact(&mut decoded_1[..]).await.unwrap(); assert_eq!(Hex(&decoded_1[..]), Hex(&data[..pos_1])); let mut decoded_2 = vec![]; let seeked_pos = decoder.seek(seek_type.seek_from(pos_1, pos_2, data.len())).await.unwrap(); assert_eq!(seeked_pos, pos_2 as u64); decoder.read_to_end(&mut decoded_2).await.unwrap(); assert_eq!(Hex(&decoded_2[..]), Hex(&data[pos_2..])); }); } #[test] fn test_async_reader_futures_multiple_seeks( (data, [pos_1, pos_2, pos_3]) in test_utils::arb_data_with_positions(), level in test_utils::arb_zstd_level(), frame_size in prop::option::of(test_utils::arb_frame_size()), [seek_type_1, seek_type_2, seek_type_3] in prop::array::uniform(test_utils::arb_seek_type()), ) { futures::executor::block_on(async move { let mut encoded = vec![]; let mut encoder = AsyncZstdWriter::builder(&mut encoded).with_compression_level(level); if let Some(frame_size) = frame_size { encoder = encoder.with_seek_table(frame_size); } let mut encoder = encoder.build().unwrap(); encoder.write_all(&data[..]).await.unwrap(); encoder.close().await.unwrap(); let mut decoder = AsyncZstdReader::builder_futures(futures::io::Cursor::new(&encoded[..])).build().unwrap().seekable(); let mut decoded = vec![]; let seeked_pos = decoder.seek(seek_type_1.seek_from(0, pos_1, data.len())).await.unwrap(); assert_eq!(seeked_pos, pos_1 as u64); decoder.read_to_end(&mut decoded).await.unwrap(); assert_eq!(Hex(&decoded[..]), Hex(&data[pos_1..])); let mut decoded = vec![]; decoder.seek(std::io::SeekFrom::Start(seeked_pos)).await.unwrap(); let seeked_pos = decoder.seek(seek_type_2.seek_from(pos_1, pos_2, data.len())).await.unwrap(); assert_eq!(seeked_pos, pos_2 as u64); decoder.read_to_end(&mut decoded).await.unwrap(); assert_eq!(Hex(&decoded[..]), Hex(&data[pos_2..])); let mut decoded = vec![]; decoder.seek(std::io::SeekFrom::Start(seeked_pos)).await.unwrap(); let seeked_pos = decoder.seek(seek_type_3.seek_from(pos_2, pos_3, data.len())).await.unwrap(); assert_eq!(seeked_pos, pos_3 as u64); decoder.read_to_end(&mut decoded).await.unwrap(); assert_eq!(Hex(&decoded[..]), Hex(&data[pos_3..])); }); } #[test] fn test_async_reader_futures_seek_frame_boundary_without_table( (frames, pos) in test_utils::arb_data_framed_with_frame_boundary_pos(), level in test_utils::arb_zstd_level(), ) { futures::executor::block_on(async move { let mut encoded = vec![]; let mut encoder = AsyncZstdWriter::builder(&mut encoded) .with_compression_level(level) .build() .unwrap(); for frame in &frames { encoder.write_all(&frame[..]).await.unwrap(); encoder.finish_frame().unwrap(); } encoder.close().await.unwrap(); let (reader, watcher_actions) = test_utils::ReaderWatcher::new(futures::io::Cursor::new(&encoded[..])); let mut decoder = AsyncZstdReader::builder_futures(reader).build().unwrap().seekable(); let seeked_pos = decoder.seek(std::io::SeekFrom::Start(u64::try_from(pos).unwrap())).await.unwrap(); assert_eq!(seeked_pos, pos as u64); let actions = std::mem::take(&mut *watcher_actions.write().unwrap()); assert!(actions.iter().all(|action| matches!(action, ReaderAction::Read(_)))); }); } #[test] fn test_async_reader_futures_seek_frame_boundary_with_table( (frames, pos) in test_utils::arb_data_framed_with_frame_boundary_pos(), level in test_utils::arb_zstd_level(), ) { let data_len = frames.iter().map(|frame| frame.len()).sum::(); prop_assume!(pos > 0 && pos < data_len); futures::executor::block_on(async move { let mut encoded = vec![]; let mut encoder = AsyncZstdWriter::builder(&mut encoded) .with_compression_level(level) .with_seek_table(u32::MAX) .build() .unwrap(); for frame in &frames { encoder.write_all(&frame[..]).await.unwrap(); encoder.finish_frame().unwrap(); } encoder.close().await.unwrap(); let table = read_seek_table(&mut futures::io::Cursor::new(&encoded[..])).await.unwrap().unwrap(); let frame = table.frames() .find(|frame| frame.decompressed_range().start == u64::try_from(pos).unwrap()) .unwrap(); let (reader, watcher_actions) = test_utils::ReaderWatcher::new(futures::io::Cursor::new(&encoded[..])); let mut decoder = AsyncZstdReader::builder_futures(reader) .with_seek_table(table) .build() .unwrap() .seekable(); let seeked_pos = decoder.seek(std::io::SeekFrom::Start(u64::try_from(pos).unwrap())).await.unwrap(); assert_eq!(seeked_pos, pos as u64); let actions = std::mem::take(&mut *watcher_actions.write().unwrap()); assert_eq!( actions, [ ReaderAction::Seek(std::io::SeekFrom::Start(frame.compressed_range().start)) ], ); }); } }