hsnet-rpc

Crates.iohsnet-rpc
lib.rshsnet-rpc
version0.1.0
created_at2025-10-14 08:47:45.559469+00
updated_at2025-10-14 08:47:45.559469+00
descriptionType-safe RPC framework for Rust with async support
homepage
repository
max_upload_size
id1881890
size80,370
lee (loyalpartner)

documentation

README

hsnet-rpc

类型安全的 RPC 框架,支持跨平台 IPC(Unix socket、Windows named pipe、TCP)。

设计哲学

1. 消除特殊情况

  • 所有平台使用相同的协议(Length-prefixed JSON)
  • RPC 逻辑完全平台无关,只在传输层有差异
  • 宏生成代码消除手动注册的重复

2. 类型安全优先

  • 编译期类型检查(参数和返回值)
  • 自动序列化/反序列化
  • 统一的错误类型(RpcError

3. 并发支持

  • 单个连接可以并发处理多个请求
  • 客户端支持并发发送,服务端支持并发执行
  • 响应乱序返回,根据 ID 路由

核心特性

✅ 类型安全的 Handler

使用 #[rpc(server)] 宏自动生成类型安全的注册代码:

use hsnet_rpc::{rpc, RpcError};

#[async_trait]
#[rpc(server)]
pub trait MyService {
    #[method(name = "start")]
    async fn start(&self, config: Config) -> Result<String, RpcError>;

    #[method(name = "stop")]
    async fn stop(&self, force: bool) -> Result<String, RpcError>;
}

宏会生成:

  • 保留原始 trait 定义(供业务实现)
  • 生成 IntoRpcServer trait,提供 .into_rpc() 方法
  • 自动注册所有标记 #[method] 的方法

✅ 跨平台统一

所有平台使用相同的 RPC 代码,只在传输层有差异:

// 创建服务
let service = MyServiceImpl::new(...);
let server = service.into_rpc();

// Unix
#[cfg(unix)]
server.run_unix("/tmp/app.sock").await?;

// Windows
#[cfg(windows)]
server.run_pipe(r"\\.\pipe\app").await?;

// TCP(跨平台)
server.run_tcp("127.0.0.1:8080").await?;

✅ 并发请求处理

客户端可以并发发送多个请求,服务端并发执行:

let client = RpcClient::new(transport);

// 并发发送 3 个请求
let (r1, r2, r3) = tokio::join!(
    client.call("method1", params1),
    client.call("method2", params2),
    client.call("method3", params3),
);

快速开始

1. 定义 RPC 接口

use hsnet_rpc::{rpc, RpcError};
use async_trait::async_trait;
use serde::{Serialize, Deserialize};

// 定义数据结构
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Config {
    pub name: String,
    pub port: u16,
}

// 使用宏定义 RPC trait
#[async_trait]
#[rpc(server)]
pub trait ControlRpc {
    #[method(name = "start")]
    async fn start(&self, config: Config) -> Result<String, RpcError>;

    #[method(name = "stop")]
    async fn stop(&self, force: bool) -> Result<String, RpcError>;

    #[method(name = "status")]
    async fn status(&self) -> Result<String, RpcError>;
}

2. 实现业务逻辑

use std::sync::Arc;

pub struct ControlService {
    // 你的业务状态
}

impl ControlService {
    pub fn new() -> Arc<Self> {
        Arc::new(Self {})
    }
}

// 实现 trait
#[async_trait]
impl ControlRpc for ControlService {
    async fn start(&self, config: Config) -> Result<String, RpcError> {
        println!("Starting with config: {:?}", config);
        Ok("started".to_string())
    }

    async fn stop(&self, force: bool) -> Result<String, RpcError> {
        println!("Stopping (force: {})", force);
        Ok("stopped".to_string())
    }

    async fn status(&self) -> Result<String, RpcError> {
        Ok("running".to_string())
    }
}

3. 启动服务端

use hsnet_rpc::IntoRpcServer; // 宏生成的 trait

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 创建服务实例(必须是 Arc)
    let service = ControlService::new();

    // 使用宏生成的 into_rpc() 方法
    let server = service.into_rpc();

    // 启动服务器(平台差异只在这里)
    #[cfg(unix)]
    server.run_unix("/tmp/control.sock").await?;

    #[cfg(windows)]
    server.run_pipe(r"\\.\pipe\control").await?;

    Ok(())
}

