| Crates.io | hsnet-rpc |
| lib.rs | hsnet-rpc |
| version | 0.1.0 |
| created_at | 2025-10-14 08:47:45.559469+00 |
| updated_at | 2025-10-14 08:47:45.559469+00 |
| description | Type-safe RPC framework for Rust with async support |
| homepage | |
| repository | |
| max_upload_size | |
| id | 1881890 |
| size | 80,370 |
类型安全的 RPC 框架,支持跨平台 IPC(Unix socket、Windows named pipe、TCP)。
1. 消除特殊情况
2. 类型安全优先
RpcError)3. 并发支持
使用 #[rpc(server)] 宏自动生成类型安全的注册代码:
use hsnet_rpc::{rpc, RpcError};
#[async_trait]
#[rpc(server)]
pub trait MyService {
#[method(name = "start")]
async fn start(&self, config: Config) -> Result<String, RpcError>;
#[method(name = "stop")]
async fn stop(&self, force: bool) -> Result<String, RpcError>;
}
宏会生成:
IntoRpcServer trait,提供 .into_rpc() 方法#[method] 的方法所有平台使用相同的 RPC 代码,只在传输层有差异:
// 创建服务
let service = MyServiceImpl::new(...);
let server = service.into_rpc();
// Unix
#[cfg(unix)]
server.run_unix("/tmp/app.sock").await?;
// Windows
#[cfg(windows)]
server.run_pipe(r"\\.\pipe\app").await?;
// TCP(跨平台)
server.run_tcp("127.0.0.1:8080").await?;
客户端可以并发发送多个请求,服务端并发执行:
let client = RpcClient::new(transport);
// 并发发送 3 个请求
let (r1, r2, r3) = tokio::join!(
client.call("method1", params1),
client.call("method2", params2),
client.call("method3", params3),
);
use hsnet_rpc::{rpc, RpcError};
use async_trait::async_trait;
use serde::{Serialize, Deserialize};
// 定义数据结构
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Config {
pub name: String,
pub port: u16,
}
// 使用宏定义 RPC trait
#[async_trait]
#[rpc(server)]
pub trait ControlRpc {
#[method(name = "start")]
async fn start(&self, config: Config) -> Result<String, RpcError>;
#[method(name = "stop")]
async fn stop(&self, force: bool) -> Result<String, RpcError>;
#[method(name = "status")]
async fn status(&self) -> Result<String, RpcError>;
}
use std::sync::Arc;
pub struct ControlService {
// 你的业务状态
}
impl ControlService {
pub fn new() -> Arc<Self> {
Arc::new(Self {})
}
}
// 实现 trait
#[async_trait]
impl ControlRpc for ControlService {
async fn start(&self, config: Config) -> Result<String, RpcError> {
println!("Starting with config: {:?}", config);
Ok("started".to_string())
}
async fn stop(&self, force: bool) -> Result<String, RpcError> {
println!("Stopping (force: {})", force);
Ok("stopped".to_string())
}
async fn status(&self) -> Result<String, RpcError> {
Ok("running".to_string())
}
}
use hsnet_rpc::IntoRpcServer; // 宏生成的 trait
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 创建服务实例(必须是 Arc)
let service = ControlService::new();
// 使用宏生成的 into_rpc() 方法
let server = service.into_rpc();
// 启动服务器(平台差异只在这里)
#[cfg(unix)]
server.run_unix("/tmp/control.sock").await?;
#[cfg(windows)]
server.run_pipe(r"\\.\pipe\control").await?;
Ok(())
}
use hsnet_rpc::RpcClient;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 连接到服务端
#[cfg(unix)]
let transport = hsnet_rpc::connect_unix("/tmp/control.sock").await?;
#[cfg(windows)]
let transport = hsnet_rpc::connect_pipe(r"\\.\pipe\control").await?;
// 创建客户端
let client = RpcClient::new(transport);
// 调用远程方法(类型安全)
let config = Config {
name: "test".to_string(),
port: 8080,
};
let result: String = client.call("start", config).await?;
println!("Start result: {}", result);
// 并发调用
let (status, stop_result) = tokio::join!(
client.call::<_, String>("status", ()),
client.call::<_, String>("stop", true),
);
Ok(())
}
请求流向:
Client: call("start", config)
↓ 序列化为 RpcRequest { id, method, params }
↓ 发送到 Transport (Unix/Windows/TCP)
↓ Length-prefixed JSON 编码
↓ 网络传输
↓
Server: 接收并解码
↓ 反序列化为 RpcRequest
↓ 根据 method 路由到对应 handler
↓ 反序列化 params 为具体类型(Config)
↓ 调用用户实现的 async fn
↓ 序列化返回值为 RpcResponse
↓ 发送回客户端
1. Transport 层(crates/hsnet-rpc/src/transport.rs)
tokio-util::codec::LengthDelimitedCodec 实现帧分割tokio-serde 实现 JSON 序列化Stream + Sink 接口2. RPC 层(crates/hsnet-rpc/src/server.rs, client.rs)
RpcServer: 方法路由、并发处理RpcClient: 请求管理、响应路由3. 类型系统(crates/hsnet-rpc/src/types.rs)
RpcRequest: { id, method, params }RpcResponse: { id, result?, error? }RpcError: 统一的可序列化错误类型4. Proc Macro(crates/hsnet-rpc-macro/src/lib.rs)
#[rpc(server)] 和 #[method(name = "xxx")]#[cfg] 条件编译输入:
#[rpc(server)]
pub trait ControlRpc {
#[method(name = "start")]
async fn start(&self, config: Config) -> Result<String, RpcError>;
}
生成:
// 1. 保留原始 trait(去掉宏属性)
pub trait ControlRpc {
async fn start(&self, config: Config) -> Result<String, RpcError>;
}
// 2. 生成扩展 trait
pub trait IntoRpcServer {
fn into_rpc(self: Arc<Self>) -> RpcServer;
}
// 3. 为所有实现了 ControlRpc 的类型实现 IntoRpcServer
impl<T: ControlRpc + Send + Sync + 'static> IntoRpcServer for T {
fn into_rpc(self: Arc<Self>) -> RpcServer {
let service = self;
let mut server = RpcServer::new();
// 注册 "start" 方法
server = server.handle("start", {
let service = service.clone();
move |config: Config| {
let service = service.clone();
async move { service.start(config).await }
}
});
server
}
}
config: Config)而不是引用(config: &Config)?A: 闭包跨越任务边界,需要 'static 生命周期。
生成的代码会创建独立的异步任务:
server.handle("start", {
let service = service.clone();
move |config: Config| { // 必须是值类型
let service = service.clone();
async move {
service.start(config).await // 这里跨越任务边界
}
}
});
如果用引用:
解决方案:
bool, u32)Arc<T> 或 String 等拥有所有权的类型async fn start(&self, config: Config) { let config = &config; ... }Arc<Self>?A: RPC Server 需要多个任务共享服务实例。
// 每个连接是一个任务
tokio::spawn(async move {
let router = router.clone(); // Arc clone
handle_connection(router, stream).await
});
// 每个请求是一个任务
tokio::spawn(async move {
let service = service.clone(); // Arc clone
service.start(config).await
});
Arc 提供:
Send + Sync)IntoRpcServer trait?A: Rust trait 方法调用需要 trait 在作用域内。
// 正确
use my_crate::IntoRpcServer;
let server = service.into_rpc();
// 错误(编译失败)
let server = service.into_rpc();
// ^^^^^^^^ method not found in `ControlService`
这是 Rust 的 trait 设计,防止命名冲突。
\r\n 分隔符?A: 消除转义的特殊情况。
之前的问题(\r\n 分隔):
\r\n 需要转义现在(Length-prefixed):
框架消除了大部分平台差异,只在以下地方需要 #[cfg]:
1. 传输层连接建立:
// Unix
#[cfg(unix)]
pub async fn connect_unix(path) -> ClientTransport<UnixStream>
// Windows
#[cfg(windows)]
pub async fn connect_pipe(path) -> ClientTransport<NamedPipeClient>
2. RPC 方法的条件编译:
#[rpc(server)]
pub trait ControlRpc {
// 通用方法
#[method(name = "start")]
async fn start(&self, config: Config) -> Result<String, RpcError>;
// Windows 专用方法
#[cfg(windows)]
#[method(name = "restart")]
async fn restart(&self) -> Result<String, RpcError>;
}
宏会自动处理 #[cfg] 属性,生成条件编译的注册代码。
3. 应用层的业务逻辑:
这部分由用户控制,框架不涉及。
统一的 RpcError 类型:
pub enum RpcError {
// 网络错误
Network(String),
Serialization(String),
ConnectionClosed(String),
// JSON-RPC 标准错误
Internal(String), // -32603
InvalidParams(String), // -32602
MethodNotFound(String), // -32601
// 用户自定义错误
Custom { code: i32, message: String },
}
用法:
async fn start(&self, config: Config) -> Result<String, RpcError> {
if config.port == 0 {
return Err(RpcError::invalid_params("port cannot be 0"));
}
some_operation()
.await
.map_err(|e| RpcError::internal(format!("operation failed: {}", e)))?;
Ok("started".to_string())
}
查看 examples/ 目录:
manual.rs - 不使用宏的手动实现(理解底层机制)tcp.rs - TCP 传输示例(展示传输层灵活性)concurrent.rs - 并发测试(验证并发处理能力)运行示例:
# 手动注册示例
cargo run --example manual
# TCP 传输示例
cargo run --example tcp
# 并发测试
cargo run --example concurrent
参考 client/src/ctrl_ipc/service.rs,这是在生产环境使用的完整示例:
Arc 和闭包的实际使用# 运行单元测试
cargo test -p hsnet-rpc
# 运行所有示例
cargo test -p hsnet-rpc --examples
MIT