turbomcp-transport

Crates.ioturbomcp-transport
lib.rsturbomcp-transport
version1.1.0-exp.3
created_at2025-08-26 17:17:48.871637+00
updated_at2025-08-29 19:51:12.189163+00
descriptionMulti-protocol transport layer supporting HTTP, WebSocket, STDIO, and TCP for MCP communications
homepage
repositoryhttps://github.com/Epistates/turbomcp
max_upload_size
id1811554
size604,922
Nick Paterno (nicholasjpaterno)

documentation

README

TurboMCP Transport

Crates.io Documentation License: MIT

Production-ready multi-protocol transport layer with enterprise security, circuit breakers, and comprehensive fault tolerance for MCP communications.

Overview

turbomcp-transport provides a robust, production-ready transport layer supporting multiple protocols with enterprise-grade security features, fault tolerance, and observability. This crate handles all network communication concerns while providing a unified interface for different transport protocols.

Key Features

🌐 Multi-Protocol Transport Support

  • STDIO - Standard input/output for local process communication
  • HTTP/SSE - Server-Sent Events for web applications
  • WebSocket - Real-time bidirectional communication
  • TCP - Network socket communication
  • TLS - Encrypted network communication with TLS 1.3
  • Unix Sockets - Local inter-process communication

🛡️ Enterprise Security Features

  • TLS 1.3 Support - Modern encryption with rustls
  • CORS Protection - Comprehensive cross-origin resource sharing
  • Security Headers - CSP, HSTS, X-Frame-Options, and more
  • Rate Limiting - Token bucket algorithm with flexible strategies
  • Authentication - JWT validation and API key support

Circuit Breaker & Reliability

  • Circuit Breaker Pattern - Prevents cascade failures with automatic recovery
  • Exponential Backoff - Intelligent retry with jitter for optimal throughput
  • Connection Health Monitoring - Automatic detection of stale connections
  • Graceful Degradation - Fallback mechanisms and error recovery
  • Resource Management - Bounded memory usage with cleanup tasks

🗜️ Advanced Compression

  • Multiple Algorithms - gzip, brotli, lz4 with intelligent selection
  • Adaptive Compression - Automatic algorithm selection based on content
  • Streaming Support - Low-memory compression for large messages
  • Compression Metrics - Performance monitoring and optimization

Architecture

┌─────────────────────────────────────────────┐
│            TurboMCP Transport               │
├─────────────────────────────────────────────┤
│ Protocol Implementations                   │
│ ├── STDIO (process pipes)                  │
│ ├── HTTP/SSE (web servers)                 │
│ ├── WebSocket (realtime)                   │
│ ├── TCP (network sockets)                  │
│ └── Unix Sockets (IPC)                     │
├─────────────────────────────────────────────┤
│ Security & Authentication                  │
│ ├── TLS 1.3 encryption                    │
│ ├── JWT token validation                   │
│ ├── CORS and security headers             │
│ ├── Rate limiting                          │
│ └── Certificate management                 │
├─────────────────────────────────────────────┤
│ Reliability & Fault Tolerance             │
│ ├── Circuit breaker pattern               │
│ ├── Exponential backoff retry             │
│ ├── Connection pooling                     │
│ ├── Health monitoring                      │
│ └── Graceful degradation                   │
├─────────────────────────────────────────────┤
│ Performance & Optimization                 │
│ ├── Advanced compression                   │
│ ├── Connection reuse                       │
│ ├── Message batching                       │
│ └── Memory-efficient streaming             │
└─────────────────────────────────────────────┘

Transport Protocols

STDIO Transport

For local process communication:

use turbomcp_transport::stdio::{StdioTransport, ChildProcessConfig};

// Direct process communication
let transport = StdioTransport::new();

// Child process management
let config = ChildProcessConfig::new()
    .command("/usr/bin/python3")
    .args(["-m", "my_mcp_server"])
    .working_directory("/path/to/server")
    .environment_vars([("DEBUG", "true")]);

let child_transport = StdioTransport::with_child_process(config).await?;

HTTP/SSE Transport

For web application integration:

use turbomcp_transport::http::{HttpTransport, SseConfig};

// HTTP transport with Server-Sent Events
let config = SseConfig::new()
    .endpoint("/api/mcp")
    .heartbeat_interval(Duration::from_secs(30))
    .max_message_size(1024 * 1024); // 1MB

let transport = HttpTransport::new_sse(config);

WebSocket Transport

For real-time communication:

use turbomcp_transport::websocket::{WebSocketTransport, WsConfig};

let config = WsConfig::new()
    .url("wss://api.example.com/mcp")
    .ping_interval(Duration::from_secs(30))
    .max_frame_size(16 * 1024 * 1024) // 16MB
    .compression_enabled(true);

let transport = WebSocketTransport::connect(config).await?;

TCP Transport

For network socket communication:

use turbomcp_transport::tcp::{TcpTransport, TcpConfig};

