#![feature(iter_next_chunk)] use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion}; use rayon::prelude::*; use stream_fusion::{IntoFusion, WindowIterator}; use tokio::task::spawn_blocking; #[inline(never)] async fn fusion_chain, const N: usize>(iter: I) -> Option { WindowIterator::<_, 1024>::from(iter) .fusion() .map(|item| item + 1) .map(|item| item + 1) .reduce::<_, N>(|r, item| r ^ item) .await .unwrap() } #[inline(never)] fn sync_chain>(iter: I) -> Option { iter.map(|item| item + 1) .map(|item| item + 1) .reduce(|r, item| r ^ item) } #[inline(never)] async fn rayon_chain>(mut iter: I) -> Option { let mut tasks = Vec::with_capacity(8); loop { match iter.next_chunk::<512>() { Ok(item) => tasks.push(spawn_blocking(move || { item.as_slice() .par_iter() .map(|item| item + 1) .map(|item| item + 1) .sum() })), Err(iter) => { let v = iter.collect::>(); tasks.push(spawn_blocking(move || { v.as_slice() .par_iter() .map(|item| item + 1) .map(|item| item + 1) .sum::() })); break; } } } let mut i = 0; for task in tasks { i += task.await.unwrap(); } Some(i) } fn fusion(c: &mut Criterion) { let range = 0..8192; let runtime = tokio::runtime::Builder::new_multi_thread() .worker_threads(8) .enable_all() .build() .unwrap(); c.bench_with_input( BenchmarkId::new("fusion with 32", range.len()), &range, |b, range| { b.to_async(&runtime) .iter(|| black_box(fusion_chain::<_, 512>(black_box(range.clone())))); }, ); } fn sync(c: &mut Criterion) { let range = 0..8192; let runtime = tokio::runtime::Builder::new_multi_thread() .worker_threads(8) .enable_all() .build() .unwrap(); c.bench_with_input(BenchmarkId::new("sync", range.len()), &range, |b, range| { b.to_async(&runtime) .iter(|| async { black_box(sync_chain(black_box(range.clone()))) }); }); } fn rayon(c: &mut Criterion) { let range = 0..8192; let runtime = tokio::runtime::Builder::new_multi_thread() .worker_threads(8) .enable_all() .build() .unwrap(); c.bench_with_input( BenchmarkId::new("rayon", range.len()), &range, |b, range| { b.to_async(&runtime) .iter(|| black_box(rayon_chain(black_box(range.clone())))); }, ); } criterion_group!(benches, fusion, sync, rayon); criterion_main!(benches);