use criterion::{criterion_group, criterion_main, Criterion};
use tokio_stream::StreamExt;
use anyhow::Result;
use message_worker::{blocking, non_blocking, empty_ctx};
use std::rc::Rc;
use std::sync::Arc;
pub fn listener_creation(c: &mut Criterion) {
c.bench_function("create listener", |b| {
let executor = tokio::runtime::Runtime::new().unwrap();
async fn handle_message(_ctx: Arc<()>, _event: ()) -> Result> { Ok(None) }
b.to_async(executor).iter(move || async {
let source = tokio_stream::iter(vec![()]);
non_blocking::listen(source, empty_ctx, handle_message);
})
});
}
pub fn listener_process_one(c: &mut Criterion) {
c.bench_function("listener process 1 message", |b| {
let executor = tokio::runtime::Runtime::new().unwrap();
async fn handle_message(_ctx: Arc<()>, _event: ()) -> Result > { Ok(None) }
b.to_async(executor).iter(move || async {
let source = tokio_stream::iter(vec![()]);
non_blocking::listen(source, empty_ctx, handle_message).await.unwrap();
})
});
}
pub fn listener_process_many(c: &mut Criterion) {
c.bench_function("listener process 1000 messages", |b| {
let executor = tokio::runtime::Runtime::new().unwrap();
async fn handle_message(_ctx: Arc<()>, _event: ()) -> Result > { Ok(None) }
b.to_async(executor).iter(move || async {
let source = futures::stream::repeat(()).take(1000);
non_blocking::listen(source, empty_ctx, handle_message).await.unwrap();
})
});
}
pub fn blocking_listener_creation(c: &mut Criterion) {
c.bench_function("(blocking) create listener", |b| {
let executor = tokio::runtime::Runtime::new().unwrap();
async fn handle_message(_ctx: Rc<()>, _event: ()) -> Result > { Ok(None) }
b.to_async(executor).iter(move || async {
let source = tokio_stream::iter(vec![()]);
blocking::listen(source, empty_ctx, handle_message);
})
});
}
pub fn blocking_listener_process_one(c: &mut Criterion) {
c.bench_function("(blocking) listener process 1 message", |b| {
let executor = tokio::runtime::Runtime::new().unwrap();
async fn handle_message(_ctx: Rc<()>, _event: ()) -> Result > { Ok(None) }
b.to_async(executor).iter(move || async {
let source = tokio_stream::iter(vec![()]);
blocking::listen(source, empty_ctx, handle_message).await.unwrap();
})
});
}
pub fn blocking_listener_process_many(c: &mut Criterion) {
c.bench_function("(blocking) listener process 1000 messages", |b| {
let executor = tokio::runtime::Runtime::new().unwrap();
async fn handle_message(_ctx: Rc<()>, _event: ()) -> Result > { Ok(None) }
b.to_async(executor).iter(move || async {
let source = futures::stream::repeat(()).take(1000);
blocking::listen(source, empty_ctx, handle_message).await.unwrap();
})
});
}
criterion_group!(
benches,
listener_creation,
listener_process_one,
listener_process_many,
blocking_listener_creation,
blocking_listener_process_one,
blocking_listener_process_many
);
criterion_main!(benches);