4. 客户端调用

use hsnet_rpc::RpcClient;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 连接到服务端
    #[cfg(unix)]
    let transport = hsnet_rpc::connect_unix("/tmp/control.sock").await?;

    #[cfg(windows)]
    let transport = hsnet_rpc::connect_pipe(r"\\.\pipe\control").await?;

    // 创建客户端
    let client = RpcClient::new(transport);

    // 调用远程方法(类型安全)
    let config = Config {
        name: "test".to_string(),
        port: 8080,
    };
    let result: String = client.call("start", config).await?;
    println!("Start result: {}", result);

    // 并发调用
    let (status, stop_result) = tokio::join!(
        client.call::<_, String>("status", ()),
        client.call::<_, String>("stop", true),
    );

    Ok(())
}

架构说明

数据流

请求流向:
Client: call("start", config)
  ↓ 序列化为 RpcRequest { id, method, params }
  ↓ 发送到 Transport (Unix/Windows/TCP)
  ↓ Length-prefixed JSON 编码
  ↓ 网络传输
  ↓
Server: 接收并解码
  ↓ 反序列化为 RpcRequest
  ↓ 根据 method 路由到对应 handler
  ↓ 反序列化 params 为具体类型(Config)
  ↓ 调用用户实现的 async fn
  ↓ 序列化返回值为 RpcResponse
  ↓ 发送回客户端

关键组件

1. Transport 层(crates/hsnet-rpc/src/transport.rs)

  • 使用 tokio-util::codec::LengthDelimitedCodec 实现帧分割
  • 使用 tokio-serde 实现 JSON 序列化
  • 提供统一的 Stream + Sink 接口
  • 平台差异只在连接建立时

2. RPC 层(crates/hsnet-rpc/src/server.rs, client.rs)

  • RpcServer: 方法路由、并发处理
  • RpcClient: 请求管理、响应路由
  • 完全平台无关

3. 类型系统(crates/hsnet-rpc/src/types.rs)

  • RpcRequest: { id, method, params }
  • RpcResponse: { id, result?, error? }
  • RpcError: 统一的可序列化错误类型

4. Proc Macro(crates/hsnet-rpc-macro/src/lib.rs)

  • 解析 #[rpc(server)]#[method(name = "xxx")]
  • 生成 handler 注册代码
  • 处理 #[cfg] 条件编译

Macro 工作原理

输入:

#[rpc(server)]
pub trait ControlRpc {
    #[method(name = "start")]
    async fn start(&self, config: Config) -> Result<String, RpcError>;
}

生成:

// 1. 保留原始 trait(去掉宏属性)
pub trait ControlRpc {
    async fn start(&self, config: Config) -> Result<String, RpcError>;
}

// 2. 生成扩展 trait
pub trait IntoRpcServer {
    fn into_rpc(self: Arc<Self>) -> RpcServer;
}

// 3. 为所有实现了 ControlRpc 的类型实现 IntoRpcServer
impl<T: ControlRpc + Send + Sync + 'static> IntoRpcServer for T {
    fn into_rpc(self: Arc<Self>) -> RpcServer {
        let service = self;
        let mut server = RpcServer::new();

        // 注册 "start" 方法
        server = server.handle("start", {
            let service = service.clone();
            move |config: Config| {
                let service = service.clone();
                async move { service.start(config).await }
            }
        });

        server
    }
}

设计决策

Q: 为什么参数必须是值类型(config: Config)而不是引用(config: &Config)?

A: 闭包跨越任务边界,需要 'static 生命周期。

生成的代码会创建独立的异步任务:

server.handle("start", {
    let service = service.clone();
    move |config: Config| {  // 必须是值类型
        let service = service.clone();
        async move {
            service.start(config).await  // 这里跨越任务边界
        }
    }
});

如果用引用:

  • 闭包会捕获引用
  • 异步任务可能在原数据释放后执行
  • 编译器会拒绝(生命周期错误)

