smux_rust

Crates.iosmux_rust
lib.rssmux_rust
version0.2.1
created_at2026-01-16 02:01:59.565145+00
updated_at2026-01-17 14:23:11.102385+00
descriptionA simple multiplexing library for Rust, inspired by xtaci/smux
homepagehttps://github.com/hushuoyouli/smux_rust
repositoryhttps://github.com/hushuoyouli/smux_rust
max_upload_size
id2047616
size135,272
(hushuoyouli)

documentation

https://docs.rs/smux_rust

README

smux_rust

一个用 Rust 实现的多路复用库,参考了 xtaci/smux 的 Go 实现。

功能特性

  • 多路复用:在单个底层连接上创建多个流
  • 流量控制:Token bucket 和滑动窗口控制
  • 内存优化:对象池减少内存分配开销
  • 流量整形:轮询调度算法保证流之间的公平性
  • 协议支持:支持协议版本 1 和版本 2

使用方法

基本使用

use smux_rust::{client, server, Config};
use std::io::{Read, Write};
use std::net::TcpStream;

// 客户端示例
fn client_example() -> Result<(), Box<dyn std::error::Error>> {
    // 建立底层连接(例如 TCP)
    let tcp_conn = TcpStream::connect("127.0.0.1:8080")?;
    
    // 创建客户端会话
    let session = client(Box::new(tcp_conn), None)?;
    
    // 打开一个新流
    let mut stream = session.open_stream()?;
    
    // 写入数据
    stream.write_all(b"Hello, Server!")?;
    
    // 读取响应
    let mut buf = [0u8; 1024];
    let n = stream.read(&mut buf)?;
    println!("Received: {}", String::from_utf8_lossy(&buf[..n]));
    
    // 关闭流
    stream.close()?;
    
    // 关闭会话
    session.close()?;
    
    Ok(())
}

// 服务器端示例
fn server_example() -> Result<(), Box<dyn std::error::Error>> {
    use std::net::{TcpListener, TcpStream};
    
    let listener = TcpListener::bind("127.0.0.1:8080")?;
    
    for stream in listener.incoming() {
        let tcp_conn = stream?;
        
        // 创建服务器会话
        let session = server(Box::new(tcp_conn), None)?;
        
        // 接受流
        let mut stream = session.accept_stream()?;
        
        // 读取数据
        let mut buf = [0u8; 1024];
        let n = stream.read(&mut buf)?;
        println!("Received: {}", String::from_utf8_lossy(&buf[..n]));
        
        // 写入响应
        stream.write_all(b"Hello, Client!")?;
        
        // 关闭流
        stream.close()?;
    }
    
    Ok(())
}

自定义配置

use smux_rust::Config;
use std::time::Duration;

let config = Config {
    version: 2,  // 使用协议版本 2
    keep_alive_disabled: false,
    keep_alive_interval: Duration::from_secs(10),
    keep_alive_timeout: Duration::from_secs(30),
    max_frame_size: 32768,
    max_receive_buffer: 4_194_304,  // 4MB
    max_stream_buffer: 65536,       // 64KB
};

// 验证配置
config.verify()?;

// 使用自定义配置创建会话
let session = client(Box::new(tcp_conn), Some(config))?;

多流使用

// 在同一个会话中打开多个流
let stream1 = session.open_stream().await?;
let stream2 = session.open_stream().await?;
let stream3 = session.open_stream().await?;

// 每个流都可以独立读写
stream1.write_all(b"Stream 1 data").await?;
stream2.write_all(b"Stream 2 data").await?;
stream3.write_all(b"Stream 3 data").await?;

高级功能

use std::time::{Duration, Instant};

// 1. 获取活跃流数量
let num = session.num_streams().await;
println!("活跃流数量: {}", num);

// 2. 设置会话截止时间
session.set_deadline(Some(Instant::now() + Duration::from_secs(30))).await;

// 3. 设置流的读写超时
stream.set_read_deadline(Some(Instant::now() + Duration::from_secs(5))).await;
stream.set_write_deadline(Some(Instant::now() + Duration::from_secs(5))).await;

// 4. 获取流关闭通知
let die_notifier = stream.get_die_notifier();
tokio::spawn(async move {
    die_notifier.notified().await;
    println!("流已关闭");
});

// 5. 检查流是否关闭
if stream.is_closed().await {
    println!("流已关闭");
}

// 6. 等待多个流中任意一个就绪(PollWait)
let streams = vec![&stream1, &stream2, &stream3];
let idx = session.poll_wait(&streams).await?;
println!("流 {} 有数据可读", idx);

// 7. 获取流地址
if let Some(addr) = stream.local_addr() {
    println!("本地地址: {}", addr);
}

// 8. 高效复制流数据到文件或其他 AsyncWrite
let mut file = tokio::fs::File::create("output.dat").await?;
let bytes_copied = stream.copy_to(&mut file).await?;
println!("复制了 {} 字节", bytes_copied);

协议规范

帧格式:

VERSION(1B) | CMD(1B) | LENGTH(2B) | STREAMID(4B) | DATA(LENGTH)

命令类型:

  • cmdSYN(0) - 流打开
  • cmdFIN(1) - 流关闭
  • cmdPSH(2) - 数据推送
  • cmdNOP(3) - 无操作
  • cmdUPD(4) - 窗口更新(仅版本 2)

流ID分配:

  • 客户端使用奇数,从 1 开始
  • 服务器端使用偶数,从 0 开始

依赖

  • bytes - 字节缓冲区处理
  • parking_lot - 高性能互斥锁

许可证

MIT License

Commit count: 0

cargo fmt