macro_rules! create_two_threads_benchmark { ($($id:literal, $create:expr, $push:expr, $pop:expr);+) => { use std::convert::TryInto as _; use std::sync::{Arc, Barrier}; use criterion::{black_box, criterion_group, criterion_main}; fn help_with_type_inference(create: Create, push: Push, pop: Pop) -> (Create, Push, Pop) where Create: Fn(usize) -> (P, C), Push: Fn(&mut P, u8) -> bool, Pop: Fn(&mut C) -> Option, { (create, push, pop) } #[allow(unused)] fn criterion_benchmark(criterion: &mut criterion::Criterion) { $( let (create, push, pop) = help_with_type_inference($create, $push, $pop); // Just a quick check if the ring buffer works as expected: let (mut p, mut c) = create(2); assert!(pop(&mut c).is_none()); assert!(push(&mut p, 1)); assert!(push(&mut p, 2)); assert!(!push(&mut p, 3)); assert_eq!(pop(&mut c).unwrap(), 1); assert_eq!(pop(&mut c).unwrap(), 2); assert!(pop(&mut c).is_none()); )+ let mut group_large = criterion.benchmark_group("two-threads-large"); group_large.throughput(criterion::Throughput::Bytes(1)); $( group_large.bench_function($id, |b| { b.iter_custom(|iters| { let (create, push, pop) = help_with_type_inference($create, $push, $pop); // Queue is so long that there is no contention between threads. let (mut p, mut c) = create((2 * iters).try_into().unwrap()); for i in 0..iters { push(&mut p, i as u8); } let barrier = Arc::new(Barrier::new(3)); let push_thread = { let barrier = Arc::clone(&barrier); std::thread::spawn(move || { barrier.wait(); let start_pushing = std::time::Instant::now(); for i in 0..iters { // NB: This conversion truncates: push(&mut p, i as u8); } let stop_pushing = std::time::Instant::now(); (start_pushing, stop_pushing) }) }; let trigger_thread = { let barrier = Arc::clone(&barrier); std::thread::spawn(move || { // Try to force other threads to go to sleep on barrier. std::thread::yield_now(); std::thread::yield_now(); std::thread::yield_now(); barrier.wait(); // Hopefully, the other two threads now wake up at the same time. }) }; barrier.wait(); let start_popping = std::time::Instant::now(); for _ in 0..iters { black_box(pop(&mut c)); } let stop_popping = std::time::Instant::now(); let (start_pushing, stop_pushing) = push_thread.join().unwrap(); trigger_thread.join().unwrap(); let total = stop_pushing .max(stop_popping) .duration_since(start_pushing.min(start_popping)); /* if start_pushing < start_popping { println!( "popping started {:?} after pushing", start_popping.duration_since(start_pushing) ); } else { println!( "pushing started {:?} after popping", start_pushing.duration_since(start_popping) ); } */ // The goal is that both threads are finished at around the same time. // This can be checked with the following output. /* if stop_pushing < stop_popping { let diff = stop_popping.duration_since(stop_pushing); println!( "popping stopped {diff:?} after pushing ({:.1}% of total time)", (diff.as_secs_f64() / total.as_secs_f64()) * 100.0 ); } else { let diff = stop_pushing.duration_since(stop_popping); println!( "pushing stopped {diff:?} after popping ({:.1}% of total time)", (diff.as_secs_f64() / total.as_secs_f64()) * 100.0 ); } */ #[allow(clippy::let_and_return)] total }); }); )+ group_large.finish(); let mut group_small = criterion.benchmark_group("two-threads-small"); group_small.throughput(criterion::Throughput::Bytes(1)); $( group_small.bench_function($id, |b| { b.iter_custom(|iters| { let (create, push, pop) = help_with_type_inference($create, $push, $pop); // Queue is very short in order to force a lot of contention between threads. let (mut p, mut c) = create(2); let push_thread = { std::thread::spawn(move || { // The timing starts once both threads are ready. let start = std::time::Instant::now(); for i in 0..iters { while !push(&mut p, i as u8) { std::hint::spin_loop(); } } start }) }; // While the second thread is still starting up, this thread will busy-wait. for i in 0..iters { loop { if let Some(x) = pop(&mut c) { assert_eq!(x, i as u8); break; } std::hint::spin_loop(); } } // The timing stops once all items have been received. let stop = std::time::Instant::now(); let start = push_thread.join().unwrap(); stop.duration_since(start) }); }); )+ group_small.finish(); } criterion_group!(benches, criterion_benchmark); criterion_main!(benches); }; } create_two_threads_benchmark!( "rtrb", rtrb::RingBuffer::new, |p, i| p.push(i).is_ok(), |c| c.pop().ok() );