//! Criterion benchmarks for the kapiti DNS server: benchmark clients send UDP queries to a local
//! kapiti instance, which forwards them to a stub UDP upstream with configurable response latency.

use std::net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

use anyhow::{anyhow, bail, Context, Result};
use async_lock::Barrier;
use async_net::UdpSocket;
use bytes::BytesMut;
use criterion::{criterion_group, criterion_main, BatchSize, Criterion, Throughput};
use futures_lite::FutureExt;
use lazy_static::lazy_static;
use packed_struct::prelude::*;
use tempfile;
use tracing::{debug, error, info};

use kapiti::codec::{domain_name, encoder::DNSMessageEncoder, message};
use kapiti::config::Config;
use kapiti::logging;
use kapiti::panic;
use kapiti::runner::Runner;
use kapiti::specs::enums_generated::{OpCode, ResourceClass, ResourceType, ResponseCode};
use kapiti::specs::message::{IntEnum, Question};

const LOCAL_EPHEMERAL_ENDPOINT: &str = "127.0.0.1:0";
const STUB_QUESTION_NAME: &str = "kapiti.nickbp.com.";
const STUB_FILTER_INFO: &str = "benches/server";
const STUB_QUERY_IP: IpAddr = IpAddr::V4(Ipv4Addr::new(1, 2, 4, 8));

const UPSTREAM_LATENCY_FAST: Duration = Duration::from_nanos(1);
const UPSTREAM_LATENCY_SLOW: Duration = Duration::from_millis(25);

lazy_static! {
    static ref STUB_REQUEST: BytesMut =
        write_stub_request().expect("Failed to create stub request");
}

/// Builds a stub DNS response/answer and returns the encoded buffer
fn write_stub_response() -> Result<BytesMut> {
    let mut buf = BytesMut::with_capacity(4096);
    DNSMessageEncoder::new().encode_local_response(
        ResponseCode::NOERROR,
        0,
        &Question {
            name: STUB_QUESTION_NAME.to_string(),
            resource_type: IntEnum::Enum(ResourceType::A),
            resource_class: IntEnum::Enum(ResourceClass::INTERNET),
        },
        &None,
        &STUB_FILTER_INFO.to_string(),
        Some(STUB_QUERY_IP),
        None,
        &mut buf,
    )?;
    Ok(buf)
}

/// Builds a stub DNS request/question and returns the encoded buffer
fn write_stub_request() -> Result<BytesMut> {
    let mut buf = BytesMut::with_capacity(4096);
    message::write_header_bits(
        message::HeaderBits {
            id: 12345,
            is_response: true,
            op_code: Integer::from(OpCode::QUERY as u8),
            authoritative: false,
            truncated: false,
            recursion_desired: true,
            recursion_available: true,
            reserved_9: false,
            authentic_data: false,
            checking_disabled: false,
            response_code: Integer::from(ResponseCode::NOERROR as u8),
            question_count: 1,
            answer_count: 0,
            authority_count: 0,
            additional_count: 0,
        },
        &mut buf,
    )?;

    let mut ptr_offsets = domain_name::LabelOffsets::new();
    message::write_question(
        &Question {
            name: STUB_QUESTION_NAME.to_string(),
            resource_type: IntEnum::Enum(ResourceType::A),
            resource_class: IntEnum::Enum(ResourceClass::INTERNET),
        },
        &mut buf,
        &mut ptr_offsets,
    )?;
    Ok(buf)
}

#[derive(Debug)]
struct ResponseInfo {
    dest: SocketAddr,
    request0: u8,
    request1: u8,
}

enum UpstreamEvent {
    GotRequest(ResponseInfo),
    ResponseDue(ResponseInfo),
    Stop,
}

