#![allow(clippy::many_single_char_names)] use criterion::{criterion_group, criterion_main, Criterion}; use futures::{executor, try_join}; use rumpsteak::{ role::{Role, Roles, ToFrom}, try_session, End, Label, Receive, Result, Send, }; #[derive(Roles)] struct Roles(A, B, C); #[derive(Role)] #[message(Message)] struct A(#[route(B)] ToFrom, #[route(C)] ToFrom); #[derive(Role)] #[message(Message)] struct B(#[route(A)] ToFrom, #[route(C)] ToFrom); #[derive(Role)] #[message(Message)] struct C(#[route(A)] ToFrom, #[route(B)] ToFrom); #[derive(Label)] enum Message { Add(Add), Sum(Sum), } struct Add(i32); struct Sum(i32); #[rustfmt::skip] type AdderA<'a> = Send<'a, A, B, Add, Receive<'a, A, B, Add, Send<'a, A, C, Add, Receive<'a, A, C, Sum, End<'a>>>>>; #[rustfmt::skip] type AdderB<'b> = Receive<'b, B, A, Add, Send<'b, B, A, Add, Send<'b, B, C, Add, Receive<'b, B, C, Sum, End<'b>>>>>; #[rustfmt::skip] type AdderC<'c> = Receive<'c, C, A, Add, Receive<'c, C, B, Add, Send<'c, C, A, Sum, Send<'c, C, B, Sum, End<'c>>>>>; async fn adder_a(s: AdderA<'_>) -> Result<((), End<'_>)> { let x = 2; let s = s.send(Add(x))?; let (Add(y), s) = s.receive().await?; let s = s.send(Add(y))?; let (Sum(z), s) = s.receive().await?; assert_eq!(z, 5); Ok(((), s)) } async fn adder_b(s: AdderB<'_>) -> Result<((), End<'_>)> { let (Add(y), s) = s.receive().await?; let x = 3; let s = s.send(Add(x))?; let s = s.send(Add(y))?; let (Sum(z), s) = s.receive().await?; assert_eq!(z, 5); Ok(((), s)) } async fn adder_c(s: AdderC<'_>) -> Result<((), End<'_>)> { let (Add(x), s) = s.receive().await?; let (Add(y), s) = s.receive().await?; let z = x + y; let s = s.send(Sum(z))?; Ok(((), s.send(Sum(z))?)) } pub fn criterion_benchmark(criterion: &mut Criterion) { { let mut group = criterion.benchmark_group("three_adder"); group.bench_function("with_channels", |bencher| { bencher.iter(|| { let Roles(mut a, mut b, mut c) = Roles::default(); executor::block_on(async { try_join!( try_session(&mut a, adder_a), try_session(&mut b, adder_b), try_session(&mut c, adder_c), ) .unwrap(); }); }); }); group.bench_function("without_channels", |bencher| { let Roles(mut a, mut b, mut c) = Roles::default(); bencher.iter(|| { executor::block_on(async { try_join!( try_session(&mut a, adder_a), try_session(&mut b, adder_b), try_session(&mut c, adder_c), ) .unwrap(); }); }); }); } oneshot::criterion_benchmark(criterion); blocking::criterion_benchmark(criterion); blocking_oneshot::criterion_benchmark(criterion); } mod oneshot { use criterion::Criterion; use futures::executor; use rumpsteak_oneshot::{session3, End, Left, Receive, Right, Send, SessionPair}; type AdderAToB = Send>; type AdderAToC = Send>; type AdderAQueue = Left>>>; type AdderBToA = Receive>; type AdderBToC = Send>; type AdderBQueue = Left>>>; type AdderCToA = Receive>; type AdderCToB = Receive>; type AdderCQueue = Left>>>; async fn adder_a( s: SessionPair, ) -> SessionPair { let x = 2; let s = s.send(x); let (y, s) = s.receive().await; let s = s.send(y); let (z, s) = s.receive().await; assert_eq!(z, 5); s } async fn adder_b( s: SessionPair, ) -> SessionPair { let (y, s) = s.receive().await; let x = 3; let s = s.send(x); let s = s.send(y); let (z, s) = s.receive().await; assert_eq!(z, 5); s } async fn adder_c( s: SessionPair, ) -> SessionPair { let (x, s) = s.receive().await; let (y, s) = s.receive().await; let z = x + y; let s = s.send(z); s.send(z) } pub fn criterion_benchmark(criterion: &mut Criterion) { criterion.bench_function("oneshot_three_adder", |bencher| { bencher.iter(|| { executor::block_on(session3(adder_a, adder_b, adder_c)); }); }); } } mod blocking { use criterion::Criterion; use std::{ error::Error, result, sync::mpsc::{self, Receiver, Sender}, thread, }; type Result = result::Result>; type Channel = (Sender, Receiver); struct Roles(pub A, pub B, pub C); impl Default for Roles { fn default() -> Self { let (a_b_s, a_b_r) = mpsc::channel(); let (a_c_s, a_c_r) = mpsc::channel(); let (b_a_s, b_a_r) = mpsc::channel(); let (b_c_s, b_c_r) = mpsc::channel(); let (c_a_s, c_a_r) = mpsc::channel(); let (c_b_s, c_b_r) = mpsc::channel(); Self( A((a_b_s, b_a_r), (a_c_s, c_a_r)), B((b_a_s, a_b_r), (b_c_s, c_b_r)), C((c_a_s, a_c_r), (c_b_s, b_c_r)), ) } } struct A(Channel, Channel); struct B(Channel, Channel); struct C(Channel, Channel); fn adder_a(A(b, c): &A) -> Result<()> { let x = 2; b.0.send(x)?; let y = b.1.recv()?; c.0.send(y)?; let z = c.1.recv()?; assert_eq!(z, 5); Ok(()) } fn adder_b(B(a, c): &B) -> Result<()> { let y = a.1.recv()?; let x = 3; a.0.send(x)?; c.0.send(y)?; let z = c.1.recv()?; assert_eq!(z, 5); Ok(()) } fn adder_c(C(a, b): &C) -> Result<()> { let x = a.1.recv()?; let y = b.1.recv()?; let z = x + y; a.0.send(z)?; b.0.send(z)?; Ok(()) } pub fn criterion_benchmark(criterion: &mut Criterion) { let mut group = criterion.benchmark_group("blocking_three_adder"); group.bench_function("with_channels", |bencher| { bencher.iter(|| { let Roles(a, b, c) = Roles::default(); let a = thread::spawn(move || adder_a(&a)); let b = thread::spawn(move || adder_b(&b)); let c = thread::spawn(move || adder_c(&c)); a.join().unwrap().unwrap(); b.join().unwrap().unwrap(); c.join().unwrap().unwrap(); }); }); group.bench_function("without_channels", |bencher| { let mut roles = Some(Roles::default()); bencher.iter(|| { let Roles(a, b, c) = roles.take().unwrap(); let a = thread::spawn(move || { adder_a(&a)?; Result::<_>::Ok(a) }); let b = thread::spawn(move || { adder_b(&b)?; Result::<_>::Ok(b) }); let c = thread::spawn(move || { adder_c(&c)?; Result::<_>::Ok(c) }); let a = a.join().unwrap().unwrap(); let b = b.join().unwrap().unwrap(); let c = c.join().unwrap().unwrap(); roles = Some(Roles(a, b, c)); }); }); } } mod blocking_oneshot { use criterion::Criterion; use mpstthree::{ binary::{End, Recv, Send}, fork_mpst, functionmpst::{ close::close_mpst, recv::{ recv_mpst_a_to_b, recv_mpst_a_to_c, recv_mpst_b_to_a, recv_mpst_b_to_c, recv_mpst_c_to_a, recv_mpst_c_to_b, }, send::{ send_mpst_a_to_b, send_mpst_a_to_c, send_mpst_b_to_a, send_mpst_b_to_c, send_mpst_c_to_a, send_mpst_c_to_b, }, }, role::{a::RoleA, b::RoleB, c::RoleC, end::RoleEnd}, sessionmpst::SessionMpst, }; use std::{error::Error, result}; type Result = result::Result>; type AtoB = Send>; type AtoC = Send>; type QueueA = RoleB>>>; type EndpointA = SessionMpst>; type BtoA = Recv>; type BtoC = Send>; type QueueB = RoleA>>>; type EndpointB = SessionMpst>; type CtoA = Recv>; type CtoB = Recv>; type QueueC = RoleA>>>; type EndpointC = SessionMpst>; fn adder_a(s: EndpointA) -> Result<()> { let x = 2; let s = send_mpst_a_to_b(x, s); let (y, s) = recv_mpst_a_to_b(s)?; let s = send_mpst_a_to_c(y, s); let (z, s) = recv_mpst_a_to_c(s)?; assert_eq!(z, 5); close_mpst(s)?; Ok(()) } fn adder_b(s: EndpointB) -> Result<()> { let (y, s) = recv_mpst_b_to_a(s)?; let x = 3; let s = send_mpst_b_to_a(x, s); let s = send_mpst_b_to_c(y, s); let (z, s) = recv_mpst_b_to_c(s)?; assert_eq!(z, 5); close_mpst(s)?; Ok(()) } fn adder_c(s: EndpointC) -> Result<()> { let (x, s) = recv_mpst_c_to_a(s)?; let (y, s) = recv_mpst_c_to_b(s)?; let z = x + y; let s = send_mpst_c_to_a(z, s); let s = send_mpst_c_to_b(z, s); close_mpst(s)?; Ok(()) } pub fn criterion_benchmark(criterion: &mut Criterion) { criterion.bench_function("blocking_oneshot_three_adder", |bencher| { bencher.iter(|| { let (thread_a, thread_b, thread_c) = fork_mpst(adder_a, adder_b, adder_c); assert!(thread_a.is_ok()); assert!(thread_b.is_ok()); assert!(thread_c.is_ok()); }); }); } } criterion_group!(benches, criterion_benchmark); criterion_main!(benches);