| Crates.io | msgtrans |
| lib.rs | msgtrans |
| version | 1.0.0 |
| created_at | 2024-09-02 09:06:26.359645+00 |
| updated_at | 2025-06-29 07:35:37.887948+00 |
| description | Support for a variety of communication protocols such as TCP / QUIC / WebSocket, easy to create server and client network library. |
| homepage | |
| repository | https://github.com/zoujiaqing/msgtrans |
| max_upload_size | |
| id | 1360315 |
| size | 729,727 |
đ Language: English | įŽäŊ䏿
Enterprise-grade modern multi-protocol communication framework with unified interface supporting TCP, WebSocket, QUIC and more
SharedPacket and ArcPacket implement memory zero-copy[dependencies]
msgtrans = "1.0.0"
use msgtrans::{
transport::TransportServerBuilder,
protocol::{TcpServerConfig, WebSocketServerConfig, QuicServerConfig},
event::ServerEvent,
tokio,
};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Configure multiple protocols - same business logic supports multiple protocols
let tcp_config = TcpServerConfig::new("127.0.0.1:8001")?;
let websocket_config = WebSocketServerConfig::new("127.0.0.1:8002")?
.with_path("/ws");
let quic_config = QuicServerConfig::new("127.0.0.1:8003")?;
// Build multi-protocol server
let mut server = TransportServerBuilder::new()
.max_connections(10000)
.with_protocol(tcp_config)
.with_protocol(websocket_config)
.with_protocol(quic_config)
.build()
.await?;
println!("đ Multi-protocol server started successfully!");
// Get event stream
let mut events = server.events().await?;
// Unified event handling - all protocols use the same logic
while let Some(event) = events.recv().await {
match event {
ServerEvent::ConnectionEstablished { session_id, .. } => {
println!("New connection: {}", session_id);
}
ServerEvent::MessageReceived { session_id, context } => {
// Echo message - protocol transparent
let message = String::from_utf8_lossy(&context.data);
let response = format!("Echo: {}", message);
let _ = server.send(session_id, response.as_bytes()).await;
}
ServerEvent::ConnectionClosed { session_id, .. } => {
println!("Connection closed: {}", session_id);
}
_ => {}
}
}
Ok(())
}
use msgtrans::{
transport::TransportClientBuilder,
protocol::TcpClientConfig,
event::ClientEvent,
tokio,
};
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Configure client - configuration-driven protocol selection
let tcp_config = TcpClientConfig::new("127.0.0.1:8001")?
.with_timeout(Duration::from_secs(30));
// Build client - zero configuration with high performance
let mut client = TransportClientBuilder::new()
.with_protocol(tcp_config)
.build()
.await?;
// Connect to server
client.connect().await?;
// Send message - simple API, directly send byte data
let _result = client.send("Hello, MsgTrans!".as_bytes()).await?;
println!("â
Message sent successfully");
// Send request and wait for response
match client.request("What time is it?".as_bytes()).await? {
result if result.data.is_some() => {
let response = String::from_utf8_lossy(result.data.as_ref().unwrap());
println!("đĨ Received response: {}", response);
}
_ => println!("â Request timeout or failed"),
}
// Receive events - unified event model
let mut events = client.events().await?;
tokio::spawn(async move {
while let Some(event) = events.recv().await {
match event {
ClientEvent::MessageReceived(context) => {
let message = String::from_utf8_lossy(&context.data);
println!("đ¨ Received message: {}", message);
}
ClientEvent::Disconnected { .. } => {
println!("đ Connection closed");
break;
}
_ => {}
}
}
});
Ok(())
}
âââââââââââââââââââââââââââââââââââââââ
â đ¯ Application Layer â â Business logic, protocol-agnostic
âââââââââââââââââââââââââââââââââââââââ¤
â đ Transport Layer â â Connection management, unified API
â âââ TransportServer/Client â âĸ Connection lifecycle management
â âââ SessionManager â âĸ Event routing and dispatching
â âââ EventStream â âĸ Message passing and broadcasting
âââââââââââââââââââââââââââââââââââââââ¤
â đĄ Protocol Layer â â Protocol implementation, extensible
â âââ TCP/WebSocket/QUIC â âĸ Protocol-specific adapters
â âââ ProtocolAdapter â âĸ Protocol configuration management
â âââ ConfigurationRegistry â âĸ Protocol registration mechanism
âââââââââââââââââââââââââââââââââââââââ
// Same server code, different protocol configurations
let server = TransportServerBuilder::new()
.with_protocol(TcpServerConfig::new("0.0.0.0:8080")?)
.build().await?; // TCP version
let server = TransportServerBuilder::new()
.with_protocol(QuicServerConfig::new("0.0.0.0:8080")?)
.build().await?; // QUIC version - identical business logic
// Server event types
pub enum ServerEvent {
ConnectionEstablished { session_id: SessionId, info: ConnectionInfo },
MessageReceived { session_id: SessionId, context: TransportContext },
MessageSent { session_id: SessionId, message_id: u32 },
ConnectionClosed { session_id: SessionId, reason: CloseReason },
TransportError { session_id: Option<SessionId>, error: TransportError },
}
// Client event types
pub enum ClientEvent {
Connected { info: ConnectionInfo },
MessageReceived(TransportContext),
MessageSent { message_id: u32 },
Disconnected { reason: CloseReason },
Error { error: TransportError },
}
// Concise event handling pattern - Server
let mut events = server.events().await?;
while let Some(event) = events.recv().await {
match event {
ServerEvent::MessageReceived { session_id, context } => {
// Protocol-agnostic business processing - directly use byte data
let message = String::from_utf8_lossy(&context.data);
let response = format!("Processing result: {}", message);
server.send(session_id, response.as_bytes()).await?;
}
_ => {}
}
}
// Concise event handling pattern - Client
let mut events = client.events().await?;
while let Some(event) = events.recv().await {
match event {
ClientEvent::MessageReceived(context) => {
// Handle received messages - directly use byte data
let message = String::from_utf8_lossy(&context.data);
println!("Received: {}", message);
}
_ => {}
}
}
// User-level API is simple, underlying automatic lock-free optimization
// Concurrent sending - internally optimized with lock-free queues
let tasks: Vec<_> = (0..1000).map(|i| {
let client = client.clone();
tokio::spawn(async move {
let message = format!("Message {}", i);
client.send(message.as_bytes()).await
})
}).collect();
// Server high-concurrency processing - internally using lock-free hash tables for session management
let mut events = server.events().await?;
while let Some(event) = events.recv().await {
match event {
ServerEvent::MessageReceived { session_id, context } => {
// High-concurrency processing, lock-free session access
tokio::spawn(async move {
let response = process_message(&context.data).await;
server.send(session_id, &response).await
});
}
_ => {}
}
}
// CPU-aware automatic optimization - zero configuration high performance
let config = ConnectionConfig::auto_optimized(); // Auto-tuning based on CPU core count
// Intelligent connection pool - adaptive load
let server = TransportServerBuilder::new()
.connection_pool_config(
ConnectionPoolConfig::adaptive() // Dynamic scaling
.with_initial_size(100)
.with_max_size(10000)
)
.build().await?;
// User API always simple - internal automatic zero-copy optimization
let result = client.send("Hello, World!".as_bytes()).await?;
// Large data transmission - automatic zero-copy handling
let large_data = vec![0u8; 1024 * 1024]; // 1MB data
let result = client.send(&large_data).await?;
// Request-response - automatic zero-copy optimization
let response = client.request(b"Get user data").await?;
if let Some(data) = response.data {
// Data transmission process already optimized, no additional copying needed
process_response(&data);
}
// 1. Implement protocol adapter
pub struct MyProtocolAdapter {
connection: MyConnection,
event_sender: broadcast::Sender<TransportEvent>,
}
#[async_trait]
impl ProtocolAdapter for MyProtocolAdapter {
async fn send(&mut self, packet: Packet) -> Result<(), TransportError> {
// Implement protocol-specific send logic
self.connection.send(packet.payload()).await?;
Ok(())
}
fn connection_info(&self) -> ConnectionInfo {
// Return connection information
ConnectionInfo::new("MyProtocol", self.connection.peer_addr())
}
fn events(&self) -> broadcast::Receiver<TransportEvent> {
self.event_sender.subscribe()
}
}
// 2. Implement configuration structure
#[derive(Debug, Clone)]
pub struct MyProtocolServerConfig {
pub bind_address: SocketAddr,
pub custom_setting: String,
}
#[async_trait]
impl ServerConfig for MyProtocolServerConfig {
type Adapter = MyProtocolAdapter;
async fn build_server(&self) -> Result<Self::Adapter, TransportError> {
// Build server adapter
let connection = MyConnection::bind(&self.bind_address).await?;
let (event_sender, _) = broadcast::channel(1000);
Ok(MyProtocolAdapter {
connection,
event_sender,
})
}
}
// 3. Seamless integration - exactly the same usage as built-in protocols
let my_config = MyProtocolServerConfig {
bind_address: "127.0.0.1:9000".parse()?,
custom_setting: "custom_value".to_string(),
};
let server = TransportServerBuilder::new()
.with_protocol(my_config) // Use directly!
.build()
.await?;
use msgtrans::{
transport::TransportServerBuilder,
protocol::WebSocketServerConfig,
event::ServerEvent,
SessionId,
tokio,
};
use std::collections::HashMap;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = WebSocketServerConfig::new("127.0.0.1:8080")?
.with_path("/chat");
let server = TransportServerBuilder::new()
.with_protocol(config)
.max_connections(1000)
.build()
.await?;
println!("đ WebSocket Chat Server: ws://127.0.0.1:8080/chat");
// Chat room management
let mut chat_rooms: HashMap<String, Vec<SessionId>> = HashMap::new();
let mut events = server.events().await?;
while let Some(event) = events.recv().await {
match event {
ServerEvent::MessageReceived { session_id, context } => {
let message = String::from_utf8_lossy(&context.data);
// Parse chat commands
if message.starts_with("/join ") {
let room = message[6..].to_string();
chat_rooms.entry(room.clone()).or_default().push(session_id);
let response = format!("Joined room: {}", room);
let _ = server.send(session_id, response.as_bytes()).await;
} else {
// Broadcast message to all users in the room
for (room, members) in &chat_rooms {
if members.contains(&session_id) {
let broadcast_msg = format!("[{}] {}", room, message);
for &member_id in members {
let _ = server.send(member_id, broadcast_msg.as_bytes()).await;
}
break;
}
}
}
}
ServerEvent::ConnectionClosed { session_id, .. } => {
// Remove user from all rooms
for members in chat_rooms.values_mut() {
members.retain(|&id| id != session_id);
}
}
_ => {}
}
}
Ok(())
}
use msgtrans::{
transport::TransportClientBuilder,
protocol::QuicClientConfig,
tokio,
};
use std::time::Instant;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = QuicClientConfig::new("127.0.0.1:8003")?
.with_server_name("localhost")
.with_alpn(vec![b"msgtrans".to_vec()]);
let client = TransportClientBuilder::new()
.with_protocol(config)
.build()
.await?;
client.connect().await?;
println!("â
QUIC connection established successfully");
// High-concurrency message sending test
let start = Instant::now();
let message_count = 10000;
let tasks: Vec<_> = (0..message_count).map(|i| {
let client = client.clone();
tokio::spawn(async move {
let message = format!("High-performance message {}", i);
client.send(message.as_bytes()).await
})
}).collect();
// Wait for all messages to complete sending
for task in tasks {
task.await??;
}
let duration = start.elapsed();
println!("đ {} messages sent, time elapsed: {:?}", message_count, duration);
println!("đ Average per second: {:.0} messages", message_count as f64 / duration.as_secs_f64());
// Test request-response performance
let start = Instant::now();
let request_count = 1000;
for i in 0..request_count {
let request_data = format!("Request {}", i);
match client.request(request_data.as_bytes()).await? {
result if result.data.is_some() => {
// Request successful, record response time
if i % 100 == 0 {
println!("â
Request {} completed", i);
}
}
_ => println!("â Request {} timeout", i),
}
}
let duration = start.elapsed();
println!("đ {} requests completed, time elapsed: {:?}", request_count, duration);
println!("đ Average per second: {:.0} requests", request_count as f64 / duration.as_secs_f64());
Ok(())
}
// TCP Server - High reliability configuration
let tcp_config = TcpServerConfig::new("0.0.0.0:8001")?
.with_max_connections(10000)
.with_keepalive(Duration::from_secs(60))
.with_nodelay(true)
.with_reuse_addr(true);
// WebSocket Server - Web integration configuration
let ws_config = WebSocketServerConfig::new("0.0.0.0:8002")?
.with_path("/api/ws")
.with_max_frame_size(1024 * 1024)
.with_max_connections(5000);
// QUIC Server - Next-generation protocol configuration
let quic_config = QuicServerConfig::new("0.0.0.0:8003")?
.with_cert_path("cert.pem")
.with_key_path("key.pem")
.with_alpn(vec![b"h3".to_vec(), b"msgtrans".to_vec()])
.with_max_concurrent_streams(1000);
// Zero configuration - automatic optimization (recommended)
let server = TransportServerBuilder::new()
.with_protocol(tcp_config)
.build().await?; // Automatically optimized based on CPU
// High-performance configuration - manual tuning
let server = TransportServerBuilder::new()
.with_protocol(tcp_config)
.connection_config(ConnectionConfig::high_performance())
.max_connections(50000)
.build().await?;
// Resource-saving configuration - low memory environment
let server = TransportServerBuilder::new()
.with_protocol(tcp_config)
.connection_config(ConnectionConfig::memory_optimized())
.max_connections(1000)
.build().await?;
// Real-time statistics - zero-copy performance monitoring
let stats = server.get_stats().await;
println!("Active connections: {}", stats.active_connections);
println!("Total messages: {}", stats.total_messages);
println!("Average latency: {:?}", stats.average_latency);
println!("Memory usage: {} MB", stats.memory_usage_mb);
// Protocol distribution statistics
for (protocol, count) in &stats.protocol_distribution {
println!("{}: {} connections", protocol, count);
}
use msgtrans::error::{TransportError, CloseReason};
// Message sending error handling
match client.send("Hello, World!".as_bytes()).await {
Ok(result) => println!("â
Message sent successfully (ID: {})", result.message_id),
Err(TransportError::ConnectionLost { .. }) => {
println!("đ Connection lost, attempting reconnection");
client.connect().await?;
}
Err(TransportError::ProtocolError { protocol, error }) => {
println!("â ī¸ Protocol error [{}]: {}", protocol, error);
}
Err(e) => println!("â Other error: {}", e),
}
// Request-response error handling
match client.request("Get status".as_bytes()).await {
Ok(result) => {
match result.data {
Some(data) => {
let response = String::from_utf8_lossy(&data);
println!("đĨ Received response: {}", response);
}
None => println!("â° Request timeout (ID: {})", result.message_id),
}
}
Err(TransportError::Timeout { duration }) => {
println!("â° Request timeout: {:?}", duration);
}
Err(e) => println!("â Request failed: {}", e),
}
// Server-side sending error handling
match server.send(session_id, "Response data".as_bytes()).await {
Ok(result) => println!("â
Successfully sent to session {}", session_id),
Err(TransportError::ConnectionLost { .. }) => {
println!("đ Session {} connection disconnected", session_id);
// Automatic session cleanup
}
Err(e) => println!("â Send failed: {}", e),
}
// Connection pool management
let pool_config = ConnectionPoolConfig::adaptive()
.with_initial_size(100)
.with_max_size(10000)
.with_idle_timeout(Duration::from_secs(300))
.with_health_check_interval(Duration::from_secs(30));
// Graceful shutdown
let server = TransportServerBuilder::new()
.with_protocol(tcp_config)
.graceful_shutdown_timeout(Duration::from_secs(30))
.build().await?;
// Smooth restart support
server.start_graceful_shutdown().await?;
Check the examples/ directory for more examples:
echo_server.rs - Multi-protocol echo serverecho_client_tcp.rs - TCP client exampleecho_client_websocket.rs - WebSocket client exampleecho_client_quic.rs - QUIC client examplepacket.rs - Packet serialization verification example# Start multi-protocol echo server
cargo run --example echo_server
# Test TCP client
cargo run --example echo_client_tcp
# Test WebSocket client
cargo run --example echo_client_websocket
# Test QUIC client
cargo run --example echo_client_quic
This project is licensed under the Apache License 2.0.
Copyright Š 2024 Jiaqing Zou
Issues and Pull Requests are welcome! Please check the Contributing Guide for detailed information.
đ¯ MsgTrans Mission: Make multi-protocol communication simple, efficient, and reliable, focusing on business logic rather than underlying transport details.