use async_trait::async_trait; use std::sync::Arc; use tempfile::tempdir; use tokio::sync::Mutex; use tokio_util::compat::TokioAsyncReadCompatExt; use mrpc::{Connection, Result as MrpcResult, RpcError, RpcSender, Server, Value}; const PINGS: u32 = 3; #[derive(Clone, Default)] struct PingPongService { pong_counter: Arc>, } #[async_trait] impl Connection for PingPongService { async fn handle_request( &self, _client: RpcSender, method: &str, params: Vec, ) -> MrpcResult { match method { "ping" => { let _ = params.first().and_then(|v| v.as_i64()).ok_or_else(|| { RpcError::Protocol("Expected integer parameter for ping".into()) })?; let mut count = self.pong_counter.lock().await; *count += 1; Ok(Value::Boolean(true)) } _ => Err(RpcError::Protocol(format!( "PingPongService: Unknown method: {}", method ))), } } } #[tokio::test] async fn test_mrpc_compatibility_with_msgpack_rpc() -> Result<(), Box> { let temp_dir = tempdir()?; let socket_path = temp_dir.path().join("pingpong.sock"); // Set up the mrpc server let pong_counter = Arc::new(Mutex::new(0_u32)); let pong_counter_clone = pong_counter.clone(); let server: Server = Server::from_fn(move || PingPongService { pong_counter: pong_counter_clone.clone(), }) .unix(&socket_path) .await?; let server_task = tokio::spawn(async move { if let Err(e) = server.run().await { panic!("Server error: {}", e); } }); // Give the server a moment to start up tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; // Create a msgpack-rpc client let socket = tokio::net::UnixStream::connect(&socket_path).await?; let client = msgpack_rpc::Client::new(socket.compat()); for i in 0..PINGS { client .request("ping", &[msgpack_rpc::Value::Integer(i.into())]) .await .unwrap(); // Add a small delay between requests tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; } // Give more time for all pings to be processed tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; // Check if our server received 10 pings let final_count = *pong_counter.lock().await; assert_eq!( final_count, PINGS, "Expected {} pongs, but got {}", PINGS, final_count ); server_task.abort(); Ok(()) }