let config = TcpConfig::new()
    .bind_address("127.0.0.1:8080")
    .nodelay(true)
    .keep_alive(Duration::from_secs(60))
    .buffer_size(64 * 1024); // 64KB

let transport = TcpTransport::bind(config).await?;

TLS Transport

For secure encrypted communication:

use turbomcp_transport::tls::{TlsTransport, TlsConfig, CertValidationConfig};

// Basic TLS server setup
let config = TlsConfig::new("server.crt", "server.key")
    .with_min_version(TlsVersion::V1_3);

let server = TlsTransport::new_server("127.0.0.1:8443".parse()?, config).await?;

// TLS client with custom validation
let client_config = TlsConfig::new("client.crt", "client.key")
    .with_cert_validation(CertValidationConfig {
        verify_hostname: true,
        ca_bundle_path: Some("/etc/ssl/certs/ca-bundle.pem".into()),
        ocsp_stapling: true,
        ct_validation: true,
    });

let client = TlsTransport::new_client("api.example.com:8443".parse()?, client_config).await?;

Advanced TLS Configuration

use turbomcp_transport::tls::{
    TlsConfig, CertPinningConfig, ClientAuthMode, TlsVersion
};

// Production TLS setup with mutual authentication
let tls_config = TlsConfig::new("server.crt", "server.key")
    .with_min_version(TlsVersion::V1_3)
    .with_mtls() // Enable mutual TLS
    .with_cert_pinning(CertPinningConfig {
        allowed_hashes: vec![
            "sha256:AAAAAAAAAAAABBBBBBBBBBBBCCCCCCCCCCCCDDDDDDDDDDDD".to_string()
        ],
        enforce: true,
    })
    .with_dpop_security(); // Enhanced OAuth 2.0 security

let transport = TlsTransport::new_server("0.0.0.0:8443".parse()?, tls_config).await?;

TLS Security Features

  • 🔐 TLS 1.3 by Default - Latest TLS version with forward secrecy
  • 🔑 Certificate Pinning - Pin specific certificates or public keys
  • 🤝 Mutual TLS (mTLS) - Client certificate authentication
  • 📋 OCSP Stapling - Real-time certificate revocation checking
  • 🛡️ DPoP Integration - Demonstration of Proof-of-Possession for OAuth 2.0
  • ⚡ Memory Safety - Rust's memory safety with rustls implementation

Unix Socket Transport

For local inter-process communication:

use turbomcp_transport::unix::{UnixTransport, UnixConfig};

let config = UnixConfig::new()
    .path("/tmp/mcp.sock")
    .permissions(0o660)
    .cleanup_on_drop(true);

let transport = UnixTransport::bind(config).await?;

Security Configuration

Production Security Setup

use turbomcp_transport::{SecurityConfig, TlsConfig, AuthConfig};

let security = SecurityConfig::production()
    .with_tls(TlsConfig::new()
        .cert_path("/etc/ssl/certs/server.pem")
        .key_path("/etc/ssl/private/server.key")
        .verify_client_certs(true))
    .with_cors(CorsConfig::new()
        .allowed_origins(["https://app.example.com"])
        .allowed_methods(["GET", "POST"])
        .max_age(Duration::from_secs(86400)))
    .with_auth(AuthConfig::new()
        .jwt_secret("your-secret-key")
        .jwt_issuer("your-app")
        .api_key_header("X-API-Key"))
    .with_rate_limiting(RateLimitConfig::new()
        .requests_per_minute(120)
        .burst_capacity(20));

Security Headers

use turbomcp_transport::security::{SecurityHeaders, ContentSecurityPolicy};

let headers = SecurityHeaders::strict()
    .with_csp(ContentSecurityPolicy::new()
        .default_src(["'self'"])
        .connect_src(["'self'", "wss:"])
        .script_src(["'self'", "'unsafe-inline'"])
        .style_src(["'self'", "'unsafe-inline'"]))
    .with_hsts(Duration::from_secs(31536000)) // 1 year
    .with_frame_options(FrameOptions::Deny)
    .with_content_type_options(true);

Circuit Breaker Configuration

Production Circuit Breaker

use turbomcp_transport::circuit_breaker::{
    CircuitBreakerConfig, FailureThreshold, RecoveryStrategy
};

let config = CircuitBreakerConfig::production()
    .failure_threshold(FailureThreshold::Consecutive(5))
    .recovery_timeout(Duration::from_secs(60))
    .half_open_max_calls(3)
    .recovery_strategy(RecoveryStrategy::LinearBackoff {
        initial_delay: Duration::from_secs(1),
        max_delay: Duration::from_secs(60),
        multiplier: 2.0,
    });

Custom Retry Policies

use turbomcp_transport::retry::{RetryPolicy, RetryConfig, BackoffStrategy};

