// Copyright (C) 2023 Quickwit, Inc. // // Quickwit is offered under the AGPL v3.0 and as commercial software. // For commercial licensing, contact us at hello@quickwit.io. // // AGPL: // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU Affero General Public License as // published by the Free Software Foundation, either version 3 of the // License, or (at your option) any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Affero General Public License for more details. // // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . use async_trait::async_trait; use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; use witty_actors::{Actor, ActorContext, ActorExitStatus, Handler, Universe}; #[derive(Default)] struct DoNothingActor(u64); #[async_trait] impl Actor for DoNothingActor { type ObservableState = u64; fn observable_state(&self) -> u64 { self.0 } fn yield_after_each_message(&self) -> bool { YIELD_AFTER_EACH_MESSAGE } } #[derive(Debug)] struct AddMessage(u64); #[async_trait] impl Handler for DoNothingActor { type Reply = (); async fn handle( &mut self, msg: AddMessage, _ctx: &ActorContext, ) -> Result<(), ActorExitStatus> { self.0 += msg.0; Ok(()) } } async fn actor_bench_code(num_messages: usize) { let universe = Universe::default(); let actor: DoNothingActor = DoNothingActor::default(); let (mailbox, handle) = universe.spawn_builder().spawn(actor); for _ in 0..num_messages { mailbox.send_message(AddMessage(1)).await.unwrap(); } drop(mailbox); let (_, total) = handle.join().await; assert_eq!(total, num_messages as u64); } async fn flume_bench_code(num_messages: usize) { let (tx, rx) = flume::unbounded::(); for _ in 0..num_messages { tx.send_async(AddMessage(1)).await.unwrap(); } let join = tokio::task::spawn(async move { let mut sum = 0; while rx.recv_async().await.is_ok() { sum += 1; } sum }); drop(tx); let total = join.await.unwrap(); assert_eq!(total, num_messages as u64); } async fn chan_with_priority_bench_code(num_messages: usize) { let (tx, rx) = witty_actors::channel_with_priority::channel(witty_actors::QueueCapacity::Unbounded); for _ in 0..num_messages { tx.send_low_priority(AddMessage(1)).await.unwrap(); } let join = tokio::task::spawn(async move { let mut sum = 0; while rx.recv().await.is_ok() { sum += 1; } sum }); drop(tx); let total = join.await.unwrap(); assert_eq!(total, num_messages as u64); } fn message_throughput(c: &mut Criterion) { let num_messages = [10_000]; // [1, 1_000, 10_000] for num_messages in num_messages { c.bench_with_input( BenchmarkId::new( "unlimited_capacity_actors_yield_after_each_message", num_messages, ), &num_messages, |b, &num_messages| { // Insert a call to `to_async` to convert the bencher to async mode. // The timing loops are the same as with the normal bencher. let runtime = tokio::runtime::Builder::new_multi_thread() .enable_all() .build() .unwrap(); b.to_async(runtime) .iter(|| actor_bench_code::(num_messages)); }, ); c.bench_with_input( BenchmarkId::new( "unlimited_capacity_actors_no_yield_after_each_message", num_messages, ), &num_messages, |b, &num_messages| { // Insert a call to `to_async` to convert the bencher to async mode. // The timing loops are the same as with the normal bencher. let runtime = tokio::runtime::Builder::new_multi_thread() .enable_all() .build() .unwrap(); b.to_async(runtime) .iter(|| actor_bench_code::(num_messages)); }, ); c.bench_with_input( BenchmarkId::new("unlimited_capacity_flume", num_messages), &num_messages, |b, &num_messages| { // Insert a call to `to_async` to convert the bencher to async mode. // The timing loops are the same as with the normal bencher. let runtime = tokio::runtime::Builder::new_multi_thread() .enable_all() .build() .unwrap(); b.to_async(runtime).iter(|| flume_bench_code(num_messages)); }, ); c.bench_with_input( BenchmarkId::new("unlimited_capacity_chan_with_priority", num_messages), &num_messages, |b, &num_messages| { // Insert a call to `to_async` to convert the bencher to async mode. // The timing loops are the same as with the normal bencher. let runtime = tokio::runtime::Builder::new_multi_thread() .enable_all() .build() .unwrap(); b.to_async(runtime) .iter(|| chan_with_priority_bench_code(num_messages)); }, ); } } criterion_group!(benches, message_throughput); criterion_main!(benches);