| Crates.io | smux_rust |
| lib.rs | smux_rust |
| version | 0.2.1 |
| created_at | 2026-01-16 02:01:59.565145+00 |
| updated_at | 2026-01-17 14:23:11.102385+00 |
| description | A simple multiplexing library for Rust, inspired by xtaci/smux |
| homepage | https://github.com/hushuoyouli/smux_rust |
| repository | https://github.com/hushuoyouli/smux_rust |
| max_upload_size | |
| id | 2047616 |
| size | 135,272 |
一个用 Rust 实现的多路复用库,参考了 xtaci/smux 的 Go 实现。
use smux_rust::{client, server, Config};
use std::io::{Read, Write};
use std::net::TcpStream;
// 客户端示例
fn client_example() -> Result<(), Box<dyn std::error::Error>> {
// 建立底层连接(例如 TCP)
let tcp_conn = TcpStream::connect("127.0.0.1:8080")?;
// 创建客户端会话
let session = client(Box::new(tcp_conn), None)?;
// 打开一个新流
let mut stream = session.open_stream()?;
// 写入数据
stream.write_all(b"Hello, Server!")?;
// 读取响应
let mut buf = [0u8; 1024];
let n = stream.read(&mut buf)?;
println!("Received: {}", String::from_utf8_lossy(&buf[..n]));
// 关闭流
stream.close()?;
// 关闭会话
session.close()?;
Ok(())
}
// 服务器端示例
fn server_example() -> Result<(), Box<dyn std::error::Error>> {
use std::net::{TcpListener, TcpStream};
let listener = TcpListener::bind("127.0.0.1:8080")?;
for stream in listener.incoming() {
let tcp_conn = stream?;
// 创建服务器会话
let session = server(Box::new(tcp_conn), None)?;
// 接受流
let mut stream = session.accept_stream()?;
// 读取数据
let mut buf = [0u8; 1024];
let n = stream.read(&mut buf)?;
println!("Received: {}", String::from_utf8_lossy(&buf[..n]));
// 写入响应
stream.write_all(b"Hello, Client!")?;
// 关闭流
stream.close()?;
}
Ok(())
}
use smux_rust::Config;
use std::time::Duration;
let config = Config {
version: 2, // 使用协议版本 2
keep_alive_disabled: false,
keep_alive_interval: Duration::from_secs(10),
keep_alive_timeout: Duration::from_secs(30),
max_frame_size: 32768,
max_receive_buffer: 4_194_304, // 4MB
max_stream_buffer: 65536, // 64KB
};
// 验证配置
config.verify()?;
// 使用自定义配置创建会话
let session = client(Box::new(tcp_conn), Some(config))?;
// 在同一个会话中打开多个流
let stream1 = session.open_stream().await?;
let stream2 = session.open_stream().await?;
let stream3 = session.open_stream().await?;
// 每个流都可以独立读写
stream1.write_all(b"Stream 1 data").await?;
stream2.write_all(b"Stream 2 data").await?;
stream3.write_all(b"Stream 3 data").await?;
use std::time::{Duration, Instant};
// 1. 获取活跃流数量
let num = session.num_streams().await;
println!("活跃流数量: {}", num);
// 2. 设置会话截止时间
session.set_deadline(Some(Instant::now() + Duration::from_secs(30))).await;
// 3. 设置流的读写超时
stream.set_read_deadline(Some(Instant::now() + Duration::from_secs(5))).await;
stream.set_write_deadline(Some(Instant::now() + Duration::from_secs(5))).await;
// 4. 获取流关闭通知
let die_notifier = stream.get_die_notifier();
tokio::spawn(async move {
die_notifier.notified().await;
println!("流已关闭");
});
// 5. 检查流是否关闭
if stream.is_closed().await {
println!("流已关闭");
}
// 6. 等待多个流中任意一个就绪(PollWait)
let streams = vec![&stream1, &stream2, &stream3];
let idx = session.poll_wait(&streams).await?;
println!("流 {} 有数据可读", idx);
// 7. 获取流地址
if let Some(addr) = stream.local_addr() {
println!("本地地址: {}", addr);
}
// 8. 高效复制流数据到文件或其他 AsyncWrite
let mut file = tokio::fs::File::create("output.dat").await?;
let bytes_copied = stream.copy_to(&mut file).await?;
println!("复制了 {} 字节", bytes_copied);
帧格式:
VERSION(1B) | CMD(1B) | LENGTH(2B) | STREAMID(4B) | DATA(LENGTH)
命令类型:
cmdSYN(0) - 流打开cmdFIN(1) - 流关闭cmdPSH(2) - 数据推送cmdNOP(3) - 无操作cmdUPD(4) - 窗口更新(仅版本 2)流ID分配:
bytes - 字节缓冲区处理parking_lot - 高性能互斥锁MIT License