use { async_executors :: { * } , criterion :: { Criterion, criterion_group, criterion_main, BatchSize } , futures :: { executor::{ block_on } } , thespis :: { * } , thespis_impl :: { * } , actix :: { Actor as _, ActorFutureExt } , }; const BOUNDED: usize = 11; fn termial( n: u64 ) -> u64 { n * ( n + 1 ) / 2 } #[ derive( Actor ) ] struct Sum { pub total: u64, pub inner: Addr, } #[ derive( Actor ) ] struct SumIn { pub count: u64, } struct Add ( u64 ); struct Show ; impl Message for Add { type Return = () ; } impl Message for Show { type Return = u64; } impl Handler< Add > for Sum { fn handle( &mut self, msg: Add ) -> Return<()> { Box::pin( async move { let inner = Show ).await.expect( "call inner" ); += msg.0 + inner; })} } impl Handler< Show > for Sum { fn handle( &mut self, _msg: Show ) -> Return { Box::pin( async move { })} } impl Handler< Show > for SumIn { fn handle( &mut self, _msg: Show ) -> Return { Box::pin( async move { self.count += 1; self.count })} } struct ActixSum { pub total: u64, pub inner: actix::Addr, } impl actix::Actor for ActixSum { type Context = actix::Context; } impl actix::Actor for SumIn { type Context = actix::Context; } impl actix::Message for Add { type Result = () ; } impl actix::Message for Show { type Result = u64; } impl actix::Handler< Add > for ActixSum { type Result = actix::ResponseActFuture< Self, ::Result >; fn handle( &mut self, msg: Add, _ctx: &mut Self::Context ) -> Self::Result { let action = self.inner.send( Show ); let act_fut = actix::fut::wrap_future::<_, Self>(action); let update_self = move |result, actor, _ctx| { += msg.0 + result.expect( "Call SumIn" ); }); Box::pin( update_self ) } } impl actix::Handler< Show > for ActixSum { type Result = u64; fn handle( &mut self, _msg: Show, _ctx: &mut actix::Context ) -> Self::Result { } } impl actix::Handler< Show > for SumIn { type Result = u64; fn handle( &mut self, _msg: Show, _ctx: &mut actix::Context ) -> Self::Result { self.count += 1; self.count } } struct Accu( u64 ); impl Accu { #[ inline( never ) ] // async fn add( &mut self, v: Add ) { self.0 += v.0; } async fn show( &self ) -> u64 { self.0 } } fn seq( c: &mut Criterion ) { // let _ = flexi_logger::Logger::with_str( "trace" ).start(); let mut group = c.benchmark_group( "seq" ); for msgs in [ 1, 100, 10000 ].iter() { // match buffer_size // { // 10 => { group.sample_size( 100 ); } // 100 => { group.sample_size( 50 ); } // 200 => { group.sample_size( 30 ); } // _ => { unreachable!(); } // } group.sample_size( 30 ); group.bench_function ( format!( "send: {} msgs", &msgs ), |b| b.iter_batched ( move || // setup { let (sum_in_addr, sum_in_mb) = Addr::builder( "sum_in" ).bounded( Some(BOUNDED) ).build() ; let (sum_addr, sum_mb) = Addr::builder( "sum" ).bounded( Some(BOUNDED) ).build() ; let sum = Sum{ total: 5, inner: sum_in_addr } ; let exec = TokioCtBuilder::new().build().expect( "build runtime" ); let sumin = SumIn{ count: 0 }; let sumin_handle = exec.spawn_handle( sum_in_mb.start( sumin ) ).expect( "spawn" ); let sum_handle = exec.spawn_handle( sum_mb .start( sum ) ).expect( "spawn" ); (sum_addr, sumin_handle, sum_handle, exec) }, |(mut sum_addr, sumin_handle, sum_handle, exec)| // measure { exec.block_on( async move { for _ in 0..*msgs { sum_addr.send( Add(10) ).await.expect( "Send failed" ); } let res = Show{} ).await.expect( "Call failed" ); assert_eq!( *msgs *10 + 5 + termial( *msgs ), res ); drop( sum_addr ); drop( sum_handle .await ); drop( sumin_handle.await ); }); }, BatchSize::SmallInput ) ); // Currently doesn't work, as it won't wait until the message is processed, so the // assert comes to early. We would have to synchronize somehow. // // group.bench_function // ( // format!( "actix do_send: {} msgs", &msgs ), // |b| b.iter // ( // || // { // actix_rt::System::new( "main" ).block_on( async move // { // let sum_in = SumIn{ count: 0 }; // let sum_in_addr = SumIn::start( sum_in ); // let sum = ActixSum{ total: 5, inner: sum_in_addr }; // let sum_addr = ActixSum::start( sum ); // for _ in 0..*msgs // { // sum_addr.do_send( Add(10) ); // } // let res = sum_addr.send( Show{} ).await.expect( "Call failed" ); // assert_eq!( *msgs *10 + 5 + termial( *msgs ), res ); // actix_rt::System::current().stop(); // }); // } // ) // ); group.bench_function ( format!( "call: {} msgs", &msgs ), |b| b.iter_batched ( move || // setup { let (sum_in_addr, sum_in_mb) = Addr::builder( "sum_in" ).bounded( Some(BOUNDED) ).build() ; let (sum_addr , sum_mb ) = Addr::builder( "sum" ).bounded( Some(BOUNDED) ).build() ; let sum = Sum{ total: 5, inner: sum_in_addr } ; let exec = TokioCtBuilder::new().build().expect( "build runtime" ); let sumin = SumIn{ count: 0 }; let sumin_handle = exec.spawn_handle( sum_in_mb.start( sumin ) ).expect( "spawn" ); let sum_handle = exec.spawn_handle( sum_mb .start( sum ) ).expect( "spawn" ); (sum_addr, sumin_handle, sum_handle, exec) }, |(mut sum_addr, sumin_handle, sum_handle, exec)| // measure { exec.block_on( async move { for _ in 0..*msgs { Add(10) ).await.expect( "Send failed" ); } let res = Show{} ).await.expect( "Call failed" ); assert_eq!( *msgs *10 + 5 + termial( *msgs ), res ); drop( sum_addr ); drop( sum_handle .await ); drop( sumin_handle.await ); }); }, BatchSize::SmallInput ) ); group.bench_function ( format!( "actix send: {} msgs", &msgs ), |b| b.iter ( || { actix_rt::System::new().block_on( async move { let sum_in = SumIn{ count: 0 }; let sum_in_addr = SumIn::start( sum_in ); let sum = ActixSum{ total: 5, inner: sum_in_addr }; let sum_addr = ActixSum::start( sum ); for _ in 0..*msgs { sum_addr.send( Add(10) ).await.expect( "Send failed" ); } let res = sum_addr.send( Show{} ).await.expect( "Call failed" ); assert_eq!( *msgs *10 + 5 + termial( *msgs ), res ); actix_rt::System::current().stop(); }); } ) ); group.bench_function ( format!( "async method: {} msgs", &msgs ), |b| b.iter ( || { let res = block_on( async { let mut sum = Accu( 5 ); for _i in 0..*msgs { sum.add( Add( 10 ) ).await; } }); assert_eq!( msgs*10 + 5, res ); } ) ); } } criterion_group!( benches, seq ); criterion_main! ( benches );