let retry_policy = RetryPolicy::custom(RetryConfig::new()
    .max_attempts(5)
    .strategy(BackoffStrategy::ExponentialWithJitter {
        base_delay: Duration::from_millis(100),
        max_delay: Duration::from_secs(30),
        multiplier: 2.0,
        jitter_factor: 0.1,
    })
    .retryable_errors([
        ErrorKind::ConnectionTimeout,
        ErrorKind::ConnectionReset,
        ErrorKind::TemporaryFailure,
    ]));

Compression Configuration

Adaptive Compression

use turbomcp_transport::compression::{CompressionConfig, Algorithm};

let compression = CompressionConfig::adaptive()
    .algorithms([Algorithm::Brotli, Algorithm::Gzip, Algorithm::Lz4])
    .min_size(1024) // Only compress messages > 1KB
    .quality_level(6) // Balance between speed and compression ratio
    .streaming_threshold(64 * 1024); // Stream messages > 64KB

Observability & Monitoring

Metrics Collection

use turbomcp_transport::metrics::{TransportMetrics, MetricsConfig};

let metrics = TransportMetrics::new(MetricsConfig::new()
    .request_duration_buckets([0.001, 0.01, 0.1, 1.0, 10.0])
    .connection_pool_size_histogram(true)
    .compression_ratio_tracking(true));

// Metrics are automatically collected
let stats = metrics.snapshot();
println!("Average request duration: {:?}", stats.avg_request_duration);
println!("Active connections: {}", stats.active_connections);

Health Monitoring

use turbomcp_transport::health::{HealthChecker, HealthConfig};

let health = HealthChecker::new(HealthConfig::new()
    .check_interval(Duration::from_secs(30))
    .connection_timeout(Duration::from_secs(5))
    .max_consecutive_failures(3));

let health_status = health.check_transport(&transport).await?;
match health_status {
    HealthStatus::Healthy => println!("Transport is healthy"),
    HealthStatus::Degraded(issues) => println!("Transport issues: {:?}", issues),
    HealthStatus::Unhealthy(error) => println!("Transport failed: {}", error),
}

Integration Examples

With TurboMCP Framework

Transport selection is automatic when using the main framework:

use turbomcp::prelude::*;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let server = MyServer::new();
    
    // Transport selected based on environment/configuration
    match std::env::var("TRANSPORT").as_deref() {
        Ok("http") => server.run_http("127.0.0.1:8080").await?,
        Ok("websocket") => server.run_websocket("127.0.0.1:8080").await?,
        Ok("tcp") => server.run_tcp("127.0.0.1:8080").await?,
        Ok("tls") => server.run_tls("127.0.0.1:8443", "server.crt", "server.key").await?,
        Ok("unix") => server.run_unix("/tmp/mcp.sock").await?,
        _ => server.run_stdio().await?, // Default
    }
    
    Ok(())
}

Custom Transport Implementation

use turbomcp_transport::{Transport, TransportMessage, TransportConfig};
use async_trait::async_trait;

struct CustomTransport {
    config: TransportConfig,
    // ... custom fields
}

#[async_trait]
impl Transport for CustomTransport {
    async fn send(&self, message: TransportMessage) -> Result<(), TransportError> {
        // Custom send implementation
        Ok(())
    }
    
    async fn receive(&self) -> Result<TransportMessage, TransportError> {
        // Custom receive implementation
        todo!()
    }
    
    async fn close(&self) -> Result<(), TransportError> {
        // Cleanup implementation
        Ok(())
    }
}

Feature Flags

Feature Description Default
http Enable HTTP/SSE transport
websocket Enable WebSocket transport
tcp Enable TCP transport
unix Enable Unix socket transport
tls Enable TLS/SSL support
compression Enable compression algorithms
metrics Enable metrics collection
circuit-breaker Enable circuit breaker pattern

Performance Characteristics

Benchmarks

Transport Latency (avg) Throughput Memory Usage
STDIO 0.1ms 50k msg/s 2MB
Unix Socket 0.2ms 45k msg/s 3MB
TCP 0.5ms 30k msg/s 5MB
TLS 0.8ms 25k msg/s 6MB
WebSocket 1ms 25k msg/s 8MB
HTTP/SSE 2ms 15k msg/s 10MB

Optimization Features

  • 🚀 Connection Pooling - Reuse connections for better performance
  • 📦 Message Batching - Combine small messages for efficiency
  • 🗜️ Smart Compression - Adaptive compression based on content
  • Zero-Copy - Minimize memory allocations where possible

Development

Building

# Build with all features
cargo build --all-features

# Build specific transport
cargo build --features http,websocket

# Build without TLS (for testing)
cargo build --no-default-features --features stdio,tcp

Testing

# Run transport tests
cargo test

# Test with TLS
cargo test --features tls

# Run integration tests
cargo test --test integration

# Test circuit breaker functionality
cargo test circuit_breaker

Security Documentation

For comprehensive security information, see:

Related Crates

External Resources

License

Licensed under the MIT License.


Part of the TurboMCP high-performance Rust SDK for the Model Context Protocol.

Commit count: 8

cargo fmt