use std::mem; use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput}; use serde::Deserialize; use tokio::{runtime::Runtime, time::Instant}; use clickhouse::{error::Result, Client, Compression, Row}; mod server { use std::{convert::Infallible, net::SocketAddr, thread}; use bytes::Bytes; use futures::stream; use hyper::service::{make_service_fn, service_fn}; use hyper::{body, Body, Request, Response, Server}; use tokio::runtime; async fn handle(req: Request) -> Result, Infallible> { let _ = body::aggregate(req.into_body()).await; let chunk = Bytes::from_static(&[15; 128 * 1024]); let stream = stream::repeat(Ok::(chunk)); let body = Body::wrap_stream(stream); Ok(Response::new(body)) } pub fn start(addr: SocketAddr) { thread::spawn(move || { runtime::Builder::new_current_thread() .enable_all() .build() .unwrap() .block_on(async { let make_svc = make_service_fn(|_| async { Ok::<_, Infallible>(service_fn(handle)) }); Server::bind(&addr).serve(make_svc).await.unwrap(); }); }); } } fn select(c: &mut Criterion) { let addr = "127.0.0.1:6543".parse().unwrap(); server::start(addr); #[allow(dead_code)] #[derive(Debug, Row, Deserialize)] struct SomeRow { a: u64, b: i64, c: i32, d: u32, } async fn run(client: Client, iters: u64) -> Result<()> { let mut cursor = client .query("SELECT ?fields FROM some") .fetch::()?; for _ in 0..iters { black_box(cursor.next().await?); } Ok(()) } let mut group = c.benchmark_group("select"); group.throughput(Throughput::Bytes(mem::size_of::() as u64)); group.bench_function("select", |b| { b.iter_custom(|iters| { let rt = Runtime::new().unwrap(); let client = Client::default() .with_url(format!("http://{addr}")) .with_compression(Compression::None); let start = Instant::now(); rt.block_on(run(client, iters)).unwrap(); start.elapsed() }) }); group.finish(); } criterion_group!(benches, select); criterion_main!(benches);