| Crates.io | turbomcp-transport |
| lib.rs | turbomcp-transport |
| version | 3.0.0-beta.3 |
| created_at | 2025-08-26 17:17:48.871637+00 |
| updated_at | 2026-01-22 16:44:58.552604+00 |
| description | Multi-protocol transport layer supporting HTTP, WebSocket, STDIO, and TCP for MCP communications |
| homepage | https://turbomcp.org |
| repository | https://github.com/Epistates/turbomcp |
| max_upload_size | |
| id | 1811554 |
| size | 883,276 |
Transport layer implementation for the Model Context Protocol (MCP) with support for multiple transport protocols and connection patterns.
turbomcp-transport provides transport layer implementations for the Model Context Protocol. It supports multiple transport protocols including STDIO, HTTP/SSE, WebSocket, TCP, and Unix sockets with features for security, reliability, and concurrent usage.
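MCP Standard Transports (MCP 2025-06-18 Specification): STDIO (subprocess communication) and HTTP/SSE (streamable HTTP).
MCP-Compliant Custom Extensions: WebSocket (bidirectional streaming), TCP (network services), and Unix sockets (local IPC).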
All transports preserve JSON-RPC message format and MCP lifecycle requirements.
rustls.clone() operations
┌─────────────────────────────────────────────┐
│ TurboMCP Transport │
├─────────────────────────────────────────────┤
│ MCP Standard Transports (2025-06-18) │
│ ├── STDIO (subprocess communication) │
│ └── HTTP/SSE (streamable HTTP) │
├─────────────────────────────────────────────┤
│ MCP-Compliant Custom Extensions │
│ ├── WebSocket (bidirectional streaming) │
│ ├── TCP (network services) │
│ └── Unix Sockets (local IPC) │
├─────────────────────────────────────────────┤
│ Security & Authentication │
│ ├── TLS 1.3 encryption │
│ ├── JWT token validation │
│ ├── CORS and security headers │
│ ├── Rate limiting │
│ └── Certificate management │
├─────────────────────────────────────────────┤
│ Reliability & Fault Tolerance │
│ ├── Circuit breaker pattern │
│ ├── Exponential backoff retry │
│ ├── Connection pooling │
│ ├── Health monitoring │
│ └── Graceful degradation │
├─────────────────────────────────────────────┤
│ Performance & Optimization │
│ ├── Multi-algorithm compression │
│ ├── Connection reuse │
│ ├── Message batching │
│ └── Memory-efficient streaming │
└─────────────────────────────────────────────┘
For local process communication:
use turbomcp_transport::stdio::{StdioTransport, ChildProcessConfig};
// Direct process communication
let transport = StdioTransport::new();
// Child process management
let config = ChildProcessConfig::new()
.command("/usr/bin/python3")
.args(["-m", "my_mcp_server"])
.working_directory("/path/to/server")
.environment_vars([("DEBUG", "true")]);
let child_transport = StdioTransport::with_child_process(config).await?;
For connecting to HTTP-based MCP servers with full SSE support:
use turbomcp_transport::streamable_http_client::{
StreamableHttpClientConfig, StreamableHttpClientTransport
};
use std::time::Duration;
// MCP 2025-06-18 compliant HTTP client with SSE
let config = StreamableHttpClientConfig {
base_url: "http://localhost:8080".to_string(),
endpoint_path: "/mcp".to_string(),
timeout: Duration::from_secs(30),
..Default::default()
};
let mut transport = StreamableHttpClientTransport::new(config);
For real-time communication:
use turbomcp_transport::websocket_bidirectional::{
WebSocketBidirectionalTransport, WebSocketBidirectionalConfig
};
// Connect to WebSocket server with full MCP 2025-06-18 support
let config = WebSocketBidirectionalConfig {
url: Some("wss://api.example.com/mcp".to_string()),
..Default::default()
};
let transport = WebSocketBidirectionalTransport::new(config).await?;
// Full MCP 2025-06-18 features:
// - Bidirectional request-response correlation
// - Server-initiated elicitation support
// - Automatic reconnection with exponential backoff
// - Keep-alive ping/pong
// - Optional TLS and compression
For network socket communication:
use turbomcp_transport::tcp::TcpTransportBuilder;
use std::net::SocketAddr;
let bind_addr: SocketAddr = "127.0.0.1:8080".parse()?;
let transport = TcpTransportBuilder::new()
.bind_addr(bind_addr)
.keep_alive(true)
.buffer_size(64 * 1024) // 64KB
.build();
For local inter-process communication:
use turbomcp_transport::unix::{UnixTransport, UnixConfig};
use std::path::PathBuf;
let config = UnixConfig {
socket_path: PathBuf::from("/tmp/mcp.sock"),
permissions: Some(0o660),
buffer_size: 8192,
cleanup_on_disconnect: true,
};
let transport = UnixTransport::new(config);
use turbomcp_transport::{SecurityConfigBuilder, AuthMethod, RateLimitConfig};
use std::time::Duration;
// Build a security validator with authentication and rate limiting
let validator = SecurityConfigBuilder::new()
.add_allowed_origin("https://app.example.com".to_string())
.allow_localhost(true)
.require_authentication(true)
.with_auth_method(AuthMethod::ApiKey)
.add_api_key("your-api-key-here".to_string())
.with_rate_limit(100, Duration::from_secs(60)) // 100 requests per minute
.build();
use turbomcp_transport::EnhancedSecurityConfigBuilder;
use std::time::Duration;
// Build enhanced security with session tracking
let enhanced_security = EnhancedSecurityConfigBuilder::new()
.add_allowed_origin("https://app.example.com".to_string())
.require_authentication(true)
.add_api_key("your-api-key".to_string())
.with_rate_limit(120, Duration::from_secs(60))
.with_session_max_lifetime(Duration::from_secs(3600)) // 1 hour max session lifetime
.with_session_idle_timeout(Duration::from_secs(1800)) // 30 min inactivity timeout
.with_max_sessions_per_ip(5) // Max 5 sessions per IP address
.build();
use turbomcp_transport::resilience::circuit_breaker::CircuitBreakerConfig;
use std::time::Duration;
// Configure circuit breaker for fault tolerance
let circuit_config = CircuitBreakerConfig {
failure_threshold: 5, // Open circuit after 5 failures
success_threshold: 3, // Close circuit after 3 successes
timeout: Duration::from_secs(60), // Wait 60s before trying again
rolling_window_size: 100, // Track last 100 operations
minimum_requests: 10, // Need 10 requests before opening
};
use turbomcp_transport::resilience::retry::RetryConfig;
use std::time::Duration;
// Configure retry behavior with exponential backoff
let retry_config = RetryConfig {
max_attempts: 3,
base_delay: Duration::from_millis(100),
max_delay: Duration::from_secs(10),
backoff_multiplier: 2.0,
jitter_factor: 0.1,
retry_on_connection_error: true,
retry_on_timeout: true,
custom_retry_condition: None,
};
use turbomcp_transport::compression::{MessageCompressor, CompressionType};
use serde_json::json;
// Create a compressor with LZ4 compression
let compressor = MessageCompressor::new(CompressionType::Lz4);
// Compress a message
let message = json!({"tool": "add", "args": {"a": 5, "b": 3}});
let compressed = compressor.compress(&message)?;
// Decompress back to JSON
let decompressed = compressor.decompress(&compressed)?;
assert_eq!(message, decompressed);
// Available compression types:
// - CompressionType::None
// - CompressionType::Gzip (with "flate2" feature)
// - CompressionType::Brotli (with "brotli" feature)
// - CompressionType::Lz4 (with "lz4_flex" feature)
use turbomcp_transport::resilience::health::{HealthCheckConfig, HealthStatus};
use std::time::Duration;
// Configure health checking
let health_config = HealthCheckConfig {
interval: Duration::from_secs(30),
timeout: Duration::from_secs(5),
failure_threshold: 3, // Unhealthy after 3 failures
success_threshold: 2, // Healthy after 2 successes
custom_check: None,
};
// Health status can be:
// - HealthStatus::Healthy
// - HealthStatus::Unhealthy
// - HealthStatus::Unknown
// - HealthStatus::Checking
Transport selection is automatic when using the main framework:
use turbomcp::prelude::*;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let server = MyServer::new();
// Transport selected based on environment/configuration
match std::env::var("TRANSPORT").as_deref() {
Ok("http") => server.run_http("127.0.0.1:8080").await?,
Ok("websocket") => server.run_websocket("127.0.0.1:8080").await?,
Ok("tcp") => server.run_tcp("127.0.0.1:8080").await?,
Ok("unix") => server.run_unix("/tmp/mcp.sock").await?,
_ => server.run_stdio().await?, // Default
}
Ok(())
}
use turbomcp_transport::{Transport, TransportMessage, TransportConfig};
use async_trait::async_trait;
struct CustomTransport {
config: TransportConfig,
// ... custom fields
}
#[async_trait]
impl Transport for CustomTransport {
async fn send(&self, message: TransportMessage) -> Result<(), TransportError> {
// Custom send implementation
Ok(())
}
async fn receive(&self) -> Result<TransportMessage, TransportError> {
// Custom receive implementation
todo!()
}
async fn close(&self) -> Result<(), TransportError> {
// Cleanup implementation
Ok(())
}
}
| Feature | Description | Default |
|---|---|---|
| `stdio` | Enable STDIO transport | ✅ |
| `http` | Enable HTTP/SSE transport | ❌ |
| `websocket` | Enable WebSocket transport | ❌ |
| `tcp` | Enable TCP transport | ❌ |
| `unix` | Enable Unix socket transport | ❌ |
| `tls` | Enable TLS/SSL support | ❌ |
| `compression` | Enable compression algorithms | ❌ |
| `metrics` | Enable metrics collection | ❌ |
TurboMCP introduces SharedTransport - a thread-safe wrapper that eliminates Arc/Mutex complexity while preserving full transport functionality:
use turbomcp_transport::{StdioTransport, SharedTransport};
// Create and wrap any transport for sharing
let transport = StdioTransport::new();
let shared = SharedTransport::new(transport);
// Connect once
shared.connect().await?;
// Clone for concurrent usage across tasks
let shared1 = shared.clone();
let shared2 = shared.clone();
// Both tasks can use the transport concurrently
let handle1 = tokio::spawn(async move {
shared1.send(message1).await
});
let handle2 = tokio::spawn(async move {
shared2.receive().await
});
let (send_result, message) = tokio::join!(handle1, handle2);
use turbomcp_transport::SharedTransport;
use std::sync::Arc;
use tokio::sync::Semaphore;
// Rate-limited concurrent transport operations
let shared_transport = SharedTransport::new(transport);
let semaphore = Arc::new(Semaphore::new(10)); // Max 10 concurrent operations
let tasks = (0..50).map(|i| {
let transport = shared_transport.clone();
let semaphore = semaphore.clone();
tokio::spawn(async move {
let _permit = semaphore.acquire().await.unwrap();
let message = create_message(i);
transport.send(message).await?;
transport.receive().await
})
}).collect::<Vec<_>>();
// Wait for all operations to complete
let results = futures::future::join_all(tasks).await;
use turbomcp_transport::SharedTransport;
use turbomcp_client::Client;
// Share a single transport across multiple clients
let transport = TcpTransport::connect("127.0.0.1:8080").await?;
let shared_transport = SharedTransport::new(transport);
// Create multiple clients sharing the same transport
let client1 = Client::new(shared_transport.clone());
let client2 = Client::new(shared_transport.clone());
let client3 = Client::new(shared_transport.clone());
// Initialize all clients concurrently
let (result1, result2, result3) = tokio::try_join!(
client1.initialize(),
client2.initialize(),
client3.initialize()
)?;
// All clients can now operate independently
tokio::spawn(async move {
loop {
let tools = client1.list_tools().await?;
// Process tools...
tokio::time::sleep(Duration::from_secs(30)).await;
}
});
tokio::spawn(async move {
loop {
let resources = client2.list_resources().await?;
// Process resources...
tokio::time::sleep(Duration::from_secs(45)).await;
}
});
.clone() for concurrent access
These metrics are indicative and will vary based on hardware, message size, and system load.
| Transport | Latency (avg) | Throughput | Memory Usage |
|---|---|---|---|
| STDIO | 0.1ms | 50k msg/s | 2MB |
| Unix Socket | 0.2ms | 45k msg/s | 3MB |
| TCP | 0.5ms | 30k msg/s | 5MB |
| WebSocket | 1ms | 25k msg/s | 8MB |
| HTTP/SSE | 2ms | 15k msg/s | 10MB |
# Build with all features
cargo build --all-features
# Build specific transport
cargo build --features http,websocket
# Build without TLS (for testing)
cargo build --no-default-features --features stdio,tcp
# Run transport tests
cargo test
# Test with TLS
cargo test --features tls
# Run integration tests
cargo test --test integration
# Test circuit breaker functionality
cargo test circuit_breaker
For comprehensive security information, see the project repository: https://github.com/Epistates/turbomcp
Licensed under the MIT License.
Part of the TurboMCP Rust SDK for the Model Context Protocol.