async fn run_udp_upstream(
    udp_sock: UdpSocket,
    latency: Duration,
    stop_upstream_rx: async_channel::Receiver<()>,
) -> Result<()> {
    let mut response_buffer = write_stub_response().expect("Failed to construct response buffer");
    let mut request_buffer: [u8; 4096] = [0; 4096];
    let (response_queue, queue_rx) = futures_delay_queue::delay_queue::<ResponseInfo>();
    // Wait until the queue is empty before trying to exit
    let mut queued_count: usize = 0;
    loop {
        let fut = (
            // Check if the queue has anything scheduled...
            async {
                match queue_rx.receive().await {
                    Some(item) => Ok(UpstreamEvent::ResponseDue(item)),
                    None => bail!("channel closed"),
                }
            }
        )
        .or(
            // Check if the socket has something to receive, or...
            async {
                let (size, dest) = udp_sock
                    .recv_from(&mut request_buffer)
                    .await
                    .with_context(|| "failed to receive request data from udp_sock")?;
                // Ensure that the response has a matching request ID (first two bytes)
                if size < 2 {
                    return Err(anyhow!(
                        "Expected request to have at least 2 bytes, but got {}",
                        size
                    ));
                }
                let msg = ResponseInfo {
                    dest,
                    request0: request_buffer[0],
                    request1: request_buffer[1],
                };
                Ok(UpstreamEvent::GotRequest(msg))
            },
        )
        .or(
            // ... or check if we should be stopping.
            async {
                stop_upstream_rx.recv().await.with_context(|| {
                    "failed to receive signal from stop_upstream_rx, did main thread crash?"
                })?;
                Ok(UpstreamEvent::Stop)
            },
        );

        match fut.await {
            Ok(UpstreamEvent::GotRequest(response_info)) => {
                queued_count += 1;
                debug!(
                    "Upstream got request from {:?}, scheduling reply for {:?} (queued: {})",
                    response_info.dest, latency, queued_count
                );
                response_queue.insert(response_info, latency);
            }
            Ok(UpstreamEvent::ResponseDue(response_info)) => {
                // Send the response we got back to the original requestor.
                queued_count -= 1;
                response_buffer[0] = response_info.request0;
                response_buffer[1] = response_info.request1;
                udp_sock
                    .send_to(&response_buffer, &response_info.dest)
                    .await
                    .expect("Failed to send DNS response");
                debug!(
                    "Upstream sent {} bytes to {:?} (queued: {})",
                    response_buffer.len(),
                    response_info.dest,
                    queued_count
                );
            }
            Ok(UpstreamEvent::Stop) => {
                return Ok(());
            }
            Err(e) => {
                error!("Upstream got error while waiting for next event: {}", e);
            }
        }
    }
}

fn start_udp_upstream(
    latency: Duration,
    stop_upstream_rx: async_channel::Receiver<()>,
) -> Result<(SocketAddr, JoinHandle<Result<()>>)> {
    let listen_addr_ephemeral = LOCAL_EPHEMERAL_ENDPOINT
        .to_socket_addrs()?
        .next()
        .with_context(|| "Invalid listen address")?;
    let upstream_sock = smol::block_on(UdpSocket::bind(listen_addr_ephemeral))
        .with_context(|| format!("Failed to listen on {}", listen_addr_ephemeral))?;
    let listen_addr_actual = upstream_sock.local_addr()?;
    debug!("Upstream running at {:?}", listen_addr_actual);
    Ok((
        listen_addr_actual,
        thread::spawn(move || {
            smol::block_on(run_udp_upstream(upstream_sock, latency, stop_upstream_rx))
                .with_context(|| "run_udp_upstream failed")
        }),
    ))
}

struct RunInputs {
    // Had previously been using a synchronous UdpSocket, but was seeing weird errors around EINTR
    // on recv calls. So we instead just use async sockets everywhere.
    client_sock: UdpSocket,
    response_buffer: BytesMut,
}

async fn setup_udp_requests(count: u64) -> Result<Vec<RunInputs>> {
    let client_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
    let mut inputs_vec = Vec::new();
    for _i in 0..count {
        let client_sock = UdpSocket::bind(client_addr).await?;
        let mut response_buffer = BytesMut::with_capacity(4096);
        // Ensure that the buffer has a SIZE suitable for socket.recv_from().
        // If we just leave it with the CAPACITY then it drops data.
        response_buffer.resize(response_buffer.capacity(), 0);
        inputs_vec.push(RunInputs {
            client_sock,
            response_buffer,
        });
    }
    Ok(inputs_vec)
}

