Crates.io | turbomcp-transport |
lib.rs | turbomcp-transport |
version | 1.1.0-exp.3 |
created_at | 2025-08-26 17:17:48.871637+00 |
updated_at | 2025-08-29 19:51:12.189163+00 |
description | Multi-protocol transport layer supporting HTTP, WebSocket, STDIO, and TCP for MCP communications |
homepage | |
repository | https://github.com/Epistates/turbomcp |
max_upload_size | |
id | 1811554 |
size | 604,922 |
Production-ready multi-protocol transport layer with enterprise security, circuit breakers, and comprehensive fault tolerance for MCP communications.
turbomcp-transport
provides a robust, production-ready transport layer supporting multiple protocols with enterprise-grade security features, fault tolerance, and observability. This crate handles all network communication concerns while providing a unified interface for different transport protocols.
rustls
┌─────────────────────────────────────────────┐
│ TurboMCP Transport │
├─────────────────────────────────────────────┤
│ Protocol Implementations │
│ ├── STDIO (process pipes) │
│ ├── HTTP/SSE (web servers) │
│ ├── WebSocket (realtime) │
│ ├── TCP (network sockets) │
│ └── Unix Sockets (IPC) │
├─────────────────────────────────────────────┤
│ Security & Authentication │
│ ├── TLS 1.3 encryption │
│ ├── JWT token validation │
│ ├── CORS and security headers │
│ ├── Rate limiting │
│ └── Certificate management │
├─────────────────────────────────────────────┤
│ Reliability & Fault Tolerance │
│ ├── Circuit breaker pattern │
│ ├── Exponential backoff retry │
│ ├── Connection pooling │
│ ├── Health monitoring │
│ └── Graceful degradation │
├─────────────────────────────────────────────┤
│ Performance & Optimization │
│ ├── Advanced compression │
│ ├── Connection reuse │
│ ├── Message batching │
│ └── Memory-efficient streaming │
└─────────────────────────────────────────────┘
For local process communication:
use turbomcp_transport::stdio::{StdioTransport, ChildProcessConfig};
// Direct process communication
let transport = StdioTransport::new();
// Child process management
let config = ChildProcessConfig::new()
.command("/usr/bin/python3")
.args(["-m", "my_mcp_server"])
.working_directory("/path/to/server")
.environment_vars([("DEBUG", "true")]);
let child_transport = StdioTransport::with_child_process(config).await?;
For web application integration:
use turbomcp_transport::http::{HttpTransport, SseConfig};
// HTTP transport with Server-Sent Events
let config = SseConfig::new()
.endpoint("/api/mcp")
.heartbeat_interval(Duration::from_secs(30))
.max_message_size(1024 * 1024); // 1MB
let transport = HttpTransport::new_sse(config);
For real-time communication:
use turbomcp_transport::websocket::{WebSocketTransport, WsConfig};
let config = WsConfig::new()
.url("wss://api.example.com/mcp")
.ping_interval(Duration::from_secs(30))
.max_frame_size(16 * 1024 * 1024) // 16MB
.compression_enabled(true);
let transport = WebSocketTransport::connect(config).await?;
For network socket communication:
use turbomcp_transport::tcp::{TcpTransport, TcpConfig};
let config = TcpConfig::new()
.bind_address("127.0.0.1:8080")
.nodelay(true)
.keep_alive(Duration::from_secs(60))
.buffer_size(64 * 1024); // 64KB
let transport = TcpTransport::bind(config).await?;
For secure encrypted communication:
use turbomcp_transport::tls::{TlsTransport, TlsConfig, CertValidationConfig};
// Basic TLS server setup
let config = TlsConfig::new("server.crt", "server.key")
.with_min_version(TlsVersion::V1_3);
let server = TlsTransport::new_server("127.0.0.1:8443".parse()?, config).await?;
// TLS client with custom validation
let client_config = TlsConfig::new("client.crt", "client.key")
.with_cert_validation(CertValidationConfig {
verify_hostname: true,
ca_bundle_path: Some("/etc/ssl/certs/ca-bundle.pem".into()),
ocsp_stapling: true,
ct_validation: true,
});
let client = TlsTransport::new_client("api.example.com:8443".parse()?, client_config).await?;
use turbomcp_transport::tls::{
TlsConfig, CertPinningConfig, ClientAuthMode, TlsVersion
};
// Production TLS setup with mutual authentication
let tls_config = TlsConfig::new("server.crt", "server.key")
.with_min_version(TlsVersion::V1_3)
.with_mtls() // Enable mutual TLS
.with_cert_pinning(CertPinningConfig {
allowed_hashes: vec![
"sha256:AAAAAAAAAAAABBBBBBBBBBBBCCCCCCCCCCCCDDDDDDDDDDDD".to_string()
],
enforce: true,
})
.with_dpop_security(); // Enhanced OAuth 2.0 security
let transport = TlsTransport::new_server("0.0.0.0:8443".parse()?, tls_config).await?;
rustls
implementationFor local inter-process communication:
use turbomcp_transport::unix::{UnixTransport, UnixConfig};
let config = UnixConfig::new()
.path("/tmp/mcp.sock")
.permissions(0o660)
.cleanup_on_drop(true);
let transport = UnixTransport::bind(config).await?;
use turbomcp_transport::{SecurityConfig, TlsConfig, AuthConfig};
let security = SecurityConfig::production()
.with_tls(TlsConfig::new()
.cert_path("/etc/ssl/certs/server.pem")
.key_path("/etc/ssl/private/server.key")
.verify_client_certs(true))
.with_cors(CorsConfig::new()
.allowed_origins(["https://app.example.com"])
.allowed_methods(["GET", "POST"])
.max_age(Duration::from_secs(86400)))
.with_auth(AuthConfig::new()
.jwt_secret("your-secret-key")
.jwt_issuer("your-app")
.api_key_header("X-API-Key"))
.with_rate_limiting(RateLimitConfig::new()
.requests_per_minute(120)
.burst_capacity(20));
use turbomcp_transport::security::{SecurityHeaders, ContentSecurityPolicy};
let headers = SecurityHeaders::strict()
.with_csp(ContentSecurityPolicy::new()
.default_src(["'self'"])
.connect_src(["'self'", "wss:"])
.script_src(["'self'", "'unsafe-inline'"])
.style_src(["'self'", "'unsafe-inline'"]))
.with_hsts(Duration::from_secs(31536000)) // 1 year
.with_frame_options(FrameOptions::Deny)
.with_content_type_options(true);
use turbomcp_transport::circuit_breaker::{
CircuitBreakerConfig, FailureThreshold, RecoveryStrategy
};
let config = CircuitBreakerConfig::production()
.failure_threshold(FailureThreshold::Consecutive(5))
.recovery_timeout(Duration::from_secs(60))
.half_open_max_calls(3)
.recovery_strategy(RecoveryStrategy::LinearBackoff {
initial_delay: Duration::from_secs(1),
max_delay: Duration::from_secs(60),
multiplier: 2.0,
});
use turbomcp_transport::retry::{RetryPolicy, RetryConfig, BackoffStrategy};
let retry_policy = RetryPolicy::custom(RetryConfig::new()
.max_attempts(5)
.strategy(BackoffStrategy::ExponentialWithJitter {
base_delay: Duration::from_millis(100),
max_delay: Duration::from_secs(30),
multiplier: 2.0,
jitter_factor: 0.1,
})
.retryable_errors([
ErrorKind::ConnectionTimeout,
ErrorKind::ConnectionReset,
ErrorKind::TemporaryFailure,
]));
use turbomcp_transport::compression::{CompressionConfig, Algorithm};
let compression = CompressionConfig::adaptive()
.algorithms([Algorithm::Brotli, Algorithm::Gzip, Algorithm::Lz4])
.min_size(1024) // Only compress messages > 1KB
.quality_level(6) // Balance between speed and compression ratio
.streaming_threshold(64 * 1024); // Stream messages > 64KB
use turbomcp_transport::metrics::{TransportMetrics, MetricsConfig};
let metrics = TransportMetrics::new(MetricsConfig::new()
.request_duration_buckets([0.001, 0.01, 0.1, 1.0, 10.0])
.connection_pool_size_histogram(true)
.compression_ratio_tracking(true));
// Metrics are automatically collected
let stats = metrics.snapshot();
println!("Average request duration: {:?}", stats.avg_request_duration);
println!("Active connections: {}", stats.active_connections);
use turbomcp_transport::health::{HealthChecker, HealthConfig};
let health = HealthChecker::new(HealthConfig::new()
.check_interval(Duration::from_secs(30))
.connection_timeout(Duration::from_secs(5))
.max_consecutive_failures(3));
let health_status = health.check_transport(&transport).await?;
match health_status {
HealthStatus::Healthy => println!("Transport is healthy"),
HealthStatus::Degraded(issues) => println!("Transport issues: {:?}", issues),
HealthStatus::Unhealthy(error) => println!("Transport failed: {}", error),
}
Transport selection is automatic when using the main framework:
use turbomcp::prelude::*;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let server = MyServer::new();
// Transport selected based on environment/configuration
match std::env::var("TRANSPORT").as_deref() {
Ok("http") => server.run_http("127.0.0.1:8080").await?,
Ok("websocket") => server.run_websocket("127.0.0.1:8080").await?,
Ok("tcp") => server.run_tcp("127.0.0.1:8080").await?,
Ok("tls") => server.run_tls("127.0.0.1:8443", "server.crt", "server.key").await?,
Ok("unix") => server.run_unix("/tmp/mcp.sock").await?,
_ => server.run_stdio().await?, // Default
}
Ok(())
}
use turbomcp_transport::{Transport, TransportMessage, TransportConfig};
use async_trait::async_trait;
struct CustomTransport {
config: TransportConfig,
// ... custom fields
}
#[async_trait]
impl Transport for CustomTransport {
async fn send(&self, message: TransportMessage) -> Result<(), TransportError> {
// Custom send implementation
Ok(())
}
async fn receive(&self) -> Result<TransportMessage, TransportError> {
// Custom receive implementation
todo!()
}
async fn close(&self) -> Result<(), TransportError> {
// Cleanup implementation
Ok(())
}
}
Feature | Description | Default |
---|---|---|
http |
Enable HTTP/SSE transport | ✅ |
websocket |
Enable WebSocket transport | ✅ |
tcp |
Enable TCP transport | ✅ |
unix |
Enable Unix socket transport | ✅ |
tls |
Enable TLS/SSL support | ✅ |
compression |
Enable compression algorithms | ✅ |
metrics |
Enable metrics collection | ✅ |
circuit-breaker |
Enable circuit breaker pattern | ✅ |
Transport | Latency (avg) | Throughput | Memory Usage |
---|---|---|---|
STDIO | 0.1ms | 50k msg/s | 2MB |
Unix Socket | 0.2ms | 45k msg/s | 3MB |
TCP | 0.5ms | 30k msg/s | 5MB |
TLS | 0.8ms | 25k msg/s | 6MB |
WebSocket | 1ms | 25k msg/s | 8MB |
HTTP/SSE | 2ms | 15k msg/s | 10MB |
# Build with all features
cargo build --all-features
# Build specific transport
cargo build --features http,websocket
# Build without TLS (for testing)
cargo build --no-default-features --features stdio,tcp
# Run transport tests
cargo test
# Test with TLS
cargo test --features tls
# Run integration tests
cargo test --test integration
# Test circuit breaker functionality
cargo test circuit_breaker
For comprehensive security information, see:
Licensed under the MIT License.
Part of the TurboMCP high-performance Rust SDK for the Model Context Protocol.