// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #[macro_use] extern crate criterion; use criterion::{BatchSize, Criterion}; extern crate arrow; extern crate datafusion; use std::sync::Arc; use arrow::{ array::{ArrayRef, Int64Array, StringArray}, record_batch::RecordBatch, }; use tokio::runtime::Runtime; use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use datafusion::physical_plan::{ collect, expressions::{col, PhysicalSortExpr}, memory::MemoryExec, }; use datafusion::prelude::SessionContext; use datafusion_physical_expr_common::sort_expr::LexOrdering; // Initialise the operator using the provided record batches and the sort key // as inputs. All record batches must have the same schema. fn sort_preserving_merge_operator( session_ctx: Arc, batches: Vec, sort: &[&str], ) { let schema = batches[0].schema(); let sort = sort .iter() .map(|name| PhysicalSortExpr { expr: col(name, &schema).unwrap(), options: Default::default(), }) .collect::(); let exec = MemoryExec::try_new( &batches.into_iter().map(|rb| vec![rb]).collect::>(), schema, None, ) .unwrap(); let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec))); let task_ctx = session_ctx.task_ctx(); let rt = Runtime::new().unwrap(); rt.block_on(collect(merge, task_ctx)).unwrap(); } // Produces `n` record batches of row size `m`. Each record batch will have // identical contents except for if the `batch_offset` is set. In that case the // values for column "d" in each subsequent record batch will be offset in // value. // // The `rows_per_key` value controls how many rows are generated per "key", // which is defined as columns a, b and c. fn batches( n: usize, m: usize, rows_per_sort_key: usize, batch_offset: usize, ) -> Vec { let mut rbs = Vec::with_capacity(n); let mut curr_batch_offset = 0; for _ in 0..n { let mut col_a = Vec::with_capacity(m); let mut col_b = Vec::with_capacity(m); let mut col_c = Vec::with_capacity(m); let mut col_d = Vec::with_capacity(m); let mut j = 0; let mut current_rows_per_sort_key = 0; for i in 0..m { if current_rows_per_sort_key == rows_per_sort_key { current_rows_per_sort_key = 0; j = i; } col_a.push(Some(format!("a-{j:?}"))); col_b.push(Some(format!("b-{j:?}"))); col_c.push(Some(format!("c-{j:?}"))); col_d.push(Some((i + curr_batch_offset) as i64)); current_rows_per_sort_key += 1; } col_a.sort(); col_b.sort(); col_c.sort(); let col_a: ArrayRef = Arc::new(StringArray::from_iter(col_a)); let col_b: ArrayRef = Arc::new(StringArray::from_iter(col_b)); let col_c: ArrayRef = Arc::new(StringArray::from_iter(col_c)); let col_d: ArrayRef = Arc::new(Int64Array::from(col_d)); let rb = RecordBatch::try_from_iter(vec![ ("a", col_a), ("b", col_b), ("c", col_c), ("d", col_d), ]) .unwrap(); rbs.push(rb); curr_batch_offset += batch_offset; } rbs } fn criterion_benchmark(c: &mut Criterion) { let small_batch = batches(1, 100, 10, 0).remove(0); let large_batch = batches(1, 1000, 1, 0).remove(0); let benches = vec![ // Two batches with identical rows. They will need to be merged together // with one row from each batch being taken until both batches are // drained. ("interleave_batches", batches(2, 1000, 10, 1)), // Two batches with a small overlapping region of rows for each unique // sort key. ("merge_batches_some_overlap_small", batches(2, 1000, 10, 5)), // Two batches with a large overlapping region of rows for each unique // sort key. ( "merge_batches_some_overlap_large", batches(2, 1000, 250, 125), ), // Two batches with no overlapping region of rows for each unique // sort key. For a given unique sort key all rows are drained from one // batch, then all the rows for the same key from the second batch. // This repeats until all rows are drained. There are a small number of // rows (10) for each unique sort key. ("merge_batches_no_overlap_small", batches(2, 1000, 10, 12)), // As above but this time there are a larger number of rows (250) for // each unique sort key - still no overlaps. ("merge_batches_no_overlap_large", batches(2, 1000, 250, 252)), // Merges two batches where one batch is significantly larger than the // other. ( "merge_batches_small_into_large", vec![large_batch, small_batch], ), ]; let ctx = Arc::new(SessionContext::new()); for (name, input) in benches { let ctx_clone = ctx.clone(); c.bench_function(name, move |b| { b.iter_batched( || input.clone(), |input| { sort_preserving_merge_operator( ctx_clone.clone(), input, &["a", "b", "c", "d"], ); }, BatchSize::LargeInput, ) }); } } criterion_group!(benches, criterion_benchmark); criterion_main!(benches);