| Crates.io | stream-tungstenite |
| lib.rs | stream-tungstenite |
| version | 0.6.1 |
| created_at | 2024-10-23 08:46:57.113438+00 |
| updated_at | 2026-01-12 10:36:49.5805+00 |
| description | A streaming implementation of the Tungstenite WebSocket protocol |
| homepage | |
| repository | |
| max_upload_size | |
| id | 1419843 |
| size | 247,903 |
A robust Rust WebSocket client library with automatic reconnection, customizable retry strategies, and an extensible architecture.
┌───────────────────────────────────────────────────────────┐
│                      WebSocketClient                      │
│  - High-level API for WebSocket connections               │
│  - Automatic reconnection with retry strategies           │
│  - Message broadcasting via channels                      │
└───────────────────────────────────────────────────────────┘
                              │
         ┌────────────────────┼────────────────────┐
         ▼                    ▼                    ▼
┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐
│    Handshake    │  │    Extension    │  │   Connection    │
│  - Auth         │  │  - Lifecycle    │  │  - State        │
│  - Subscribe    │  │  - Messages     │  │  - Retry        │
│  - Chained      │  │  - Logging      │  │  - Supervisor   │
└─────────────────┘  └─────────────────┘  └─────────────────┘
use stream_tungstenite::prelude::*;
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create client with builder pattern
let client = WebSocketClient::builder("wss://echo.websocket.org")
.receive_timeout(std::time::Duration::from_secs(30))
.build();
// Subscribe to messages before running
let mut messages = client.subscribe();
// Run client in background
let client = Arc::new(client);
tokio::spawn({
let client = client.clone();
async move { client.run().await }
});
// Receive messages (Arc<Message> for zero-copy)
while let Ok(msg) = messages.recv().await {
println!("Received: {:?}", msg);
// Access message: msg.as_ref() or &*msg
// Clone if needed: (*msg).clone()
}
Ok(())
}
use stream_tungstenite::prelude::*;
use std::sync::Arc;
use tokio_tungstenite::tungstenite::Message;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = Arc::new(WebSocketClient::new("wss://echo.websocket.org"));
// Get a sender handle
if let Some(sender) = client.sender().await {
// Send text message
sender.send_text("Hello, WebSocket!")?;
// Send binary message
sender.send_binary(vec![1, 2, 3])?;
// Send ping
sender.ping(vec![])?;
}
Ok(())
}
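Send methods and the errors they return:
- `client.send(msg).await` returns `SendError::ChannelFull` when the send queue is full.
- `client.send_async(msg).await` returns `SendError::ChannelClosed` if not connected.
- `client.send_timeout(msg, Duration::from_secs(1)).await` returns `SendError::Timeout(_)` if the timeout expires.

Configure queue capacity: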
let cfg = ClientConfig::new().with_send_queue_capacity(512);
Configure handshake retry delay:
let cfg = ClientConfig::new().with_handshake_retry_delay(std::time::Duration::from_millis(500));
use stream_tungstenite::prelude::*;
use std::time::Duration;
// Create custom configuration
let config = ClientConfig::new()
.with_receive_timeout(Duration::from_secs(30))
.with_exit_on_first_failure(false)
.with_nodelay(true)
.with_channel_buffer(512);
let client = WebSocketClient::builder("wss://example.com/ws")
.config(config)
.build();
The library provides preset configurations for common scenarios:
For low-latency scenarios requiring quick recovery:
let config = ClientConfig::fast_reconnect();
// - Receive timeout: 10 seconds
// - Connect timeout: 10 seconds
// - Nagle disabled: true
// - Channel buffer: 512
For long-running, stable connections:
let config = ClientConfig::stable_connection();
// - Receive timeout: 60 seconds
// - Connect timeout: 60 seconds
// - Nagle disabled: false
// - Channel buffer: 128
use stream_tungstenite::prelude::*;
use std::time::Duration;
let client = WebSocketClient::builder("wss://example.com/ws")
.exponential_backoff(
Duration::from_millis(100), // Initial delay
Duration::from_secs(30), // Maximum delay
2.0, // Multiplier
)
.build();
// Fast reconnection - quick recovery
let strategy = ExponentialBackoff::fast();
// - 100ms initial, 5s max, 1.5x factor
// Standard - balanced approach
let strategy = ExponentialBackoff::standard();
// - 1s initial, 60s max, 2x factor
// Conservative - longer waits
let strategy = ExponentialBackoff::conservative();
// - 2s initial, 120s max, 2x factor
use stream_tungstenite::prelude::*;
use std::time::Duration;
let strategy = FixedDelay::new(Duration::from_secs(5))
.with_max_attempts(10);
let client = WebSocketClient::builder("wss://example.com/ws")
.retry_strategy(strategy)
.build();
let client = WebSocketClient::builder("wss://example.com/ws")
.no_retry()
.build();
Perform application-level handshakes after the WebSocket connection is established.
use stream_tungstenite::prelude::*;
// Plain text token
let auth = AuthHandshaker::new("my-api-key");
// JSON format: {"type":"auth","token":"..."}
let auth = AuthHandshaker::new("my-api-key").json();
// Custom format
let auth = AuthHandshaker::new("my-api-key")
.custom_format("AUTH {}");
let client = WebSocketClient::builder("wss://example.com/ws")
.handshaker(auth)
.build();
use stream_tungstenite::prelude::*;
use std::time::Duration;
let cfg = ClientConfig::new()
.with_connect_timeout(Duration::from_secs(15)) // TCP/WebSocket connect timeout
.with_handshake_retry_delay(Duration::from_millis(500)); // wait before retrying failed handshakes
Extensions can transform or filter messages; the dispatcher passes each incoming message through the registered extensions before broadcasting it:
use async_trait::async_trait;
use stream_tungstenite::extension::Extension;
use stream_tungstenite::context::ConnectionContext;
use tungstenite::Message;
struct UppercaseExt;
#[async_trait]
impl Extension for UppercaseExt {
fn name(&self) -> &'static str { "upper" }
fn handles_messages(&self) -> bool { true }
// Receives &Message for zero-copy efficiency
async fn on_message(
&self,
msg: &Message,
_ctx: &ConnectionContext,
) -> Result<Option<Message>, stream_tungstenite::error::ExtensionError> {
if let Message::Text(t) = msg {
Ok(Some(Message::Text(t.to_uppercase().into())))
} else {
Ok(Some(msg.clone()))
}
}
}
use stream_tungstenite::prelude::*;
let subscribe = SubscribeHandshaker::new(vec!["trades", "orderbook"])
.wait_confirmation();
let client = WebSocketClient::builder("wss://example.com/ws")
.handshaker(subscribe)
.build();
use stream_tungstenite::prelude::*;
// Chain multiple handshakers
let handshaker = ChainedHandshaker::new()
.then(AuthHandshaker::new("my-api-key").json())
.then(SubscribeHandshaker::new(vec!["trades", "orderbook"]));
let client = WebSocketClient::builder("wss://example.com/ws")
.handshaker(handshaker)
.build();
// Or use the macro
let handshaker = chain_handshakers!(
AuthHandshaker::new("my-api-key").json(),
SubscribeHandshaker::new(vec!["trades"]),
);
Add custom functionality through the extension system.
use stream_tungstenite::prelude::*;
// Basic logging
let logging = LoggingExtension::new();
// Verbose with message logging
let logging = LoggingExtension::verbose();
// Custom configuration
let config = LoggingConfig::new()
.with_level(LogLevel::Debug)
.with_messages()
.with_prefix("ws-client");
let logging = LoggingExtension::with_config(config);
// Register with client
client.register_extension(logging).await?;
use stream_tungstenite::prelude::*;
let status = StatusViewer::new();
// Check connection status
if status.is_connected().await {
println!("Connected!");
}
// Advanced status with history
let advanced = AdvancedStatusViewer::new();
let uptime = advanced.get_uptime().await;
let connection_count = advanced.get_connection_count().await;
use stream_tungstenite::extension::Extension;
use stream_tungstenite::context::ConnectionContext;
use stream_tungstenite::error::ExtensionError;
use async_trait::async_trait;
use tokio_tungstenite::tungstenite::Message;
struct MyExtension;
#[async_trait]
impl Extension for MyExtension {
fn name(&self) -> &'static str {
"my_extension"
}
fn handles_lifecycle(&self) -> bool {
true
}
fn handles_messages(&self) -> bool {
true
}
async fn on_connect(&self, ctx: &ConnectionContext) -> Result<(), ExtensionError> {
println!("Connected! ID: {}", ctx.connection_id);
Ok(())
}
async fn on_disconnect(&self, ctx: &ConnectionContext) -> Result<(), ExtensionError> {
println!("Disconnected!");
Ok(())
}
async fn on_message(
&self,
message: &Message,
_ctx: &ConnectionContext,
) -> Result<Option<Message>, ExtensionError> {
// Return Some(message) to pass through
// Return None to filter out
// Receives &Message for zero-copy efficiency
Ok(Some(message.clone()))
}
}
Monitor connection state changes:
use stream_tungstenite::prelude::*;
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = Arc::new(WebSocketClient::new("wss://example.com/ws"));
// Subscribe to connection events
let mut events = client.subscribe_events();
tokio::spawn({
let client = client.clone();
async move { client.run().await }
});
while let Ok(event) = events.recv().await {
match event {
ConnectionEvent::Connected { id } => {
println!("Connected: {}", id);
}
ConnectionEvent::Disconnected { reason } => {
println!("Disconnected: {:?}", reason);
}
ConnectionEvent::ReconnectScheduled { delay, attempt } => {
println!("Reconnecting in {:?} (attempt {})", delay, attempt);
}
ConnectionEvent::Error { error, attempt } => {
println!("Error on attempt {}: {}", attempt, error);
}
_ => {}
}
}
Ok(())
}
Access detailed connection state:
// Get current state snapshot
let state = client.state().await;
println!("Connection ID: {}", state.id);
println!("Status: {:?}", state.status);
println!("Reconnect count: {}", state.reconnect_count);
println!("Error count: {}", state.error_count);
if let Some(duration) = state.connection_duration {
println!("Connected for: {:?}", duration);
}
// Quick connection check
if client.is_connected() {
println!("Currently connected");
}
The library provides structured error types:
use stream_tungstenite::error::*;
// Top-level errors
match client.run().await {
Ok(()) => println!("Client shutdown gracefully"),
Err(ClientError::Connect(e)) => println!("Connection error: {}", e),
Err(ClientError::AlreadyRunning) => println!("Client already running"),
Err(e) => println!("Other error: {}", e),
}
// Connection errors include retryability info
let error = ConnectError::TcpFailed("connection refused".into());
if error.is_retryable() {
println!("Will retry connection");
}
The library supports multiple TLS backends via feature flags. Choose one based on your requirements:
[dependencies]
# Default: native-tls (system TLS, cross-platform)
stream-tungstenite = "0.6"
# Rustls with system certificates (pure Rust, recommended)
stream-tungstenite = { version = "0.6", default-features = false, features = ["rustls-tls-native-roots"] }
# Rustls with embedded certificates (no system dependency)
stream-tungstenite = { version = "0.6", default-features = false, features = ["rustls-tls-webpki-roots"] }
# Native TLS with vendored dependencies (for static linking)
stream-tungstenite = { version = "0.6", default-features = false, features = ["native-tls-vendored"] }
use stream_tungstenite::prelude::*;
use tokio_tungstenite::Connector;
// Native TLS example
#[cfg(feature = "native-tls")]
{
use native_tls::TlsConnector as NativeTlsConnector;
let tls = NativeTlsConnector::builder()
.min_protocol_version(Some(native_tls::Protocol::Tlsv12))
.build()?;
let connector = DefaultConnector::new()
.with_tls_connector(Connector::NativeTls(tls));
let client = WebSocketClient::builder("wss://example.com/ws")
.connector(connector)
.build();
}
// Rustls example
#[cfg(feature = "__rustls-tls")]
{
use rustls::ClientConfig;
use std::sync::Arc;
let mut config = ClientConfig::builder()
.with_root_certificates(/* your certs */)
.with_no_client_auth();
let connector = DefaultConnector::new()
.with_tls_connector(Connector::Rustls(Arc::new(config)));
}
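Security Warning: Accepting invalid certificates disables TLS verification. Never use this in production! It is intended only for local testing, e.g. with self-signed certificates.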
use stream_tungstenite::prelude::*;
// For testing with self-signed certificates
let connector = DefaultConnector::new()
.danger_accept_invalid_certs()?;
let client = WebSocketClient::builder("wss://localhost:8080")
.connector(connector)
.build();
use stream_tungstenite::prelude::*;
#[cfg(feature = "native-tls")]
{
use native_tls::Certificate;
use std::fs;
// Load custom CA certificate
let ca_cert = fs::read("internal-ca.pem")?;
let cert = Certificate::from_pem(&ca_cert)?;
let connector = DefaultConnector::new()
.with_custom_ca_cert(cert)?;
let client = WebSocketClient::builder("wss://internal.example.com/ws")
.connector(connector)
.build();
}
use stream_tungstenite::prelude::*;
#[cfg(feature = "native-tls")]
{
use native_tls::Identity;
use std::fs;
// Load client certificate (PKCS#12 format)
let pkcs12 = fs::read("client-cert.p12")?;
let identity = Identity::from_pkcs12(&pkcs12, "password")?;
let connector = DefaultConnector::new()
.with_client_identity(identity)?;
let client = WebSocketClient::builder("wss://api.example.com/ws")
.connector(connector)
.build();
}
| Feature | native-tls | rustls-tls-native-roots | rustls-tls-webpki-roots |
|---|---|---|---|
| Backend | System (OpenSSL/Security.framework/SChannel) | Pure Rust | Pure Rust |
| System Certs | ✅ Yes | ✅ Yes | ❌ No (embedded) |
| Binary Size | Smaller | Medium | Medium |
| Cross-compile | Harder | Easier | Easier |
| FIPS | Depends on system | ❌ No | ❌ No |
| Performance | Good | Excellent | Excellent |
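Recommendation: Use rustls-tls-native-roots for most projects (better security, pure Rust, and it still uses the system certificate store). Use native-tls if you need the platform's own TLS stack (OpenSSL/Security.framework/SChannel) or FIPS compliance, which depends on the system TLS library.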
Add to your Cargo.toml:
[dependencies]
stream-tungstenite = "0.6"
tokio = { version = "1.0", features = ["full"] }
This project is licensed under the Apache License, Version 2.0. See the LICENSE file for details.
Contributions are welcome! Please feel free to submit Pull Requests or create Issues.