解决方案:

  • 小对象直接传值(如 bool, u32
  • 大对象用 Arc<T>String 等拥有所有权的类型
  • trait 方法内部可以转换为引用:async fn start(&self, config: Config) { let config = &config; ... }

Q: 为什么服务必须是 Arc<Self>

A: RPC Server 需要多个任务共享服务实例。

// 每个连接是一个任务
tokio::spawn(async move {
    let router = router.clone();  // Arc clone
    handle_connection(router, stream).await
});

// 每个请求是一个任务
tokio::spawn(async move {
    let service = service.clone();  // Arc clone
    service.start(config).await
});

Arc 提供:

  • 跨任务共享(Send + Sync
  • 自动内存管理(最后一个任务结束时释放)

Q: 为什么需要手动导入 IntoRpcServer trait?

A: Rust trait 方法调用需要 trait 在作用域内。

// 正确
use my_crate::IntoRpcServer;
let server = service.into_rpc();

// 错误(编译失败)
let server = service.into_rpc();
//                    ^^^^^^^^ method not found in `ControlService`

这是 Rust 的 trait 设计,防止命名冲突。

Q: 为什么使用 Length-prefixed 而不是 \r\n 分隔符?

A: 消除转义的特殊情况。

之前的问题(\r\n 分隔):

  • JSON 中的 \r\n 需要转义
  • 二进制数据需要 base64 编码
  • 性能损失

现在(Length-prefixed):

  • 每帧前 4 字节表示长度
  • JSON 可以包含任意字符
  • 无需转义,无特殊情况
  • 标准做法(gRPC、protobuf、tokio-serde 都用这个)

平台差异

框架消除了大部分平台差异,只在以下地方需要 #[cfg]

1. 传输层连接建立:

// Unix
#[cfg(unix)]
pub async fn connect_unix(path) -> ClientTransport<UnixStream>

// Windows
#[cfg(windows)]
pub async fn connect_pipe(path) -> ClientTransport<NamedPipeClient>

2. RPC 方法的条件编译:

#[rpc(server)]
pub trait ControlRpc {
    // 通用方法
    #[method(name = "start")]
    async fn start(&self, config: Config) -> Result<String, RpcError>;

    // Windows 专用方法
    #[cfg(windows)]
    #[method(name = "restart")]
    async fn restart(&self) -> Result<String, RpcError>;
}

宏会自动处理 #[cfg] 属性,生成条件编译的注册代码。

3. 应用层的业务逻辑:

这部分由用户控制,框架不涉及。

错误处理

统一的 RpcError 类型:

pub enum RpcError {
    // 网络错误
    Network(String),
    Serialization(String),
    ConnectionClosed(String),

    // JSON-RPC 标准错误
    Internal(String),         // -32603
    InvalidParams(String),    // -32602
    MethodNotFound(String),   // -32601

    // 用户自定义错误
    Custom { code: i32, message: String },
}

用法:

async fn start(&self, config: Config) -> Result<String, RpcError> {
    if config.port == 0 {
        return Err(RpcError::invalid_params("port cannot be 0"));
    }

    some_operation()
        .await
        .map_err(|e| RpcError::internal(format!("operation failed: {}", e)))?;

    Ok("started".to_string())
}

示例

查看 examples/ 目录:

  • manual.rs - 不使用宏的手动实现(理解底层机制)
  • tcp.rs - TCP 传输示例(展示传输层灵活性)
  • concurrent.rs - 并发测试(验证并发处理能力)

运行示例:

# 手动注册示例
cargo run --example manual

# TCP 传输示例
cargo run --example tcp

# 并发测试
cargo run --example concurrent

实际使用

参考 client/src/ctrl_ipc/service.rs,这是在生产环境使用的完整示例:

  • 定义了 18 个 RPC 方法
  • 支持跨平台条件编译
  • 包含复杂的状态管理
  • 展示了 Arc 和闭包的实际使用

测试

# 运行单元测试
cargo test -p hsnet-rpc

# 运行所有示例
cargo test -p hsnet-rpc --examples

技术栈

  • 异步运行时: tokio
  • 序列化: serde_json
  • 帧分割: tokio-util::codec::LengthDelimitedCodec
  • 编解码: tokio-serde
  • 过程宏: syn + quote

相关文档

License

MIT

Commit count: 0

cargo fmt