#![allow(clippy::unwrap_used)]
use std::sync::Arc;
use std::time::Duration;

use criterion::async_executor::FuturesExecutor;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use futures_executor::block_on;
use futures_util::io::Cursor;
use futures_util::{pin_mut, TryStreamExt};
use itertools::Itertools;
use vortex_array::array::{ChunkedArray, PrimitiveArray};
use vortex_array::stream::ArrayStreamExt;
use vortex_array::validity::Validity;
use vortex_array::{Context, IntoArray};
use vortex_serde::io::FuturesAdapter;
use vortex_serde::stream_reader::StreamArrayReader;
use vortex_serde::stream_writer::StreamArrayWriter;

// 100 record batches, 100k rows each
// take from the first 20 batches and last batch
// compare with arrow
fn ipc_array_reader_take(c: &mut Criterion) {
    let ctx = Arc::new(Context::default());

    let indices = (0..20)
        .map(|i| i * 100_000 + 1)
        .chain([98 * 100_000 + 1])
        .collect_vec();
    let mut group = c.benchmark_group("ipc_array_reader_take");

    group.bench_function("vortex", |b| {
        let array = ChunkedArray::from_iter(
            (0..100i32)
                .map(|i| vec![i; 100_000])
                .map(|vec| PrimitiveArray::from_vec(vec, Validity::AllValid).into_array()),
        )
        .into_array();

        let buffer = block_on(async { StreamArrayWriter::new(vec![]).write_array(array).await })
            .unwrap()
            .into_inner();

        let indices = indices.clone().into_array();

        b.to_async(FuturesExecutor).iter(|| async {
            let stream_reader =
                StreamArrayReader::try_new(FuturesAdapter(Cursor::new(&buffer)), ctx.clone())
                    .await
                    .unwrap()
                    .load_dtype()
                    .await
                    .unwrap();
            let stream = stream_reader
                .into_array_stream()
                .take_rows(indices.clone())
                .unwrap();
            pin_mut!(stream);

            while let Some(arr) = stream.try_next().await.unwrap() {
                black_box(arr);
            }
        });
    });
}

criterion_group!(
    name = benches;
    config = Criterion::default().measurement_time(Duration::from_secs(10));
    targets = ipc_array_reader_take
);
criterion_main!(benches);