use std::io::Cursor;

use futures::io::Cursor as AsyncCursor;
use futures::SinkExt;

use re_arrow2::array::Array;
use re_arrow2::chunk::Chunk;
use re_arrow2::datatypes::Schema;
use re_arrow2::error::Result;
use re_arrow2::io::ipc::read;
use re_arrow2::io::ipc::write::file_async::FileSink;
use re_arrow2::io::ipc::write::WriteOptions;
use re_arrow2::io::ipc::IpcField;

use crate::io::ipc::common::read_arrow_stream;
use crate::io::ipc::common::read_gzip_json;

/// Serializes `batches` into an in-memory IPC file using the async `FileSink`
/// and returns the resulting bytes.
async fn write_(
    schema: &Schema,
    ipc_fields: &[IpcField],
    batches: &[Chunk<Box<dyn Array>>],
) -> Result<Vec<u8>> {
    let mut result = AsyncCursor::new(vec![]);

    let options = WriteOptions { compression: None };
    let mut sink = FileSink::new(
        &mut result,
        schema.clone(),
        Some(ipc_fields.to_vec()),
        options,
    );
    for batch in batches {
        sink.feed((batch, Some(ipc_fields)).into()).await?;
    }
    sink.close().await?;
    drop(sink);
    Ok(result.into_inner())
}

/// Round-trips an integration test file through the async writer and the sync
/// reader, comparing the result against the expected gzipped JSON data.
async fn test_file(version: &str, file_name: &str) -> Result<()> {
    let (schema, ipc_fields, batches) = read_arrow_stream(version, file_name, None);

    let result = write_(&schema, &ipc_fields, &batches).await?;

    // read the written bytes back with the synchronous file reader
    let mut reader = Cursor::new(result);
    let metadata = read::read_file_metadata(&mut reader)?;
    let reader = read::FileReader::new(reader, metadata, None, None);

    let schema = &reader.metadata().schema;
    let ipc_fields = reader.metadata().ipc_schema.fields.clone();

    // read expected JSON output
    let (expected_schema, expected_ipc_fields, expected_batches) =
        read_gzip_json(version, file_name).unwrap();

    assert_eq!(schema, &expected_schema);
    assert_eq!(ipc_fields, expected_ipc_fields);

    let batches = reader.collect::<Result<Vec<_>>>()?;
    assert_eq!(batches, expected_batches);
    Ok(())
}

#[tokio::test]
async fn write_async() -> Result<()> {
    test_file("1.0.0-littleendian", "generated_primitive").await
}