use criterion::{ async_executor::AsyncExecutor, criterion_group, criterion_main, measurement::WallTime, Bencher, BenchmarkGroup, Criterion, }; use dialectic::prelude::*; use dialectic_null as null; use dialectic_tokio_mpsc as mpsc; use futures::Future; use std::{convert::TryInto, fmt::Debug, marker, sync::Arc, time::Instant}; use tokio::runtime::Runtime; #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] enum Primitive { Send, Recv, Choose, Offer, Call, Split, } #[Transmitter(Tx for ())] async fn send( chan: Chan, ) -> Chan where Rx: Send, Tx::Error: Debug, { chan.send(()).await.unwrap() } #[Receiver(Rx for ())] async fn recv( chan: Chan, ) -> Chan where Tx: Send, Rx::Error: Debug, { chan.recv().await.unwrap().1 } #[Transmitter(Tx)] async fn choose( chan: Chan {} } } }, Tx, Rx>, ) -> Chan {} } } }, Tx, Rx> where Rx: Send, Tx::Error: Debug, { chan.choose::<0>().await.unwrap() } #[Receiver(Rx)] async fn offer( chan: Chan {} } } }, Tx, Rx>, ) -> Chan {} } } }, Tx, Rx> where Tx: Send, Rx::Error: Debug, { offer!(in chan { 0 => chan, }) .unwrap() } async fn call( chan: Chan, ) -> Chan where Tx: Send, Rx: Send, { chan.call(|_| async { Ok::<_, ()>(()) }) .await .unwrap() .1 .unwrap() } async fn split( chan: Chan {}, <- {} } } }, Tx, Rx>, ) -> Chan {} <- {} } } }, Tx, Rx> where Tx: Send, Rx: Send, { chan.split(|_, _| async { Ok::<_, ()>(()) }) .await .unwrap() .1 .unwrap() } fn bench_chan_loop( b: &mut Bencher, rt: Arc, channel: N, primitive: Primitive, f: F, ) where F: Fn(Chan) -> Fut, Fut: Future>, N: Fn(Primitive, u64) -> (Tx, Rx, H), S: Session, Tx: marker::Send + 'static, Rx: marker::Send + 'static, A: AsyncExecutor, { b.iter_custom(|iters| { let (tx, rx, drop_after_bench) = channel(primitive, iters); let mut chan = S::wrap(tx, rx); let elapsed = rt.block_on(async { let start = Instant::now(); for _ in 0..iters { chan = f(chan).await; } start.elapsed() }); drop(drop_after_bench); elapsed }); } fn bench_chan_loop_group( g: &mut BenchmarkGroup, rt: Arc, channel: fn(Primitive, u64) -> (Tx, Rx, H), name: &str, primitive: Primitive, f: fn(Chan) -> Fut, ) where Fut: Future>, S: Session, Tx: marker::Send + 'static, Rx: marker::Send + 'static, A: AsyncExecutor, { g.bench_function(name, move |b| { bench_chan_loop(b, rt.clone(), channel, primitive, f) }); } #[Transmitter(Tx for ())] #[Receiver(Rx for ())] fn bench_all_on( c: &mut Criterion, rt_name: &str, rt: Arc, backend_name: &str, channel: fn(Primitive, u64) -> (Tx, Rx, H), ) where Tx::Error: Debug, Rx::Error: Debug, A: AsyncExecutor, { use Primitive::*; let group_name = format!("{}/{}", rt_name, backend_name); let mut g = c.benchmark_group(&group_name); bench_chan_loop_group(&mut g, rt.clone(), channel, "send", Send, send); bench_chan_loop_group(&mut g, rt.clone(), channel, "recv", Recv, recv); bench_chan_loop_group(&mut g, rt.clone(), channel, "choose", Choose, choose); bench_chan_loop_group(&mut g, rt.clone(), channel, "offer", Offer, offer); bench_chan_loop_group(&mut g, rt.clone(), channel, "call", Call, call); bench_chan_loop_group(&mut g, rt, channel, "split", Split, split); g.finish(); } fn bench_tokio_null(c: &mut Criterion) { bench_all_on( c, "tokio", Arc::new(Runtime::new().unwrap()), "null", |_primitive, _iters| (null::Sender::default(), null::Receiver::default(), ()), ) } fn bench_tokio_mpsc(c: &mut Criterion) { use Primitive::*; bench_all_on( c, "tokio", Arc::new(Runtime::new().unwrap()), "mpsc", |primitive, iters| { let (tx0, rx0) = mpsc::unbounded_channel(); let (tx1, rx1) = mpsc::unbounded_channel(); // Pre-allocate responses for those operations which need responses match primitive { Send | Choose | Call | Split => {} Recv => { for _ in 0..iters { tx1.0.send(Box::new(())).unwrap(); } } Offer => { for _ in 0..iters { let zero_choice: Choice<1> = 0u8.try_into().unwrap(); tx1.0.send(Box::new(zero_choice)).unwrap(); } } }; (tx0, rx1, (tx1, rx0)) }, ) } criterion_group!(benches, bench_tokio_null, bench_tokio_mpsc); criterion_main!(benches);