use anyhow::Context;
use ar_pe_ce::{Result, Stream};
use futures::{FutureExt, TryStreamExt};
use serde::{Deserialize, Serialize};

// Let's imagine that we have blob data sent in a stream from server to client. This data might be large,
// so it is split into packages.
// For example, 10 GB of data split into 100 MB packages.
#[derive(Debug, Deserialize, Serialize)]
struct Package {
    payload: Vec<u8>,
}

// We define our RPC...
#[ar_pe_ce::rpc]
trait Performance {
    // ...which has one method that returns a stream of packages.
    #[rpc(server_streaming)]
    async fn get_data(&self, arg: ()) -> Result<Stream<Package>>;
}

struct PerformanceServer;

#[ar_pe_ce::async_trait]
impl Performance for PerformanceServer {
    // Our server implementation is plain simple: in an infinite loop we send 100 MB packages without any
    // sleep or delay. Common sense tells us that our client should be flooded by data. Constant push.
    #[tracing::instrument(skip(self))]
    async fn get_data(&self, _: ()) -> Result<Stream<Package>> {
        let stream = async_stream::stream! {
            loop {
                // We trace package generation to see exactly when Hyper requests the data to be generated.
                tracing::info!("Sending msg");
                // Because we have to remember that Rust futures are pull-based.
                yield Ok(Package { payload: vec![0; 100_000_000] }); // 100 MB
            }
        };

        Ok(Box::pin(stream))
    }
}

// Next, let's define the client task.
async fn client_task() -> anyhow::Result<()> {
    let client = PerformanceClient::new("http://localhost:3000".parse()?);

    let mut data_stream = client
        .get_data(())
        .await
        .context("Could not get data stream")?;

    // We make our client much slower than the server on purpose by using tokio::time::sleep.
    // In that way I'd like to simulate a client that has to process the data,
    // and therefore cannot catch up with the flood of messages.
    loop {
        tracing::info!("Waiting for message");

        // Take only one message...
        match data_stream
            .try_next()
            .await
            .context("Could not retrieve message from data stream")?
        {
            Some(s) => {
                tracing::info!(len = s.payload.len(), "Got message");
            }
            None => {
                tracing::warn!("Got no message. Stream is closed");
                break;
            }
        }

        // ...then we sleep.
        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
    }

    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    tracing_subscriber::fmt().compact().init();

    let mut client = tokio::spawn(async { client_task().await }).fuse();

    use std::net::SocketAddr;
    let addr = SocketAddr::from(([0, 0, 0, 0], 3000));

    let server = PerformanceServer.serve(addr).fuse();
    futures::pin_mut!(server);

    futures::select! {
        server = server => server?,
        client = client => client??
    };

    Ok(())
}
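
// A minimal, self-contained sketch (my addition, not part of the original example)
// of the pull-based behaviour the comments above rely on: the body of an
// `async_stream::stream!` only runs when the consumer polls for the next item,
// so a slow consumer throttles the producer on its own, with nothing buffered
// ahead. `pull_demo` is a hypothetical helper name; the sketch assumes only the
// `tokio`, `futures`, `async_stream`, and `tracing` dependencies already used above.
#[allow(dead_code)]
async fn pull_demo() {
    use futures::StreamExt as _;

    let stream = async_stream::stream! {
        for i in 0..3u32 {
            // This line runs only when the consumer asks for the next item.
            tracing::info!(i, "Producing item");
            yield i;
        }
    };
    futures::pin_mut!(stream);

    while let Some(i) = stream.next().await {
        tracing::info!(i, "Consuming item");
        // While we sleep here, the producer is suspended mid-loop; no items pile up.
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    }
}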