#![allow(clippy::unwrap_used)] use std::sync::Arc; use std::time::Duration; use arrow_array::{Array, Int32Array, RecordBatch}; use arrow_ipc::reader::StreamReader; use arrow_ipc::writer::{IpcWriteOptions, StreamWriter as ArrowStreamWriter}; use arrow_ipc::{CompressionType, MetadataVersion}; use arrow_schema::{DataType, Field, Schema}; use criterion::async_executor::FuturesExecutor; use criterion::{black_box, criterion_group, criterion_main, Criterion}; use futures_executor::block_on; use futures_util::io::Cursor; use futures_util::{pin_mut, TryStreamExt}; use itertools::Itertools; use vortex_array::array::PrimitiveArray; use vortex_array::compress::CompressionStrategy; use vortex_array::compute::take; use vortex_array::{Context, IntoArray}; use vortex_sampling_compressor::SamplingCompressor; use vortex_serde::io::FuturesAdapter; use vortex_serde::stream_reader::StreamArrayReader; use vortex_serde::stream_writer::StreamArrayWriter; fn ipc_take(c: &mut Criterion) { let mut group = c.benchmark_group("ipc_take"); let indices = Int32Array::from(vec![10, 11, 12, 13, 100_000, 2_999_999]); group.bench_function("arrow", |b| { let mut buffer = vec![]; { let field = Field::new("uid", DataType::Int32, true); let schema = Schema::new(vec![field]); let options = IpcWriteOptions::try_new(32, false, MetadataVersion::V5) .unwrap() .try_with_compression(Some(CompressionType::LZ4_FRAME)) .unwrap(); let mut writer = ArrowStreamWriter::try_new_with_options(&mut buffer, &schema, options).unwrap(); let array = Int32Array::from((0i32..3_000_000).rev().collect_vec()); let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap(); writer.write(&batch).unwrap(); } b.iter(|| { let mut cursor = std::io::Cursor::new(&buffer); let mut reader = StreamReader::try_new(&mut cursor, None).unwrap(); let batch = reader.next().unwrap().unwrap(); let array_from_batch = batch.column(0); let array = array_from_batch .as_any() .downcast_ref::<Int32Array>() .unwrap(); black_box(arrow_select::take::take(array, &indices, None).unwrap()); }); }); group.bench_function("vortex", |b| { let indices = PrimitiveArray::from(vec![10, 11, 12, 13, 100_000, 2_999_999]).into_array(); let uncompressed = PrimitiveArray::from((0i32..3_000_000).rev().collect_vec()).into_array(); let ctx = Context::default(); let compressor: &dyn CompressionStrategy = &SamplingCompressor::default(); let compressed = compressor.compress(&uncompressed).unwrap(); // Try running take over an ArrayView. let buffer = block_on(async { StreamArrayWriter::new(vec![]).write_array(compressed).await }) .unwrap() .into_inner(); let ctx_ref = &Arc::new(ctx); let ro_buffer = buffer.as_slice(); let indices_ref = &indices; b.to_async(FuturesExecutor).iter(|| async move { let stream_reader = StreamArrayReader::try_new(FuturesAdapter(Cursor::new(ro_buffer)), ctx_ref.clone()) .await? .load_dtype() .await?; let reader = stream_reader.into_array_stream(); pin_mut!(reader); let array_view = reader.try_next().await?.unwrap(); black_box(take(&array_view, indices_ref)) }); }); } criterion_group!( name = benches; config = Criterion::default().measurement_time(Duration::from_secs(10)); targets = ipc_take); criterion_main!(benches);