#![allow(dead_code)] use criterion::{black_box, criterion_group, criterion_main, Criterion}; use loole::{bounded, unbounded, Receiver, RecvError, SendError, Sender}; use std::{error::Error, future::Future, thread, time::Instant}; use std::fmt::{Debug, Display}; const MESSAGES_NO: usize = 1_000_000; const MESSAGE_SIZE: usize = 256; const BUFFER_SIZE: Option = Some(100); type MsgType = StackType; pub struct Type(usize, T); impl From for Type { #[inline(always)] fn from(value: usize) -> Self { Self(value, Default::default()) } } impl Debug for Type { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.write_fmt(format_args!("index: {}, len: {}", self.0, self.1.len())) } } impl Display for Type { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.write_fmt(format_args!("index: {}, len: {}", self.0, self.1.len())) } } pub struct FilledArray([u8; SIZE]); impl Default for FilledArray { #[inline(always)] fn default() -> Self { Self([0; SIZE]) } } pub struct FilledVec(Vec); impl Default for FilledVec { #[inline(always)] fn default() -> Self { Self(vec![0; SIZE]) } } pub trait Length { fn len(&self) -> usize; } impl Length for FilledArray { #[inline(always)] fn len(&self) -> usize { self.0.len() } } impl Length for FilledVec { #[inline(always)] fn len(&self) -> usize { self.0.len() } } pub type StackType = Type>; pub type HeapType = Type>; pub struct BenchResult { pub throughput: usize, } #[derive(Debug)] pub enum BenchError { ZeroCapacityNotSupported, AsyncNotSupported, MpmcNotSupported, } impl Display for BenchError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { BenchError::ZeroCapacityNotSupported => write!(f, "zero capacity not supported"), BenchError::AsyncNotSupported => write!(f, "async not supported"), BenchError::MpmcNotSupported => write!(f, "mpmc not supported"), } } } impl Error for BenchError {} pub enum JoinHandle { Sync(thread::JoinHandle), Async(tokio::task::JoinHandle), } impl JoinHandle { pub async fn join(self) -> T { match self { JoinHandle::Sync(h) => h.join().unwrap(), JoinHandle::Async(h) => h.await.unwrap(), } } } pub async fn calculate_benchmark_result( senders: Vec>, receivers: Vec>, ) -> BenchResult { let t = Instant::now(); let mut sent_no = 0; for t in senders { sent_no += t.join().await; } let mut recv_no = 0; for t in receivers { recv_no += t.join().await; } let elapsed = t.elapsed(); assert_eq!(sent_no, recv_no); let throughput = (recv_no as f32 / elapsed.as_secs_f32()).round() as usize; BenchResult { throughput } } pub fn crate_name() -> &'static str { "loole" } pub fn send(tx: &Sender, msg: T) -> Result<(), SendError> { tx.send(msg) } pub fn recv(rx: &Receiver) -> Result { rx.recv() } pub fn send_async( tx: &Sender, msg: T, ) -> impl Future>> + '_ { tx.send_async(msg) } pub fn recv_async(rx: &Receiver) -> impl Future> + '_ { rx.recv_async() } async fn bench_helper( senders_no: usize, receivers_no: usize, cap: Option, msg_no: usize, create_sender: S, create_receiver: R, ) -> Result where T: From + Send + 'static, S: Fn(Sender, usize) -> JoinHandle, R: Fn(Receiver) -> JoinHandle, { let (tx, rx) = cap.map_or_else(unbounded::, bounded); let senders = create_senders(tx, senders_no, msg_no, create_sender); let receivers = create_receivers(rx, receivers_no, create_receiver); Ok(calculate_benchmark_result(senders, receivers).await) } fn create_senders( tx: Sender, senders_no: usize, msg_no: usize, create_sender: S, ) -> Vec> where S: Fn(Sender, usize) -> JoinHandle, { let mut senders = Vec::with_capacity(senders_no); for i in 0..senders_no { let n = msg_no / senders_no + if i < msg_no % senders_no { 1 } else { 0 }; senders.push(create_sender(tx.clone(), n)); } senders } fn create_receivers( rx: Receiver, receivers_no: usize, create_receiver: R, ) -> Vec> where R: Fn(Receiver) -> JoinHandle, { let mut receivers = Vec::with_capacity(receivers_no); for _ in 0..receivers_no { receivers.push(create_receiver(rx.clone())); } receivers } fn create_sync_sender(tx: Sender, n: usize) -> JoinHandle where T: From + Send + 'static, { JoinHandle::Sync(thread::spawn(move || { for k in 0..n { match send(&tx, k.into()) { Ok(_) => (), Err(_) => println!("error: channel closed at: {}", k), } } n })) } fn create_async_sender(tx: Sender, n: usize) -> JoinHandle where T: From + Send + 'static, { JoinHandle::Async(tokio::spawn(async move { for k in 0..n { match send_async(&tx, k.into()).await { Ok(_) => (), Err(_) => println!("error: channel closed at: {}", k), } } n })) } fn create_sync_receiver(rx: Receiver) -> JoinHandle where T: From + Send + 'static, { JoinHandle::Sync(thread::spawn(move || { let mut c = 0; loop { match recv(&rx) { Ok(_) => c += 1, Err(_) => break c, } } })) } fn create_async_receiver(rx: Receiver) -> JoinHandle where T: From + Send + 'static, { JoinHandle::Async(tokio::spawn(async move { let mut c = 0; loop { match recv_async(&rx).await { Ok(_) => c += 1, Err(_) => break c, } } })) } pub async fn bench_sync_sync( senders_no: usize, receivers_no: usize, buffer_size: Option, msg_no: usize, ) -> Result where T: From + Send + 'static, { bench_helper( senders_no, receivers_no, buffer_size, msg_no, create_sync_sender::, create_sync_receiver::, ) .await } pub async fn bench_async_async( senders_no: usize, receivers_no: usize, buffer_size: Option, msg_no: usize, ) -> Result where T: From + Send + Sync + 'static, { bench_helper( senders_no, receivers_no, buffer_size, msg_no, create_async_sender::, create_async_receiver::, ) .await } pub async fn bench_async_sync( senders_no: usize, receivers_no: usize, buffer_size: Option, msg_no: usize, ) -> Result where T: From + Send + Sync + 'static, { bench_helper( senders_no, receivers_no, buffer_size, msg_no, create_async_sender::, create_sync_receiver::, ) .await } pub async fn bench_sync_async( senders_no: usize, receivers_no: usize, buffer_size: Option, msg_no: usize, ) -> Result where T: From + Send + Sync + 'static, { bench_helper( senders_no, receivers_no, buffer_size, msg_no, create_sync_sender::, create_async_receiver::, ) .await } async fn async_select_recv_buffer_0(msg_no: usize) { let count = msg_no; let (tx1, rx1) = bounded(0); let (tx2, rx2) = bounded(0); tokio::spawn(async move { for i in (0..count).into_iter().filter(|n| n % 2 == 0) { tx1.send_async(i).await.unwrap(); } }); tokio::spawn(async move { for i in (0..count).into_iter().filter(|n| n % 2 == 1) { tx2.send_async(i).await.unwrap(); } }); let mut result = Vec::new(); loop { let n = tokio::select! { n = rx1.recv_async() => n, n = rx2.recv_async() => n, }; if let Ok(n) = n { result.push(n); if result.len() == count { break; } } } result.sort(); let expected = (0..count).collect::>(); assert_eq!(result.len(), expected.len()); assert_eq!(result, expected); } fn criterion_benchmarks(c: &mut Criterion) { let msg_no = black_box(MESSAGES_NO); let buffer_size = BUFFER_SIZE; let sample_size = 10; let rt = tokio::runtime::Builder::new_multi_thread() .enable_all() .build() .unwrap(); { let mut select = c.benchmark_group("select"); select.sample_size(sample_size); select.bench_function("async_select_recv_buffer_0", |b| { b.iter(|| rt.block_on(async_select_recv_buffer_0(msg_no))) }); } { let mut mpsc = c.benchmark_group("MPSC"); mpsc.sample_size(sample_size); mpsc.bench_function("5000_sync_1_sync", |b| { b.iter(|| { rt.block_on(bench_sync_sync::( black_box(5_000), black_box(1), black_box(buffer_size), msg_no, )) }) }); mpsc.bench_function("5000_async_1_async", |b| { b.iter(|| { rt.block_on(bench_async_async::( black_box(5_000), black_box(1), black_box(buffer_size), msg_no, )) }) }); mpsc.bench_function("5000_async_1_sync", |b| { b.iter(|| { rt.block_on(bench_async_sync::( black_box(5_000), black_box(1), black_box(buffer_size), msg_no, )) }) }); mpsc.bench_function("5000_sync_1_async", |b| { b.iter(|| { rt.block_on(bench_sync_async::( black_box(5_000), black_box(1), black_box(buffer_size), msg_no, )) }) }); } { let mut mpmc = c.benchmark_group("MPMC"); mpmc.sample_size(sample_size); mpmc.bench_function("5000_sync_10_sync", |b| { b.iter(|| { rt.block_on(bench_sync_sync::( black_box(5_000), black_box(10), black_box(buffer_size), msg_no, )) }) }); mpmc.bench_function("5000_async_10_async", |b| { b.iter(|| { rt.block_on(bench_async_async::( black_box(5_000), black_box(10), black_box(buffer_size), msg_no, )) }) }); mpmc.bench_function("5000_async_10_sync", |b| { b.iter(|| { rt.block_on(bench_async_sync::( black_box(5_000), black_box(10), black_box(buffer_size), msg_no, )) }) }); mpmc.bench_function("5000_sync_10_async", |b| { b.iter(|| { rt.block_on(bench_sync_async::( black_box(5_000), black_box(10), black_box(buffer_size), msg_no, )) }) }); } { let mut spsc = c.benchmark_group("SPSC"); spsc.sample_size(sample_size); spsc.bench_function("1_sync_1_sync", |b| { b.iter(|| { rt.block_on(bench_sync_sync::( black_box(1), black_box(1), black_box(buffer_size), msg_no, )) }) }); spsc.bench_function("1_async_1_async", |b| { b.iter(|| { rt.block_on(bench_async_async::( black_box(1), black_box(1), black_box(buffer_size), msg_no, )) }) }); spsc.bench_function("1_async_1_sync", |b| { b.iter(|| { rt.block_on(bench_async_sync::( black_box(1), black_box(1), black_box(buffer_size), msg_no, )) }) }); spsc.bench_function("1_sync_1_async", |b| { b.iter(|| { rt.block_on(bench_sync_async::( black_box(1), black_box(1), black_box(buffer_size), msg_no, )) }) }); } } criterion_group!(benches, criterion_benchmarks); criterion_main!(benches);