#[macro_use] extern crate criterion; use criterion::{Bencher, Criterion}; use hyperbridge::channel; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::Relaxed; use std::sync::Arc; use std::thread; const VALUES: usize = 10000; const THREADS: usize = 16; fn test_hyperbridge_mpsc(b: &mut Bencher) { b.iter(|| { let (sender, receiver) = channel::new(); let mut counter = 0; let mut handles = vec![]; for i in 0..THREADS { let ch = sender.clone(); let jh = thread::spawn(move || { for _ in 0..VALUES { ch.send(i).unwrap(); } }); handles.push(jh); } let mut iters = THREADS * VALUES; while iters > 0 { match receiver.try_recv() { Ok(Some(v)) => { counter += v as usize; iters -= 1; } _ => {} } } let total: usize = (0..THREADS).map(|i| i * VALUES).sum(); for jh in handles.drain(..) { let _ = jh.join(); } assert_eq!(counter, total); }); } fn test_crossbeam_mpsc(b: &mut Bencher) { b.iter(|| { let (sender, receiver) = crossbeam::channel::unbounded(); let mut counter = 0; let mut handles = vec![]; for i in 0..THREADS { let ch = sender.clone(); let jh = thread::spawn(move || { for _ in 0..VALUES { ch.send(i).unwrap(); } }); handles.push(jh); } let mut iters = THREADS * VALUES; while iters > 0 { match receiver.try_recv() { Ok(v) => { counter += v as usize; iters -= 1; } _ => {} } } let total: usize = (0..THREADS).map(|i| i * VALUES).sum(); for jh in handles.drain(..) { let _ = jh.join(); } assert_eq!(counter, total); }); } fn test_hyperbridge_mpmc(b: &mut Bencher) { b.iter(|| { let (sender, receiver) = channel::new(); let counter = Arc::new(AtomicUsize::new(0)); let mut handles = vec![]; for i in 0..THREADS { let ch = sender.clone(); let jh = thread::spawn(move || { for _ in 0..VALUES { ch.send(i).unwrap(); } }); handles.push(jh); } for _ in 0..THREADS { let ch = receiver.clone(); let local_counter = counter.clone(); let jh = thread::spawn(move || { let mut iters = VALUES; while iters > 0 { if let Ok(Some(v)) = ch.try_recv() { local_counter.fetch_add(v as usize, Relaxed); iters -= 1; } } }); handles.push(jh); } for jh in handles.drain(..) { let _ = jh.join(); } let total: usize = (0..THREADS).map(|i| i * VALUES).sum(); assert_eq!(counter.load(Relaxed), total); }); } fn test_crossbeam_mpmc(b: &mut Bencher) { b.iter(|| { let (sender, receiver) = crossbeam::channel::unbounded(); let counter = Arc::new(AtomicUsize::new(0)); let mut handles = vec![]; for i in 0..THREADS { let ch = sender.clone(); let jh = thread::spawn(move || { for _ in 0..VALUES { ch.send(i).unwrap(); } }); handles.push(jh); } for _ in 0..THREADS { let ch = receiver.clone(); let local_counter = counter.clone(); let jh = thread::spawn(move || { let mut iters = VALUES; while iters > 0 { if let Ok(v) = ch.try_recv() { local_counter.fetch_add(v as usize, Relaxed); iters -= 1; } } }); handles.push(jh); } for jh in handles.drain(..) { let _ = jh.join(); } let total: usize = (0..THREADS).map(|i| i * VALUES).sum(); assert_eq!(counter.load(Relaxed), total); }); } fn hyperbridge_channel(b: &mut Criterion) { b.bench_function("hyperbridge-mpsc", |b| test_hyperbridge_mpsc(b)); b.bench_function("hyperbridge-mpmc", |b| test_hyperbridge_mpmc(b)); } fn crossbeam_channel(b: &mut Criterion) { b.bench_function("crossbeam-mpsc", |b| test_crossbeam_mpsc(b)); b.bench_function("crossbeam-mpmc", |b| test_crossbeam_mpmc(b)); } criterion_group!(compare, hyperbridge_channel, crossbeam_channel); criterion_main!(compare);