mod store_rpc { use std::fmt::Debug; use quic_rpc::rpc_service; use serde::{Deserialize, Serialize}; pub type Cid = [u8; 32]; #[derive(Debug, Serialize, Deserialize)] pub struct Put(pub Vec); #[derive(Debug, Serialize, Deserialize)] pub struct PutResponse(pub Cid); #[derive(Debug, Serialize, Deserialize)] pub struct Get(pub Cid); #[derive(Debug, Serialize, Deserialize)] pub struct GetResponse(pub Vec); #[derive(Debug, Serialize, Deserialize)] pub struct PutFile; #[derive(Debug, Serialize, Deserialize)] pub struct PutFileUpdate(pub Vec); #[derive(Debug, Serialize, Deserialize)] pub struct PutFileResponse(pub Cid); #[derive(Debug, Serialize, Deserialize)] pub struct GetFile(pub Cid); #[derive(Debug, Serialize, Deserialize)] pub struct GetFileResponse(pub Vec); #[derive(Debug, Serialize, Deserialize)] pub struct ConvertFile; #[derive(Debug, Serialize, Deserialize)] pub struct ConvertFileUpdate(pub Vec); #[derive(Debug, Serialize, Deserialize)] pub struct ConvertFileResponse(pub Vec); rpc_service! { Request = StoreRequest; Response = StoreResponse; Service = StoreService; CreateDispatch = create_store_dispatch; Rpc put = Put, _ -> PutResponse; Rpc get = Get, _ -> GetResponse; ClientStreaming put_file = PutFile, PutFileUpdate -> PutFileResponse; ServerStreaming get_file = GetFile, _ -> GetFileResponse; BidiStreaming convert_file = ConvertFile, ConvertFileUpdate -> ConvertFileResponse; } } use async_stream::stream; use futures_lite::{Stream, StreamExt}; use futures_util::SinkExt; use quic_rpc::{client::RpcClient, server::run_server_loop, transport::flume}; use store_rpc::*; #[derive(Clone)] pub struct Store; impl Store { async fn put(self, _put: Put) -> PutResponse { PutResponse([0; 32]) } async fn get(self, _get: Get) -> GetResponse { GetResponse(vec![]) } async fn put_file( self, _put: PutFile, updates: impl Stream, ) -> PutFileResponse { tokio::pin!(updates); while let Some(_update) = updates.next().await {} PutFileResponse([0; 32]) } fn get_file(self, _get: GetFile) -> impl Stream + Send + 'static { stream! { for i in 0..3 { yield GetFileResponse(vec![i]); } } } fn convert_file( self, _convert: ConvertFile, updates: impl Stream + Send + 'static, ) -> impl Stream + Send + 'static { stream! { tokio::pin!(updates); while let Some(msg) = updates.next().await { yield ConvertFileResponse(msg.0); } } } } create_store_dispatch!(Store, dispatch_store_request); // create_store_client!(StoreClient); #[tokio::main] async fn main() -> anyhow::Result<()> { let (server, client) = flume::channel(1); let server_handle = tokio::task::spawn(async move { let target = Store; run_server_loop(StoreService, server, target, dispatch_store_request).await }); let client = RpcClient::::new(client); // a rpc call for i in 0..3 { println!("a rpc call [{i}]"); let client = client.clone(); tokio::task::spawn(async move { let res = client.rpc(Get([0u8; 32])).await; println!("rpc res [{i}]: {res:?}"); }); } // server streaming call println!("a server streaming call"); let mut s = client.server_streaming(GetFile([0u8; 32])).await?; while let Some(res) = s.next().await { println!("streaming res: {res:?}"); } // client streaming call println!("a client streaming call"); let (mut send, recv) = client.client_streaming(PutFile).await?; tokio::task::spawn(async move { for i in 0..3 { send.send(PutFileUpdate(vec![i])).await.unwrap(); } }); let res = recv.await?; println!("client stremaing res: {res:?}"); // bidi streaming call println!("a bidi streaming call"); let (mut send, mut recv) = client.bidi(ConvertFile).await?; tokio::task::spawn(async move { for i in 0..3 { send.send(ConvertFileUpdate(vec![i])).await.unwrap(); } }); while let Some(res) = recv.next().await { println!("bidi res: {res:?}"); } // dropping the client will cause the server to terminate drop(client); server_handle.await??; Ok(()) }