// Throughput benchmark comparing batch-channel with kanal, async-channel, and
// crossbeam, over both async tasks and sync threads.

use clap::Parser;
use clap::Subcommand;
use futures::future::BoxFuture;
use futures::FutureExt;
use lazy_static::lazy_static;
use std::future::Future;
use std::time::Duration;
use std::time::Instant;

trait Channel<T: Send> {
    const HAS_BATCH: bool;
    type Sender: ChannelSender<T> + 'static;
    type Receiver: ChannelReceiver<T> + 'static;
    fn bounded(capacity: usize) -> (Self::Sender, Self::Receiver);
}

trait ChannelSync<T: Send> {
    const HAS_BATCH: bool;
    type SyncSender: ChannelSyncSender<T> + 'static;
    type SyncReceiver: ChannelSyncReceiver<T> + 'static;
    fn bounded_sync(capacity: usize) -> (Self::SyncSender, Self::SyncReceiver);
}

trait ChannelSender<T: Send>: Clone + Send {
    type BatchSender: ChannelBatchSender<T>;
    fn autobatch<F>(self, batch_limit: usize, f: F) -> impl Future + Send
    where
        for<'a> F: (FnOnce(&'a mut Self::BatchSender) -> BoxFuture<'a, ()>) + Send + 'static;
}

trait ChannelBatchSender<T: Send>: Send {
    fn send(&mut self, value: T) -> impl Future + Send;
}

trait ChannelReceiver<T: Send>: Clone + Send {
    fn recv_vec<'a>(
        &'a self,
        element_limit: usize,
        vec: &'a mut Vec<T>,
    ) -> impl Future + Send;
}

trait ChannelSyncSender<T: Send>: Clone + Send {
    type BatchSenderSync<'a>: ChannelBatchSenderSync<T>
    where
        T: 'a;
    fn autobatch<'a, F>(&'a mut self, batch_limit: usize, f: F)
    where
        F: FnOnce(&mut Self::BatchSenderSync<'a>);
}

trait ChannelBatchSenderSync<T: Send>: Send {
    fn send(&mut self, value: T);
}

trait ChannelSyncReceiver<T: Send>: Clone + Send {
    fn recv_vec(&self, element_limit: usize, vec: &mut Vec<T>);
}

// batch-channel, this crate

struct BatchChannel;

impl<T: Send + 'static> Channel<T> for BatchChannel {
    const HAS_BATCH: bool = true;
    type Sender = batch_channel::Sender<T>;
    type Receiver = batch_channel::Receiver<T>;
    fn bounded(capacity: usize) -> (Self::Sender, Self::Receiver) {
        batch_channel::bounded(capacity)
    }
}

impl<T: Send + 'static> ChannelSync<T> for BatchChannel {
    const HAS_BATCH: bool = true;
    type SyncSender = batch_channel::SyncSender<T>;
    type SyncReceiver = batch_channel::SyncReceiver<T>;
    fn bounded_sync(capacity: usize) -> (Self::SyncSender, Self::SyncReceiver) {
        batch_channel::bounded_sync(capacity)
    }
}

impl<T: Send> ChannelSender<T> for batch_channel::Sender<T> {
    type BatchSender = batch_channel::BatchSender<T>;
    fn autobatch<F>(self, batch_limit: usize, f: F) -> impl Future + Send
    where
        for<'a> F: (FnOnce(&'a mut Self::BatchSender) -> BoxFuture<'a, ()>) + Send + 'static,
    {
        async move {
            batch_channel::Sender::autobatch(self, batch_limit, |tx| {
                async move {
                    () = f(tx).await;
                    Ok(())
                }
                .boxed()
            })
            .await
            .expect("in this benchmark, receiver never drops")
        }
    }
}

impl<T: Send> ChannelBatchSender<T> for batch_channel::BatchSender<T> {
    fn send(&mut self, value: T) -> impl Future + Send {
        async move {
            batch_channel::BatchSender::send(self, value)
                .await
                .expect("in this benchmark, receiver never drops")
        }
    }
}

impl<T: Send> ChannelReceiver<T> for batch_channel::Receiver<T> {
    fn recv_vec<'a>(
        &'a self,
        element_limit: usize,
        vec: &'a mut Vec<T>,
    ) -> impl Future + Send {
        batch_channel::Receiver::recv_vec(self, element_limit, vec)
    }
}

impl<T: Send> ChannelSyncSender<T> for batch_channel::SyncSender<T> {
    type BatchSenderSync<'a> = batch_channel::SyncBatchSender<'a, T>
    where
        T: 'a;
    fn autobatch<'a, F>(&'a mut self, batch_limit: usize, f: F)
    where
        F: FnOnce(&mut Self::BatchSenderSync<'a>),
    {
        batch_channel::SyncSender::autobatch(self, batch_limit, |tx| {
            f(tx);
            Ok(())
        })
        .expect("in this benchmark, receiver never drops")
    }
}

impl<'a, T: Send> ChannelBatchSenderSync<T> for batch_channel::SyncBatchSender<'a, T> {
    fn send(&mut self, value: T) {
        batch_channel::SyncBatchSender::send(self, value)
            .expect("in this benchmark, receiver never drops")
    }
}

impl<T: Send> ChannelSyncReceiver<T> for batch_channel::SyncReceiver<T> {
    fn recv_vec(&self, element_limit: usize, vec: &mut Vec<T>) {
        batch_channel::SyncReceiver::recv_vec(self, element_limit, vec)
    }
}

// Kanal

struct KanalChannel;

impl<T: Send + 'static> Channel<T> for KanalChannel {
    const HAS_BATCH: bool = false;
    type Sender = kanal::AsyncSender<T>;
    type Receiver = kanal::AsyncReceiver<T>;
    fn bounded(capacity: usize) -> (Self::Sender, Self::Receiver) {
        kanal::bounded_async(capacity)
    }
}

impl<T: Send> ChannelSender<T> for kanal::AsyncSender<T> {
    type BatchSender = kanal::AsyncSender<T>;
    fn autobatch<F>(mut self, _batch_limit: usize, f: F) -> impl Future + Send
    where
        for<'a> F: (FnOnce(&'a mut Self::BatchSender) -> BoxFuture<'a, ()>) + Send + 'static,
    {
        async move {
            f(&mut self).await;
        }
    }
}

impl<T: Send> ChannelBatchSender<T> for kanal::AsyncSender<T> {
    fn send(&mut self, value: T) -> impl Future + Send {
        async move {
            kanal::AsyncSender::send(self, value)
                .await
                .expect("in this benchmark, receiver never drops")
        }
    }
}

impl<T: Send> ChannelReceiver<T> for kanal::AsyncReceiver<T> {
    fn recv_vec<'a>(
        &'a self,
        element_limit: usize,
        vec: &'a mut Vec<T>,
    ) -> impl Future + Send {
        async move {
            let Ok(value) = self.recv().await else {
                return;
            };
            vec.push(value);
            // Now try to read the rest.
            for _ in 1..element_limit {
                let Ok(Some(value)) = self.try_recv() else {
                    return;
                };
                vec.push(value);
            }
        }
    }
}

impl<T: Send + 'static> ChannelSync<T> for KanalChannel {
    const HAS_BATCH: bool = false;
    type SyncSender = kanal::Sender<T>;
    type SyncReceiver = kanal::Receiver<T>;
    fn bounded_sync(capacity: usize) -> (Self::SyncSender, Self::SyncReceiver) {
        kanal::bounded(capacity)
    }
}

impl<T: Send> ChannelSyncSender<T> for kanal::Sender<T> {
    type BatchSenderSync<'a> = kanal::Sender<T>
    where
        T: 'a;
    fn autobatch<'a, F>(&'a mut self, _batch_limit: usize, f: F)
    where
        F: FnOnce(&mut Self::BatchSenderSync<'a>),
    {
        f(self);
    }
}

impl<T: Send> ChannelBatchSenderSync<T> for kanal::Sender<T> {
    fn send(&mut self, value: T) {
        kanal::Sender::send(self, value).expect("in this benchmark, receiver never drops")
    }
}

impl<T: Send> ChannelSyncReceiver<T> for kanal::Receiver<T> {
    fn recv_vec(&self, element_limit: usize, vec: &mut Vec<T>) {
        let Ok(value) = self.recv() else {
            return;
        };
        vec.push(value);
        // Now try to read the rest.
        for _ in 1..element_limit {
            let Ok(Some(value)) = self.try_recv() else {
                return;
            };
            vec.push(value);
        }
    }
}

// Crossbeam

struct CrossbeamChannel;

impl<T: Send + 'static> ChannelSync<T> for CrossbeamChannel {
    const HAS_BATCH: bool = false;
    type SyncSender = crossbeam::channel::Sender<T>;
    type SyncReceiver = crossbeam::channel::Receiver<T>;
    fn bounded_sync(capacity: usize) -> (Self::SyncSender, Self::SyncReceiver) {
        crossbeam::channel::bounded(capacity)
    }
}

impl<T: Send> ChannelSyncSender<T> for crossbeam::channel::Sender<T> {
    type BatchSenderSync<'a> = crossbeam::channel::Sender<T>
    where
        T: 'a;
    fn autobatch<'a, F>(&'a mut self, _batch_limit: usize, f: F)
    where
        F: FnOnce(&mut Self::BatchSenderSync<'a>),
    {
        f(self);
    }
}

impl<T: Send> ChannelBatchSenderSync<T> for crossbeam::channel::Sender<T> {
    fn send(&mut self, value: T) {
        crossbeam::channel::Sender::send(self, value)
            .expect("in this benchmark, receiver never drops")
    }
}
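
// Note: kanal, crossbeam, and async-channel expose no native batch-receive
// API, so their recv_vec implementations above and below emulate one: block on
// recv() for the first element, then drain up to element_limit - 1 more with
// try_recv() before returning.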

impl<T: Send> ChannelSyncReceiver<T> for crossbeam::channel::Receiver<T> {
    fn recv_vec(&self, element_limit: usize, vec: &mut Vec<T>) {
        let Ok(value) = self.recv() else {
            return;
        };
        vec.push(value);
        // Now try to read the rest.
        for _ in 1..element_limit {
            let Ok(value) = self.try_recv() else {
                return;
            };
            vec.push(value);
        }
    }
}

// async-channel

struct AsyncChannel;

impl<T: Send + 'static> Channel<T> for AsyncChannel {
    const HAS_BATCH: bool = false;
    type Sender = async_channel::Sender<T>;
    type Receiver = async_channel::Receiver<T>;
    fn bounded(capacity: usize) -> (Self::Sender, Self::Receiver) {
        async_channel::bounded(capacity)
    }
}

impl<T: Send> ChannelSender<T> for async_channel::Sender<T> {
    type BatchSender = async_channel::Sender<T>;
    fn autobatch<F>(mut self, _batch_limit: usize, f: F) -> impl Future + Send
    where
        for<'a> F: (FnOnce(&'a mut Self::BatchSender) -> BoxFuture<'a, ()>) + Send + 'static,
    {
        async move {
            f(&mut self).await;
        }
    }
}

impl<T: Send> ChannelBatchSender<T> for async_channel::Sender<T> {
    fn send(&mut self, value: T) -> impl Future + Send {
        async move {
            async_channel::Sender::send(self, value)
                .await
                .expect("in this benchmark, receiver never drops")
        }
    }
}

impl<T: Send> ChannelReceiver<T> for async_channel::Receiver<T> {
    fn recv_vec<'a>(
        &'a self,
        element_limit: usize,
        vec: &'a mut Vec<T>,
    ) -> impl Future + Send {
        async move {
            let Ok(value) = self.recv().await else {
                return;
            };
            vec.push(value);
            // Now try to read the rest.
            for _ in 1..element_limit {
                let Ok(value) = self.try_recv() else {
                    return;
                };
                vec.push(value);
            }
        }
    }
}

// Benchmark

#[derive(Copy, Clone)]
struct Options {
    tx_batch_size: usize,
    rx_batch_size: usize,
    tx_count: usize,
    rx_count: usize,
}

struct Timings {
    total: Duration,
    per_item: Duration,
}

impl Timings {
    fn print(&self) {
        println!("{:?}, {:?} per item", self.total, self.per_item)
    }
}

async fn benchmark_throughput_async<C: Channel<(usize, usize)>>(
    _: C,
    options: Options,
) -> Timings {
    const CAPACITY: usize = 65536;
    // Channels with native batch support send proportionally more items per task.
    let send_count: usize =
        2 * 1024 * 1024 * (if C::HAS_BATCH { options.tx_batch_size } else { 1 });
    let total_items = send_count * options.tx_count;

    let mut senders = Vec::with_capacity(options.tx_count);
    let mut receivers = Vec::with_capacity(options.rx_count);

    let now = Instant::now();

    let (tx, rx) = C::bounded(CAPACITY);
    for task_id in 0..options.tx_count {
        let tx = tx.clone();
        senders.push(tokio::spawn(
            async move {
                tx.autobatch(options.tx_batch_size, move |tx| {
                    async move {
                        for i in 0..send_count {
                            tx.send((task_id, i)).await;
                        }
                    }
                    .boxed()
                })
                .await;
            }
            .boxed(),
        ));
    }
    drop(tx);

    for _ in 0..options.rx_count {
        let rx = rx.clone();
        receivers.push(tokio::spawn(
            async move {
                let mut batch = Vec::with_capacity(options.rx_batch_size);
                loop {
                    batch.clear();
                    rx.recv_vec(options.rx_batch_size, &mut batch).await;
                    if batch.is_empty() {
                        break;
                    }
                }
            }
            .boxed(),
        ));
    }
    drop(rx);

    for r in receivers {
        () = r.await.expect("task panicked");
    }
    for s in senders {
        () = s.await.expect("task panicked");
    }

    let elapsed = now.elapsed();
    Timings {
        total: elapsed,
        per_item: elapsed / (total_items as u32),
    }
}

fn benchmark_throughput_sync<C: ChannelSync<(usize, usize)>>(_: C, options: Options) -> Timings {
    const CAPACITY: usize = 65536;
    let send_count: usize =
        1024 * 1024 * (if C::HAS_BATCH { options.tx_batch_size } else { 1 });
    let total_items = send_count * options.tx_count;

    let mut senders = Vec::with_capacity(options.tx_count);
    let mut receivers = Vec::with_capacity(options.rx_count);

    let now = Instant::now();

    let (tx, rx) = C::bounded_sync(CAPACITY);
    for task_id in 0..options.tx_count {
        let mut tx = tx.clone();
        senders.push(std::thread::spawn(move || {
            tx.autobatch(options.tx_batch_size, move |tx| {
                for i in 0..send_count {
                    tx.send((task_id, i));
                }
            })
        }));
    }
    drop(tx);

    for _ in 0..options.rx_count {
        let rx = rx.clone();
        receivers.push(std::thread::spawn(move || {
            let mut batch = Vec::with_capacity(options.rx_batch_size);
            loop {
                batch.clear();
                rx.recv_vec(options.rx_batch_size, &mut batch);
                if batch.is_empty() {
                    break;
                }
            }
        }));
    }
    drop(rx);

    for r in receivers {
        () = r.join().expect("thread panicked");
    }
    for s in senders {
        () = s.join().expect("thread panicked");
    }

    let elapsed = now.elapsed();
    Timings {
        total: elapsed,
        per_item: elapsed / (total_items as u32),
    }
}

#[derive(Debug, Subcommand)]
enum Commands {
    Throughput {
        #[arg(long)]
        bench: bool,
    },
    Alloc {
        #[arg(long)]
        bench: bool,
    },
    Async {
        #[arg(long)]
        bench: bool,
    },
    Uncontended {
        #[arg(long)]
        bench: bool,
    },
}

#[derive(Parser, Debug)]
struct Args {
    #[arg(long)]
    bench: bool,
    #[arg(long)]
    csv: bool,
    #[command(subcommand)]
    command: Option<Commands>,
}

lazy_static! {
    static ref ARGS: Args = Args::parse();
}

fn main() {
    // Only the throughput subcommand (or no subcommand) runs this benchmark.
    match ARGS.command {
        Some(Commands::Throughput { .. }) => (),
        None => (),
        _ => {
            return;
        }
    }

    let runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(8)
        .build()
        .expect("failed to create tokio runtime");

    let batch_sizes = [1, 2, 4, 8, 16, 32, 64, 128, 256];

    async fn bench_async<C: Channel<(usize, usize)>>(name: &str, options: Options, channel: C) {
        if !ARGS.csv {
            print!(" {: <13}: ", name);
        }
        let timings = benchmark_throughput_async(channel, options).await;
        if ARGS.csv {
            println!(
                "async,{},{},{},{},{},{},{}",
                name,
                options.tx_count,
                options.rx_count,
                options.tx_batch_size,
                options.rx_batch_size,
                timings.total.as_nanos(),
                timings.per_item.as_nanos()
            );
        } else {
            timings.print();
        }
    }

    fn bench_sync<C: ChannelSync<(usize, usize)>>(name: &str, options: Options, channel: C) {
        if !ARGS.csv {
            print!(" {: <13}: ", name);
        }
        let timings = benchmark_throughput_sync(channel, options);
        if ARGS.csv {
            println!(
                "sync,{},{},{},{},{},{},{}",
                name,
                options.tx_count,
                options.rx_count,
                options.tx_batch_size,
                options.rx_batch_size,
                timings.total.as_nanos(),
                timings.per_item.as_nanos()
            );
        } else {
            timings.print();
        }
    }

    let run_batch_async_with_options = |options| {
        runtime.block_on(bench_async("batch-channel", options, BatchChannel));
        runtime.block_on(bench_async("kanal", options, KanalChannel));
        runtime.block_on(bench_async("async-channel", options, AsyncChannel));
    };

    let run_batch_sync_with_options = |options| {
        bench_sync("batch-channel", options, BatchChannel);
        bench_sync("kanal", options, KanalChannel);
        bench_sync("crossbeam", options, CrossbeamChannel);
    };

    if ARGS.csv {
        println!("mode,channel,tx,rx,tx_batch_size,rx_batch_size,total_ns,per_item_ns");
    }

    for (tx_count, rx_count) in [(1, 1), (4, 1), (4, 4)] {
        if !ARGS.csv {
            println!();
            println!("throughput async (tx={} rx={})", tx_count, rx_count);
        }
        for batch_size in batch_sizes {
            if !ARGS.csv {
                println!(" batch={}", batch_size);
            }
            run_batch_async_with_options(Options {
                tx_batch_size: batch_size,
                rx_batch_size: batch_size,
                tx_count,
                rx_count,
            });
        }

        if !ARGS.csv {
            println!();
            println!("throughput sync (tx={} rx={})", tx_count, rx_count);
        }
        for batch_size in batch_sizes {
            if !ARGS.csv {
                println!(" batch={}", batch_size);
            }
            run_batch_sync_with_options(Options {
                tx_batch_size: batch_size,
                rx_batch_size: batch_size,
                tx_count,
                rx_count,
            });
        }
    }
}
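
// Example invocation (an assumption: this file builds as an ordinary binary
// target; adjust for the actual target name in Cargo.toml):
//
//   cargo run --release -- --csv throughput
//
// With --csv, each data row follows the header printed by main():
//   mode,channel,tx,rx,tx_batch_size,rx_batch_size,total_ns,per_item_ns

// Minimal smoke test, a sketch that is not part of the original benchmark: it
// drives the sync throughput path once with one producer and one consumer to
// check that the plumbing above runs to completion. The module and test names
// are ours. Note that benchmark_throughput_sync still moves about a million
// items, so this is slow for a unit test.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn sync_throughput_smoke() {
        let timings = benchmark_throughput_sync(
            BatchChannel,
            Options {
                tx_batch_size: 1,
                rx_batch_size: 1,
                tx_count: 1,
                rx_count: 1,
            },
        );
        // A completed run takes a measurable, nonzero amount of time.
        assert!(timings.total > Duration::ZERO);
    }
}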