#![cfg(feature = "flume-transport")]
// NOTE(review): presumably needed because the derives on the types declared
// inside `flume_channel_mapped_bench` trigger this lint — confirm.
#![allow(non_local_definitions)]
mod math;
use math::*;
use quic_rpc::{
    server::{RpcChannel, RpcServerError},
    transport::flume,
    RpcClient, RpcServer, Service,
};
use tokio_util::task::AbortOnDropHandle;

// NOTE(review): the explicit generic arguments in this file (`::<Service, _>`,
// `RpcChannel<_, _>`, `RpcClient<_, _>`, `JoinHandle<Result<..>>`) were restored
// from the surrounding code; confirm the arity against the quic_rpc version in use.

/// Runs `bench` with 1000000 as its second argument against a `ComputeService`
/// server reached through a `flume::channel(1)` pair.
///
/// The server task is wrapped in an [`AbortOnDropHandle`], so it is aborted
/// when the handle goes out of scope at the end of the test.
#[tokio::test]
async fn flume_channel_bench() -> anyhow::Result<()> {
    tracing_subscriber::fmt::try_init().ok();
    let (server, client) = flume::channel(1);

    let server = RpcServer::<ComputeService, _>::new(server);
    let _server_handle = AbortOnDropHandle::new(tokio::spawn(ComputeService::server(server)));
    let client = RpcClient::<ComputeService, _>::new(client);
    bench(client, 1000000).await?;
    Ok(())
}

/// Same benchmark as `flume_channel_bench`, but the compute messages are
/// wrapped in two extra enum layers (`OuterService` -> `InnerService` ->
/// `ComputeService`), and both the server channel and the client are mapped
/// down twice with `.map()` before use.
///
/// After `bench` returns, the test awaits the accept loop and requires it to
/// have ended with `RpcServerError::Accept`; any other result panics.
#[tokio::test]
async fn flume_channel_mapped_bench() -> anyhow::Result<()> {
    use derive_more::{From, TryInto};
    use serde::{Deserialize, Serialize};

    tracing_subscriber::fmt::try_init().ok();

    #[derive(Debug, Serialize, Deserialize, From, TryInto)]
    enum OuterRequest {
        Inner(InnerRequest),
    }
    #[derive(Debug, Serialize, Deserialize, From, TryInto)]
    enum InnerRequest {
        Compute(ComputeRequest),
    }
    #[derive(Debug, Serialize, Deserialize, From, TryInto)]
    enum OuterResponse {
        Inner(InnerResponse),
    }
    #[derive(Debug, Serialize, Deserialize, From, TryInto)]
    enum InnerResponse {
        Compute(ComputeResponse),
    }
    #[derive(Debug, Clone)]
    struct OuterService;
    impl Service for OuterService {
        type Req = OuterRequest;
        type Res = OuterResponse;
    }
    #[derive(Debug, Clone)]
    struct InnerService;
    impl Service for InnerService {
        type Req = InnerRequest;
        type Res = InnerResponse;
    }

    let (server, client) = flume::channel(1);

    let server = RpcServer::<OuterService, _>::new(server);
    // Accept loop: runs until `accept` or `read_first` returns an error, which
    // `?` propagates out as the task's result.
    let server_handle: tokio::task::JoinHandle<Result<(), RpcServerError<_>>> =
        tokio::task::spawn(async move {
            let service = ComputeService;
            loop {
                let (req, chan) = server.accept().await?.read_first().await?;
                let service = service.clone();
                // Each request is handled in its own task so the loop can
                // go straight back to accepting.
                tokio::spawn(async move {
                    let req: OuterRequest = req;
                    match req {
                        OuterRequest::Inner(InnerRequest::Compute(req)) => {
                            // Peel off one service layer per `map` call.
                            let chan: RpcChannel<InnerService, _> = chan.map();
                            let chan: RpcChannel<ComputeService, _> = chan.map();
                            ComputeService::handle_rpc_request(service, req, chan).await
                        }
                    }
                });
            }
        });

    let client = RpcClient::<OuterService, _>::new(client);
    let client: RpcClient<InnerService, _> = client.map();
    let client: RpcClient<ComputeService, _> = client.map();
    bench(client, 1000000).await?;
    // dropping the client will cause the server to terminate
    match server_handle.await? {
        Err(RpcServerError::Accept(_)) => {}
        e => panic!("unexpected termination result {e:?}"),
    }
    Ok(())
}

/// simple happy path test for all 4 patterns
///
/// Spawns a `ComputeService` server on a `flume::channel(1)` pair and hands
/// the raw client end to `smoke_test`.
#[tokio::test]
async fn flume_channel_smoke() -> anyhow::Result<()> {
    tracing_subscriber::fmt::try_init().ok();
    let (server, client) = flume::channel(1);

    let server = RpcServer::<ComputeService, _>::new(server);
    let _server_handle = AbortOnDropHandle::new(tokio::spawn(ComputeService::server(server)));
    smoke_test(client).await?;
    Ok(())
}