use anyhow::anyhow; use async_std::task::yield_now; use futures::executor::LocalPool; use futures::task::SpawnExt; use futures::StreamExt; use std::fmt; use std::future::Future; use std::sync::mpsc::TryRecvError; trait UnboundedChannel: 'static { type Sender: Send; type Receiver: Send; fn new() -> (Self::Sender, Self::Receiver); fn send(tx: &Self::Sender, value: T) -> anyhow::Result<()>; fn recv(rx: &mut Self::Receiver) -> impl Future> + Send; fn send_vec( tx: &Self::Sender, mut values: Vec, ) -> anyhow::Result> { for v in values.drain(..) { Self::send(tx, v)?; } Ok(values) } fn recv_batch( rx: &mut Self::Receiver, element_limit: usize, ) -> impl Future> + Send { async move { let mut v = Vec::with_capacity(element_limit); loop { match Self::recv(rx).await { Some(value) => { v.push(value); if v.len() == element_limit { return v; } } None => { return v; } } } } } } struct BatchChannel; impl UnboundedChannel for BatchChannel { type Sender = batch_channel::SyncSender; type Receiver = batch_channel::Receiver; fn new() -> (Self::Sender, Self::Receiver) { let (tx, rx) = batch_channel::unbounded(); (tx.into_sync(), rx) } fn send(tx: &Self::Sender, value: T) -> anyhow::Result<()> { Ok(tx.send(value).map_err(|_| anyhow!("failed to send"))?) } async fn recv(rx: &mut Self::Receiver) -> Option { rx.recv().await } fn send_vec( tx: &Self::Sender, values: Vec, ) -> anyhow::Result> { Ok(tx.send_vec(values).map_err(|_| anyhow!("failed to send"))?) } async fn recv_batch(rx: &mut Self::Receiver, element_limit: usize) -> Vec { rx.recv_batch(element_limit).await } } struct StdChannel; impl UnboundedChannel for StdChannel { type Sender = std::sync::mpsc::Sender; type Receiver = std::sync::mpsc::Receiver; fn new() -> (Self::Sender, Self::Receiver) { std::sync::mpsc::channel() } fn send(tx: &Self::Sender, value: T) -> anyhow::Result<()> { Ok(tx.send(value).map_err(|_| anyhow!("failed to send"))?) } async fn recv(rx: &mut Self::Receiver) -> Option { loop { let r = rx.try_recv(); match r { Ok(value) => return Some(value), Err(TryRecvError::Empty) => { yield_now().await; continue; } Err(TryRecvError::Disconnected) => return None, } } } } struct CrossbeamChannel; impl UnboundedChannel for CrossbeamChannel { type Sender = crossbeam::channel::Sender; type Receiver = crossbeam::channel::Receiver; fn new() -> (Self::Sender, Self::Receiver) { crossbeam::channel::unbounded() } fn send(tx: &Self::Sender, value: T) -> anyhow::Result<()> { Ok(tx.send(value).map_err(|_| anyhow!("failed to send"))?) } async fn recv(rx: &mut Self::Receiver) -> Option { loop { let r = rx.try_recv(); match r { Ok(value) => return Some(value), Err(crossbeam::channel::TryRecvError::Empty) => { yield_now().await; continue; } Err(crossbeam::channel::TryRecvError::Disconnected) => return None, } } } } struct FuturesChannel; impl UnboundedChannel for FuturesChannel { type Sender = futures::channel::mpsc::UnboundedSender; type Receiver = futures::channel::mpsc::UnboundedReceiver; fn new() -> (Self::Sender, Self::Receiver) { futures::channel::mpsc::unbounded() } fn send(tx: &Self::Sender, value: T) -> anyhow::Result<()> { Ok(tx .unbounded_send(value) .map_err(|_| anyhow!("failed to send"))?) } async fn recv(rx: &mut Self::Receiver) -> Option { rx.next().await } } struct KanalChannel; impl UnboundedChannel for KanalChannel { type Sender = kanal::Sender; type Receiver = kanal::AsyncReceiver; fn new() -> (Self::Sender, Self::Receiver) { let (tx, rx) = kanal::unbounded(); let rx = rx.to_async(); (tx, rx) } fn send(tx: &Self::Sender, value: T) -> anyhow::Result<()> { Ok(tx.send(value)?) } async fn recv(rx: &mut Self::Receiver) -> Option { rx.recv().await.ok() } } async fn sender( tx: UC::Sender, iteration_count: usize, batch_size: usize, ) { if batch_size == 1 { for i in 0..iteration_count { UC::send(&tx, i).unwrap(); // The intent of this benchmark is to interleave send and recv. yield_now().await; } } else { let mut vec = Vec::with_capacity(batch_size); for i in 0..iteration_count { if vec.len() < batch_size { vec.push(i); } else { vec = UC::send_vec(&tx, vec).unwrap(); // The intent of this benchmark is to interleave send and recv. yield_now().await; } } if !vec.is_empty() { _ = UC::send_vec(&tx, vec).unwrap(); } } } async fn receiver(mut rx: UC::Receiver, batch_size: usize) { if batch_size == 1 { while let Some(_) = UC::recv(&mut rx).await {} } else { loop { let v = UC::recv_batch(&mut rx, batch_size).await; if v.is_empty() { break; } } } } #[divan::bench( types = [BatchChannel, StdChannel, CrossbeamChannel, FuturesChannel, KanalChannel], consts = [1, 10, 100], )] fn batch_size_tx_first(bencher: divan::Bencher) where UC: UnboundedChannel + Send, { let iteration_count = 100; bencher .counter(divan::counter::ItemsCount::new(N * iteration_count)) .with_inputs(|| { let pool = LocalPool::new(); let spawner = pool.spawner(); let (tx, rx) = UC::new(); () = spawner.spawn(sender::(tx, iteration_count, N)).unwrap(); () = spawner.spawn(receiver::(rx, N)).unwrap(); pool }) .bench_local_values(|mut pool| { pool.run_until_stalled(); }) } #[divan::bench( types = [BatchChannel, StdChannel, CrossbeamChannel, FuturesChannel, KanalChannel], consts = [1, 10, 100], )] fn batch_size_rx_first(bencher: divan::Bencher) where UC: UnboundedChannel + Send, { let iteration_count = 100; bencher .counter(divan::counter::ItemsCount::new(N * iteration_count)) .with_inputs(|| { let pool = LocalPool::new(); let spawner = pool.spawner(); let (tx, rx) = UC::new(); () = spawner.spawn(receiver::(rx, N)).unwrap(); () = spawner.spawn(sender::(tx, iteration_count, N)).unwrap(); pool }) .bench_local_values(|mut pool| { pool.run_until_stalled(); }) } fn main() { divan::main() }