/// Sends a request for each entry in the batch, then waits for a response to each request.
/// Inputs are reinitialized every time so that `move` will work.
async fn run_udp_requests(
    mut inputs_vec: Vec<RunInputs>,
    kapiti_udp_endpoint: SocketAddr,
) -> Result<()> {
    // Send requests for each entry in the batch
    for (idx, inputs) in inputs_vec.iter().enumerate() {
        let sendsize = inputs
            .client_sock
            .send_to(&STUB_REQUEST[..], kapiti_udp_endpoint)
            .await
            .with_context(|| {
                format!(
                    "{} Bench client send_to from {:?} to {} failed",
                    idx,
                    inputs.client_sock.local_addr(),
                    kapiti_udp_endpoint
                )
            })?;
        debug!(
            "{} Bench client sent {} bytes from {:?} to {}",
            idx,
            sendsize,
            inputs.client_sock.local_addr(),
            kapiti_udp_endpoint
        );
    }

    // Wait for responses to come back
    for (idx, inputs) in inputs_vec.iter_mut().enumerate() {
        debug!("{} recv...", idx);
        let (recvsize, recvfrom) = inputs
            .client_sock
            .recv_from(&mut inputs.response_buffer)
            .await
            .with_context(|| {
                format!(
                    "{} Bench client recv_from failed from {} to {:?}",
                    idx,
                    kapiti_udp_endpoint,
                    inputs.client_sock.local_addr()
                )
            })?;
        debug!(
            "{} Bench client got {} bytes from {:?} to {:?}",
            idx,
            recvsize,
            recvfrom,
            inputs.client_sock.local_addr()
        );
        if inputs.response_buffer[0] != STUB_REQUEST[0]
            || inputs.response_buffer[1] != STUB_REQUEST[1]
        {
            bail!(
                "Response doesn't have expected request ID:\n- request: {:?}\n- response: {:?}",
                &STUB_REQUEST[..],
                inputs.response_buffer
            );
        }
    }
    Ok(())
}

/// Requests coming in over UDP, upstream endpoint over UDP
fn run_udp_udp_test(
    c: &mut Criterion,
    name: &str,
    upstream_latency: Duration,
    samples: usize,
) -> Result<()> {
    let tmpstorage = tempfile::tempdir()?;

    // Use channels to notify threads to stop.
    // In the stop_upstream case, using a Barrier had false positives, probably because of multiple
    // wait() calls across the loop. So instead we use a channel since it supports repeated read checks.
    let (stop_upstream_tx, stop_upstream_rx): (
        async_channel::Sender<()>,
        async_channel::Receiver<()>,
    ) = async_channel::bounded(1);
    let stop_kapiti = Arc::new(Barrier::new(2));

    // Start upstream harness
    let (upstream_addr, upstream_join_handle) =
        start_udp_upstream(upstream_latency, stop_upstream_rx)?;

    let config = Config::new_for_test(
        tmpstorage
            .path()
            .to_str()
            .expect("invalid temp storage path"),
        upstream_addr.to_string(),
    );

    // Start kapiti server
    let runner = smol::block_on(async { Runner::new("benchmark".to_string(), config).await })?;
    let kapiti_udp_endpoint = runner.get_udp_endpoint()?;
    let stop_kapiti_copy = stop_kapiti.clone();
    let kapiti_join_handle = thread::spawn(move || smol::block_on(runner.run(stop_kapiti_copy)));

    // Run benchmark: see how quickly we can get responses from kapiti
    let run_count: u64 = 30;
    let mut group = c.benchmark_group("server");
    group.throughput(Throughput::Elements(run_count));
    group.sample_size(samples);
    group.bench_function(name, |b| {
        b.iter_batched(
            move || smol::block_on(setup_udp_requests(run_count)).expect("setup failed"),
            move |inputs_vec| {
                smol::block_on(run_udp_requests(inputs_vec, kapiti_udp_endpoint))
                    .expect("client run failed")
            },
            BatchSize::LargeInput,
        )
    });
    group.finish();

    info!("Waiting for kapiti thread to exit...");
    smol::block_on(stop_kapiti.wait());
    kapiti_join_handle
        .join()
        .expect("failed to join kapiti thread")?;

    info!("Waiting for upstream thread to exit...");
    smol::block_on(stop_upstream_tx.send(()))?;
    upstream_join_handle
        .join()
        .expect("failed to join upstream harness thread")?;
    Ok(())
}

fn udp_udp_fast(c: &mut Criterion) {
    // Init these once in the first bench, then leave alone:
    logging::init_logging();
    panic::init_hook();

    run_udp_udp_test(c, "fast", UPSTREAM_LATENCY_FAST, 50).expect("udp_udp_fast test failed");
}
fn udp_udp_slow(c: &mut Criterion) {
    run_udp_udp_test(c, "slow", UPSTREAM_LATENCY_SLOW, 10).expect("udp_udp_slow test failed");
}

criterion_group!(benches, udp_udp_fast, udp_udp_slow);
criterion_main!(benches);