stream-tungstenite

Version: 0.6.1
Created: 2024-10-23 08:46:57.113438+00
Updated: 2026-01-12 10:36:49.5805+00
Description: A streaming implementation of the Tungstenite WebSocket protocol
Crate ID: 1419843
Size: 247,903
Owner: roson9527

A robust Rust WebSocket client library with automatic reconnection, customizable retry strategies, and an extensible architecture.

Features

  • Automatic Reconnection: Multiple retry strategies including exponential backoff
  • Connection State Management: Real-time tracking of connection status and health
  • Extension System: Hook into lifecycle events and message processing
  • Application-Level Handshakes: Support for authentication and subscriptions
  • Builder Pattern API: Fluent, type-safe configuration
  • Backpressure-Aware Sending: Bounded send queue with non-blocking, blocking, and timeout APIs

Architecture

┌─────────────────────────────────────────────────────────────────┐
│                      WebSocketClient                            │
│  - High-level API for WebSocket connections                     │
│  - Automatic reconnection with retry strategies                 │
│  - Message broadcasting via channels                            │
└─────────────────────────────────────────────────────────────────┘
                             │
         ┌───────────────────┼───────────────────┐
         ▼                   ▼                   ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│   Handshake     │ │   Extension     │ │   Connection    │
│   - Auth        │ │   - Lifecycle   │ │   - State       │
│   - Subscribe   │ │   - Messages    │ │   - Retry       │
│   - Chained     │ │   - Logging     │ │   - Supervisor  │
└─────────────────┘ └─────────────────┘ └─────────────────┘

Quick Start

Basic Usage

use stream_tungstenite::prelude::*;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create client with builder pattern
    let client = WebSocketClient::builder("wss://echo.websocket.org")
        .receive_timeout(std::time::Duration::from_secs(30))
        .build();

    // Subscribe to messages before running
    let mut messages = client.subscribe();

    // Run client in background
    let client = Arc::new(client);
    tokio::spawn({
        let client = client.clone();
        async move { client.run().await }
    });

    // Receive messages (Arc<Message> for zero-copy)
    while let Ok(msg) = messages.recv().await {
        println!("Received: {:?}", msg);
        // Access message: msg.as_ref() or &*msg
        // Clone if needed: (*msg).clone()
    }

    Ok(())
}

Sending Messages

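use stream_tungstenite::prelude::*;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Arc::new(WebSocketClient::new("wss://echo.websocket.org"));

    // Run the client in the background so a connection can be established
    tokio::spawn({
        let client = client.clone();
        async move { client.run().await }
    });

    // Get a sender handle. sender() returns an Option, so nothing is sent
    // when no handle is available (presumably while no connection is up yet)
    if let Some(sender) = client.sender().await {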
        // Send text message
        sender.send_text("Hello, WebSocket!")?;

        // Send binary message
        sender.send_binary(vec![1, 2, 3])?;

        // Send ping
        sender.ping(vec![])?;
    }

    Ok(())
}

Backpressure and Timeout

  • Non-blocking (default): client.send(msg).await
    Returns immediately; if the queue is full, returns SendError::ChannelFull.
  • Blocking: client.send_async(msg).await
    Awaits capacity; returns SendError::ChannelClosed if not connected.
  • Timeout: client.send_timeout(msg, Duration::from_secs(1)).await
    Returns SendError::Timeout(_) if it expires.
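
The difference between the non-blocking and blocking modes can be illustrated with a bounded channel from the standard library. This is only a sketch of the general technique, not the crate's implementation: the SendError enum below is a hypothetical stand-in that borrows the two variant names from the list above, and try_enqueue, overflow_result, and closed_result are illustrative names.

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};

/// Hypothetical error type mirroring two of the variants named above.
#[derive(Debug, PartialEq)]
enum SendError {
    ChannelFull,
    ChannelClosed,
}

/// Non-blocking send: fail immediately when the bounded queue has no free slot.
fn try_enqueue(tx: &SyncSender<String>, msg: String) -> Result<(), SendError> {
    match tx.try_send(msg) {
        Ok(()) => Ok(()),
        Err(TrySendError::Full(_)) => Err(SendError::ChannelFull),
        Err(TrySendError::Disconnected(_)) => Err(SendError::ChannelClosed),
    }
}

/// Fill a queue of the given capacity, then report what one more send returns.
fn overflow_result(capacity: usize) -> Result<(), SendError> {
    // Keep the receiver alive so the channel stays open while we fill it.
    let (tx, _rx) = sync_channel::<String>(capacity);
    for i in 0..capacity {
        try_enqueue(&tx, format!("msg {i}")).expect("queue still has room");
    }
    try_enqueue(&tx, "one too many".to_string())
}

/// Sending after the receiving side is gone reports a closed channel.
fn closed_result() -> Result<(), SendError> {
    let (tx, rx) = sync_channel::<String>(1);
    drop(rx);
    try_enqueue(&tx, "nobody is listening".to_string())
}
```

A blocking variant would call tx.send(msg) instead, which waits for a free slot rather than returning an error when the queue is full.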

Configure queue capacity:

let cfg = ClientConfig::new().with_send_queue_capacity(512);

Configure handshake retry delay:

let cfg = ClientConfig::new().with_handshake_retry_delay(std::time::Duration::from_millis(500));

Configuration

Using ClientConfig

use stream_tungstenite::prelude::*;
use std::time::Duration;

// Create custom configuration
let config = ClientConfig::new()
    .with_receive_timeout(Duration::from_secs(30))
    .with_exit_on_first_failure(false)
    .with_nodelay(true)
    .with_channel_buffer(512);

let client = WebSocketClient::builder("wss://example.com/ws")
    .config(config)
    .build();

Configuration Presets

The library provides preset configurations for common scenarios:

Fast Reconnect

For low-latency scenarios requiring quick recovery:

let config = ClientConfig::fast_reconnect();
// - Receive timeout: 10 seconds
// - Connect timeout: 10 seconds
// - Nagle disabled: true
// - Channel buffer: 512

Stable Connection

For long-running, stable connections:

let config = ClientConfig::stable_connection();
// - Receive timeout: 60 seconds
// - Connect timeout: 60 seconds
// - Nagle disabled: false
// - Channel buffer: 128

Retry Strategies

Exponential Backoff (Default)

use stream_tungstenite::prelude::*;
use std::time::Duration;

let client = WebSocketClient::builder("wss://example.com/ws")
    .exponential_backoff(
        Duration::from_millis(100),  // Initial delay
        Duration::from_secs(30),     // Maximum delay
        2.0,                         // Multiplier
    )
    .build();
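
To make the three parameters concrete, here is a minimal sketch of the usual exponential backoff formula, delay = initial * multiplier^attempt, capped at the maximum. The function name backoff_delay and the 0-based attempt counter are assumptions made for illustration. The crate may compute its delays differently, for example by adding jitter.

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (0-based), assuming `multiplier >= 1.0`:
/// initial * multiplier^attempt, never exceeding `max`.
fn backoff_delay(initial: Duration, max: Duration, multiplier: f64, attempt: u32) -> Duration {
    // Saturate the exponent so the cast to i32 cannot wrap.
    let exponent = attempt.min(i32::MAX as u32) as i32;
    let scaled = initial.as_secs_f64() * multiplier.powi(exponent);
    // Clamp in floating point first so a huge value cannot overflow Duration.
    Duration::from_secs_f64(scaled.min(max.as_secs_f64()))
}
```

With the values from the example above (100 ms initial, 30 s maximum, multiplier 2.0), the delay doubles on each attempt until it reaches the 30-second cap.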

Preset Strategies

// Fast reconnection - quick recovery
let strategy = ExponentialBackoff::fast();
// - 100ms initial, 5s max, 1.5x factor

// Standard - balanced approach
let strategy = ExponentialBackoff::standard();
// - 1s initial, 60s max, 2x factor

// Conservative - longer waits
let strategy = ExponentialBackoff::conservative();
// - 2s initial, 120s max, 2x factor

Fixed Delay

use stream_tungstenite::prelude::*;
use std::time::Duration;

let strategy = FixedDelay::new(Duration::from_secs(5))
    .with_max_attempts(10);

let client = WebSocketClient::builder("wss://example.com/ws")
    .retry_strategy(strategy)
    .build();
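
A retry strategy can be thought of as a function from the attempt number to an optional delay, where "no delay" means "give up". The sketch below models a fixed delay with an attempt limit in that way. RetrySketch, FixedDelaySketch, and total_wait are hypothetical names, and the reading of with_max_attempts(10) as "retry at most 10 times" is an assumption rather than documented behavior.

```rust
use std::time::Duration;

/// Hypothetical strategy interface: `None` means "stop retrying".
trait RetrySketch {
    fn next_delay(&self, attempt: u32) -> Option<Duration>;
}

/// Same delay every time, optionally limited to a number of attempts.
struct FixedDelaySketch {
    delay: Duration,
    max_attempts: Option<u32>,
}

impl RetrySketch for FixedDelaySketch {
    fn next_delay(&self, attempt: u32) -> Option<Duration> {
        match self.max_attempts {
            // `attempt` is 0-based, so attempt == limit is already one too many.
            Some(limit) if attempt >= limit => None,
            _ => Some(self.delay),
        }
    }
}

/// Total time spent waiting if every attempt fails, looking at no more than
/// `cap` attempts so that unbounded strategies still terminate.
fn total_wait(strategy: &dyn RetrySketch, cap: u32) -> Duration {
    (0..cap)
        .map_while(|attempt| strategy.next_delay(attempt))
        .sum()
}
```

Under these assumptions, a 5-second delay with 10 attempts spends at most 50 seconds waiting before the client gives up.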

No Retry

let client = WebSocketClient::builder("wss://example.com/ws")
    .no_retry()
    .build();

Handshakes

Perform application-level handshakes after the WebSocket connection is established.

Authentication

use stream_tungstenite::prelude::*;

// Plain text token
let auth = AuthHandshaker::new("my-api-key");

// JSON format: {"type":"auth","token":"..."}
let auth = AuthHandshaker::new("my-api-key").json();

// Custom format
let auth = AuthHandshaker::new("my-api-key")
    .custom_format("AUTH {}");

let client = WebSocketClient::builder("wss://example.com/ws")
    .handshaker(auth)
    .build();
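
The three formats can be pictured as three ways of rendering the same token into the first message sent after connecting. The sketch below is illustrative only. AuthFormatSketch, escape_json, and render_auth are hypothetical names, and treating "{}" in the custom format as the token placeholder is an assumption based on the example string "AUTH {}". Only the JSON shape is taken from the comment above.

```rust
/// Hypothetical formats mirroring the three options shown above.
enum AuthFormatSketch {
    Plain,
    Json,
    Custom(String),
}

/// Minimal JSON string escaping for quotes, backslashes, and control characters.
fn escape_json(raw: &str) -> String {
    let mut out = String::with_capacity(raw.len());
    for c in raw.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}

/// Render the payload that would be sent for a given format and token.
fn render_auth(format: &AuthFormatSketch, token: &str) -> String {
    match format {
        AuthFormatSketch::Plain => token.to_string(),
        AuthFormatSketch::Json => {
            format!("{{\"type\":\"auth\",\"token\":\"{}\"}}", escape_json(token))
        }
        // Assumption: the first "{}" in the template is replaced by the token.
        AuthFormatSketch::Custom(template) => template.replacen("{}", token, 1),
    }
}
```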

Connection Timeout and Handshake Retry

use stream_tungstenite::prelude::*;
use std::time::Duration;

let cfg = ClientConfig::new()
    .with_connect_timeout(Duration::from_secs(15))         // TCP/WebSocket connect timeout
    .with_handshake_retry_delay(Duration::from_millis(500)); // wait before retrying failed handshakes

Extension Message Processing (Processor)

Extensions can transform or filter messages; the dispatcher applies them before broadcasting:

use async_trait::async_trait;
use stream_tungstenite::extension::Extension;
use stream_tungstenite::context::ConnectionContext;
use tokio_tungstenite::tungstenite::Message;

struct UppercaseExt;

#[async_trait]
impl Extension for UppercaseExt {
    fn name(&self) -> &'static str { "upper" }
    fn handles_messages(&self) -> bool { true }

    // Receives &Message for zero-copy efficiency
    async fn on_message(
        &self,
        msg: &Message,
        _ctx: &ConnectionContext,
    ) -> Result<Option<Message>, stream_tungstenite::error::ExtensionError> {
        if let Message::Text(t) = msg {
            Ok(Some(Message::Text(t.to_uppercase().into())))
        } else {
            Ok(Some(msg.clone()))
        }
    }
}
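
The Some/None contract suggests a simple pipeline: each processor receives the output of the previous one, and the first None drops the message before it is broadcast. The following is a simplified, synchronous sketch of that idea using plain strings instead of WebSocket messages. The Processor alias, dispatch, and example_pipeline are hypothetical names, and the crate's dispatcher may order or apply extensions differently.

```rust
/// Hypothetical processor: returns `Some(new_message)` to pass a message on,
/// or `None` to filter it out.
type Processor = Box<dyn Fn(&str) -> Option<String>>;

/// Run a message through every processor in order; stop at the first `None`.
fn dispatch(processors: &[Processor], incoming: &str) -> Option<String> {
    let mut current = incoming.to_string();
    for processor in processors {
        current = processor(&current)?;
    }
    Some(current)
}

/// Example pipeline: drop messages starting with "ping", uppercase the rest.
fn example_pipeline() -> Vec<Processor> {
    let mut processors: Vec<Processor> = Vec::new();
    processors.push(Box::new(|m: &str| {
        if m.starts_with("ping") {
            None
        } else {
            Some(m.to_string())
        }
    }));
    processors.push(Box::new(|m: &str| Some(m.to_uppercase())));
    processors
}
```

In this sketch, dispatch(&example_pipeline(), "hello") yields Some("HELLO"), while any message starting with "ping" is filtered out and never reaches the uppercase step.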

Channel Subscription

use stream_tungstenite::prelude::*;

let subscribe = SubscribeHandshaker::new(vec!["trades", "orderbook"])
    .wait_confirmation();

let client = WebSocketClient::builder("wss://example.com/ws")
    .handshaker(subscribe)
    .build();

Chained Handshakes

use stream_tungstenite::prelude::*;

// Chain multiple handshakers
let handshaker = ChainedHandshaker::new()
    .then(AuthHandshaker::new("my-api-key").json())
    .then(SubscribeHandshaker::new(vec!["trades", "orderbook"]));

let client = WebSocketClient::builder("wss://example.com/ws")
    .handshaker(handshaker)
    .build();

// Or use the macro
let handshaker = chain_handshakers!(
    AuthHandshaker::new("my-api-key").json(),
    SubscribeHandshaker::new(vec!["trades"]),
);
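
Conceptually, a chain runs its steps in order and aborts on the first failure, so a subscription is never attempted after a rejected login. The sketch below shows that control flow with synchronous stand-ins. StepSketch, FakeStep, ChainSketch, and run_chain are hypothetical names, the payload strings are placeholders rather than a real wire format, and the crate's actual handshakers are asynchronous.

```rust
/// Hypothetical, synchronous stand-in for one handshake step.
trait StepSketch {
    fn name(&self) -> &'static str;
    /// "Sends" the step by appending to `sent`; an error aborts the chain.
    fn perform(&self, sent: &mut Vec<String>) -> Result<(), String>;
}

/// Test double that either records its payload or fails.
struct FakeStep {
    name: &'static str,
    payload: &'static str,
    fail: bool,
}

impl StepSketch for FakeStep {
    fn name(&self) -> &'static str {
        self.name
    }

    fn perform(&self, sent: &mut Vec<String>) -> Result<(), String> {
        if self.fail {
            return Err("rejected by server".to_string());
        }
        sent.push(self.payload.to_string());
        Ok(())
    }
}

#[derive(Default)]
struct ChainSketch {
    steps: Vec<Box<dyn StepSketch>>,
}

impl ChainSketch {
    /// Append a step; steps run in the order they were added.
    fn then(mut self, step: impl StepSketch + 'static) -> Self {
        self.steps.push(Box::new(step));
        self
    }

    /// Run all steps, stopping at the first error and naming the failing step.
    fn run(&self, sent: &mut Vec<String>) -> Result<(), String> {
        for step in &self.steps {
            step.perform(sent)
                .map_err(|e| format!("{}: {}", step.name(), e))?;
        }
        Ok(())
    }
}

/// Build an auth-then-subscribe chain and report the result and what was sent.
fn run_chain(auth_fails: bool) -> (Result<(), String>, Vec<String>) {
    let chain = ChainSketch::default()
        .then(FakeStep { name: "auth", payload: "AUTH my-api-key", fail: auth_fails })
        .then(FakeStep { name: "subscribe", payload: "SUBSCRIBE trades", fail: false });
    let mut sent = Vec::new();
    let result = chain.run(&mut sent);
    (result, sent)
}
```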

Extension System

Add custom functionality through the extension system.

Built-in Extensions

Logging Extension

use stream_tungstenite::prelude::*;

// Basic logging
let logging = LoggingExtension::new();

// Verbose with message logging
let logging = LoggingExtension::verbose();

// Custom configuration
let config = LoggingConfig::new()
    .with_level(LogLevel::Debug)
    .with_messages()
    .with_prefix("ws-client");
let logging = LoggingExtension::with_config(config);

// Register with client
client.register_extension(logging).await?;

Status Viewer

use stream_tungstenite::prelude::*;

let status = StatusViewer::new();

// Check connection status
if status.is_connected().await {
    println!("Connected!");
}

// Advanced status with history
let advanced = AdvancedStatusViewer::new();
let uptime = advanced.get_uptime().await;
let connection_count = advanced.get_connection_count().await;

Custom Extensions

use stream_tungstenite::extension::Extension;
use stream_tungstenite::context::ConnectionContext;
use stream_tungstenite::error::ExtensionError;
use async_trait::async_trait;
use tokio_tungstenite::tungstenite::Message;

struct MyExtension;

#[async_trait]
impl Extension for MyExtension {
    fn name(&self) -> &'static str {
        "my_extension"
    }

    fn handles_lifecycle(&self) -> bool {
        true
    }

    fn handles_messages(&self) -> bool {
        true
    }

    async fn on_connect(&self, ctx: &ConnectionContext) -> Result<(), ExtensionError> {
        println!("Connected! ID: {}", ctx.connection_id);
        Ok(())
    }

    async fn on_disconnect(&self, _ctx: &ConnectionContext) -> Result<(), ExtensionError> {
        println!("Disconnected!");
        Ok(())
    }

    async fn on_message(
        &self,
        message: &Message,
        _ctx: &ConnectionContext,
    ) -> Result<Option<Message>, ExtensionError> {
        // Return Some(message) to pass through
        // Return None to filter out
        // Receives &Message for zero-copy efficiency
        Ok(Some(message.clone()))
    }
}

Connection Events

Monitor connection state changes:

use stream_tungstenite::prelude::*;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Arc::new(WebSocketClient::new("wss://example.com/ws"));

    // Subscribe to connection events
    let mut events = client.subscribe_events();

    tokio::spawn({
        let client = client.clone();
        async move { client.run().await }
    });

    while let Ok(event) = events.recv().await {
        match event {
            ConnectionEvent::Connected { id } => {
                println!("Connected: {}", id);
            }
            ConnectionEvent::Disconnected { reason } => {
                println!("Disconnected: {:?}", reason);
            }
            ConnectionEvent::ReconnectScheduled { delay, attempt } => {
                println!("Reconnecting in {:?} (attempt {})", delay, attempt);
            }
            ConnectionEvent::Error { error, attempt } => {
                println!("Error on attempt {}: {}", attempt, error);
            }
            _ => {}
        }
    }

    Ok(())
}
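
An event stream like this is often folded into a small summary for dashboards or health checks. The sketch below shows one way to do that without the library. EventSketch is a hypothetical mirror of the four variants matched above, with field types (u64, String) chosen for illustration, and Summary and summarize are likewise illustrative names.

```rust
use std::time::Duration;

/// Hypothetical mirror of the four event variants matched above.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum EventSketch {
    Connected { id: u64 },
    Disconnected { reason: String },
    ReconnectScheduled { delay: Duration, attempt: u32 },
    Error { error: String, attempt: u32 },
}

/// Aggregate view derived from a sequence of events.
#[derive(Debug, Default, PartialEq)]
struct Summary {
    connected: bool,
    connects: u32,
    errors: u32,
    last_attempt: u32,
}

/// Fold events, in order, into a summary of the connection's history.
fn summarize(events: &[EventSketch]) -> Summary {
    let mut summary = Summary::default();
    for event in events {
        match event {
            EventSketch::Connected { .. } => {
                summary.connected = true;
                summary.connects += 1;
                // A successful connection resets the retry counter.
                summary.last_attempt = 0;
            }
            EventSketch::Disconnected { .. } => summary.connected = false,
            EventSketch::ReconnectScheduled { attempt, .. } => summary.last_attempt = *attempt,
            EventSketch::Error { .. } => summary.errors += 1,
        }
    }
    summary
}
```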

Connection State

Access detailed connection state:

// Get current state snapshot
let state = client.state().await;
println!("Connection ID: {}", state.id);
println!("Status: {:?}", state.status);
println!("Reconnect count: {}", state.reconnect_count);
println!("Error count: {}", state.error_count);

if let Some(duration) = state.connection_duration {
    println!("Connected for: {:?}", duration);
}

// Quick connection check
if client.is_connected() {
    println!("Currently connected");
}

Error Handling

The library provides structured error types:

use stream_tungstenite::error::*;

// Top-level errors
match client.run().await {
    Ok(()) => println!("Client shutdown gracefully"),
    Err(ClientError::Connect(e)) => println!("Connection error: {}", e),
    Err(ClientError::AlreadyRunning) => println!("Client already running"),
    Err(e) => println!("Other error: {}", e),
}

// Connection errors include retryability info
let error = ConnectError::TcpFailed("connection refused".into());
if error.is_retryable() {
    println!("Will retry connection");
}
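
Retryability is typically a property of the error variant: transient network failures are worth another attempt, while configuration mistakes are not. The sketch below illustrates that split. Only TcpFailed appears in the example above. ConnectErrorSketch, its Timeout and InvalidUrl variants, and should_reconnect are hypothetical, and the crate's real classification may differ.

```rust
/// Hypothetical error enum; only `TcpFailed` is named in the text above.
#[allow(dead_code)]
#[derive(Debug)]
enum ConnectErrorSketch {
    TcpFailed(String),
    Timeout,
    InvalidUrl(String),
}

impl ConnectErrorSketch {
    /// Transient failures can be retried; a malformed URL will never succeed.
    fn is_retryable(&self) -> bool {
        match self {
            Self::TcpFailed(_) | Self::Timeout => true,
            Self::InvalidUrl(_) => false,
        }
    }
}

/// Reconnect only for retryable errors and while attempts remain.
fn should_reconnect(err: &ConnectErrorSketch, attempt: u32, max_attempts: u32) -> bool {
    err.is_retryable() && attempt < max_attempts
}
```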

TLS Configuration

The library supports multiple TLS backends via feature flags. Choose one based on your requirements:

Available TLS Features

[dependencies]
# Pick exactly one of the entries below; repeating the same key is invalid TOML.

# Default: native-tls (system TLS, cross-platform)
stream-tungstenite = "0.6"

# Rustls with system certificates (pure Rust, recommended)
stream-tungstenite = { version = "0.6", default-features = false, features = ["rustls-tls-native-roots"] }

# Rustls with embedded certificates (no system dependency)
stream-tungstenite = { version = "0.6", default-features = false, features = ["rustls-tls-webpki-roots"] }

# Native TLS with vendored dependencies (for static linking)
stream-tungstenite = { version = "0.6", default-features = false, features = ["native-tls-vendored"] }

Custom TLS Configuration

Complete Control with TLS Connector

use stream_tungstenite::prelude::*;
use tokio_tungstenite::Connector;

// Native TLS example
#[cfg(feature = "native-tls")]
{
    use native_tls::TlsConnector as NativeTlsConnector;

    let tls = NativeTlsConnector::builder()
        .min_protocol_version(Some(native_tls::Protocol::Tlsv12))
        .build()?;

    let connector = DefaultConnector::new()
        .with_tls_connector(Connector::NativeTls(tls));

    let client = WebSocketClient::builder("wss://example.com/ws")
        .connector(connector)
        .build();
}

// Rustls example
#[cfg(feature = "__rustls-tls")]
{
    use rustls::ClientConfig;
    use std::sync::Arc;

    let config = ClientConfig::builder()
        .with_root_certificates(/* your certs */)
        .with_no_client_auth();

    let connector = DefaultConnector::new()
        .with_tls_connector(Connector::Rustls(Arc::new(config)));
}

Disable Certificate Verification (Testing Only!)

Security Warning: Never use in production!

use stream_tungstenite::prelude::*;

// For testing with self-signed certificates
let connector = DefaultConnector::new()
    .danger_accept_invalid_certs()?;

let client = WebSocketClient::builder("wss://localhost:8080")
    .connector(connector)
    .build();

Custom CA Certificates

use stream_tungstenite::prelude::*;

#[cfg(feature = "native-tls")]
{
    use native_tls::Certificate;
    use std::fs;

    // Load custom CA certificate
    let ca_cert = fs::read("internal-ca.pem")?;
    let cert = Certificate::from_pem(&ca_cert)?;

    let connector = DefaultConnector::new()
        .with_custom_ca_cert(cert)?;

    let client = WebSocketClient::builder("wss://internal.example.com/ws")
        .connector(connector)
        .build();
}

Client Certificate Authentication (mTLS)

use stream_tungstenite::prelude::*;

#[cfg(feature = "native-tls")]
{
    use native_tls::Identity;
    use std::fs;

    // Load client certificate (PKCS#12 format)
    let pkcs12 = fs::read("client-cert.p12")?;
    let identity = Identity::from_pkcs12(&pkcs12, "password")?;

    let connector = DefaultConnector::new()
        .with_client_identity(identity)?;

    let client = WebSocketClient::builder("wss://api.example.com/ws")
        .connector(connector)
        .build();
}

TLS Feature Comparison

Feature       | native-tls                                   | rustls-tls-native-roots | rustls-tls-webpki-roots
--------------|----------------------------------------------|-------------------------|------------------------
Backend       | System (OpenSSL/Security.framework/SChannel) | Pure Rust               | Pure Rust
System Certs  | ✅ Yes                                       | ✅ Yes                  | ❌ No (embedded)
Binary Size   | Smaller                                      | Medium                  | Medium
Cross-compile | Harder                                       | Easier                  | Easier
FIPS          | Depends on system                            | ❌ No                   | ❌ No
Performance   | Good                                         | Excellent               | Excellent

Recommendation: Use rustls-tls-native-roots for most projects (pure Rust, easier cross-compilation, and it still reads the system certificate store). Use native-tls if you need the operating system's TLS stack, for example when FIPS compliance depends on it.

Installation

Add to your Cargo.toml:

[dependencies]
stream-tungstenite = "0.6"
tokio = { version = "1.0", features = ["full"] }

License

This project is licensed under the Apache License, Version 2.0. See the LICENSE file for details.

Contributing

Contributions are welcome! Please feel free to submit Pull Requests or create Issues.
