#![cfg(nightly)]
#![feature(test)]
// `.map(unwrap).count()` is deliberate: the map forces every row to be decoded
// and checked for errors before it is counted.
#![allow(clippy::suspicious_map)]

//! Parquet read benchmarks.
//!
//! Each benchmark reads one parquet fixture end to end and asserts the number
//! of rows seen. Two readers are exercised on the same files: amadeus'
//! `Parquet` source and `arrow_parquet`'s `SerializedFileReader`.

extern crate test;

use arrow_parquet::file::reader::{FileReader, SerializedFileReader};
use once_cell::sync::Lazy;
use std::{fs, fs::File, future::Future, path::PathBuf};
use test::Bencher;
use tokio::runtime::Runtime;

use amadeus::prelude::*;

/// Tokio runtime shared by every benchmark; built on first use.
static RT: Lazy<Runtime> = Lazy::new(|| {
    tokio::runtime::Builder::new()
        .threaded_scheduler()
        .enable_all()
        .build()
        .unwrap()
});

/// Amadeus thread pool shared by every benchmark; built on first use.
static POOL: Lazy<ThreadPool> = Lazy::new(|| ThreadPool::new(None, None).unwrap());

/// Row type used to read `10k-v2.parquet`.
#[derive(Data, Clone, PartialEq, Debug)]
struct TenKayVeeTwo {
    // NOTE(review): element type inferred from the field name — confirm.
    binary_field: List<u8>,
    int32_field: i32,
    int64_field: i64,
    boolean_field: bool,
    float_field: f32,
    double_field: f64,
    flba_field: List<u8>, // [u8;1024],
    int96_field: DateTime,
}

/// Row type used to read `stock_simulated.parquet`.
///
/// NOTE(review): the `Option` payload types below (`f64` for the data columns,
/// `i64` for `__index_level_0__`) are assumed — confirm against the schema of
/// the parquet file.
#[derive(Data, Clone, PartialEq, Debug)]
struct StockSimulated {
    bp1: Option<f64>,
    bp2: Option<f64>,
    bp3: Option<f64>,
    bp4: Option<f64>,
    bp5: Option<f64>,
    bs1: Option<f64>,
    bs2: Option<f64>,
    bs3: Option<f64>,
    bs4: Option<f64>,
    bs5: Option<f64>,
    ap1: Option<f64>,
    ap2: Option<f64>,
    ap3: Option<f64>,
    ap4: Option<f64>,
    ap5: Option<f64>,
    as1: Option<f64>,
    as2: Option<f64>,
    as3: Option<f64>,
    as4: Option<f64>,
    as5: Option<f64>,
    valid: Option<f64>,
    __index_level_0__: Option<i64>,
}

/// Reads `10k-v2.parquet` through amadeus as `TenKayVeeTwo` rows and expects
/// 10,000 of them.
#[bench]
fn parquet_10k(b: &mut Bencher) {
    let file = "amadeus-testing/parquet/10k-v2.parquet"; // 669,034 bytes
    run(b, file, || async {
        let rows = Parquet::<_, TenKayVeeTwo>::new(PathBuf::from(file))
            .await
            .unwrap();
        assert_eq!(
            rows.par_stream()
                .map(|row: Result<_, _>| row.unwrap())
                .count(&*POOL)
                .await,
            10_000
        );
    })
}

/// Reads `stock_simulated.parquet` through amadeus as `StockSimulated` rows
/// and expects 42,000 of them.
#[bench]
fn parquet_stock(b: &mut Bencher) {
    let file = "amadeus-testing/parquet/stock_simulated.parquet"; // 1,289,419 bytes
    run(b, file, || async {
        let rows = Parquet::<_, StockSimulated>::new(PathBuf::from(file))
            .await
            .unwrap();
        assert_eq!(
            rows.par_stream()
                .map(|row: Result<_, _>| row.unwrap())
                .count(&*POOL)
                .await,
            42_000
        );
    })
}

/// Reads `10k-v2.parquet` through `arrow_parquet`'s row iterator and expects
/// 10,000 rows.
#[bench]
fn parquet_10k_arrow(b: &mut Bencher) {
    let file = "amadeus-testing/parquet/10k-v2.parquet"; // 669,034 bytes
    run(b, file, || async {
        let parquet_reader = SerializedFileReader::new(File::open(file).unwrap()).unwrap();
        assert_eq!(parquet_reader.get_row_iter(None).unwrap().count(), 10_000);
    })
}

/// Reads `stock_simulated.parquet` through `arrow_parquet`'s row iterator and
/// expects 42,000 rows.
#[bench]
fn parquet_stock_arrow(b: &mut Bencher) {
    let file = "amadeus-testing/parquet/stock_simulated.parquet"; // 1,289,419 bytes
    run(b, file, || async {
        let parquet_reader = SerializedFileReader::new(File::open(file).unwrap()).unwrap();
        assert_eq!(parquet_reader.get_row_iter(None).unwrap().count(), 42_000);
    })
}

/// Shared benchmark driver.
///
/// Enters the shared runtime, makes sure the thread pool is already built,
/// records the size of `file` in `b.bytes`, and then has the bencher
/// repeatedly block on a fresh future produced by `task`.
///
/// # Panics
///
/// Panics if the metadata of `file` cannot be read.
fn run<F>(b: &mut Bencher, file: &str, mut task: impl FnMut() -> F)
where
    F: Future<Output = ()>,
{
    RT.enter(|| {
        // Build the pool up front so its construction is not part of the
        // measured iterations.
        let _ = Lazy::force(&POOL);
        b.bytes = fs::metadata(file).unwrap().len();
        b.iter(|| RT.handle().block_on(task()))
    })
}