| Crates.io | mqtt-endpoint-tokio |
| lib.rs | mqtt-endpoint-tokio |
| version | 0.6.5 |
| created_at | 2025-08-04 14:28:44.427974+00 |
| updated_at | 2026-01-17 01:48:02.041361+00 |
| description | A high-performance async MQTT client/server library for Rust with tokio, supporting MQTT v5.0 and v3.1.1 with TCP, TLS, and WebSocket transports |
| homepage | https://github.com/redboltz/mqtt-endpoint-tokio |
| repository | https://github.com/redboltz/mqtt-endpoint-tokio |
| max_upload_size | |
| id | 1780776 |
| size | 691,032 |
A high-performance async MQTT client/server library for Rust with tokio, supporting MQTT v5.0 and v3.1.1 with TCP, TLS, WebSocket, and QUIC transports.
Basic TCP connections for standard MQTT communication.
Secure connections with full TLS support using rustls.
WebSocket connections for web-based MQTT clients, supporting both:
High-performance QUIC connections for low-latency MQTT communication using quinn.
Local inter-process communication on Unix-based systems (Linux, macOS, BSD). Ideal for scenarios where the client and broker run on the same machine, offering better performance than TCP/IP sockets.
Add this to your Cargo.toml:
[dependencies]
mqtt-endpoint-tokio = "0.6"
The library provides optional features to reduce dependencies based on your transport needs:
# All transports enabled (default)
mqtt-endpoint-tokio = "0.6"
# Only TCP transport (minimal dependencies)
mqtt-endpoint-tokio = { version = "0.6", default-features = false }
# TCP + TLS
mqtt-endpoint-tokio = { version = "0.6", default-features = false, features = ["tls"] }
# TCP + WebSocket
mqtt-endpoint-tokio = { version = "0.6", default-features = false, features = ["ws"] }
# TCP + QUIC (includes TLS)
mqtt-endpoint-tokio = { version = "0.6", default-features = false, features = ["quic"] }
# Custom combination
mqtt-endpoint-tokio = { version = "0.6", default-features = false, features = ["tls", "ws"] }
Available Features:
tls - TLS transport support (enabled by default)ws - WebSocket transport support (enabled by default)quic - QUIC transport support (enabled by default, requires tls)unix-socket - Unix domain socket transport support (enabled by default, Unix systems only)tracing - Enable tracing support for debuggingDefault features: ["tls", "ws", "quic", "unix-socket"]
use mqtt_endpoint_tokio::mqtt_ep;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create endpoint
let endpoint = mqtt_ep::endpoint::Endpoint::new(mqtt_ep::Version::V5_0);
// Connect to broker
let tcp_stream = mqtt_ep::transport::connect_helper::connect_tcp("127.0.0.1:1883", None).await?;
let transport = mqtt_ep::transport::TcpTransport::from_stream(tcp_stream);
endpoint.attach(transport, mqtt_ep::endpoint::Mode::Client).await?;
// Send CONNECT packet
let connect = mqtt_ep::packet::v5_0::Connect::builder()
.client_id("rust-client")
.unwrap()
.keep_alive(60)
.clean_start(true)
.build()
.unwrap();
endpoint.send(connect).await?;
// Receive CONNACK
let packet = endpoint.recv().await?;
println!("Received: {packet:?}");
Ok(())
}
use mqtt_endpoint_tokio::mqtt_ep;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Connect via WebSocket
let ws_stream = mqtt_ep::transport::connect_helper::connect_tcp_ws(
"127.0.0.1:8080",
"/mqtt",
None,
None
).await?;
let transport = mqtt_ep::transport::WebSocketTransport::from_tcp_client_stream(ws_stream.into_inner());
// ... rest of MQTT communication
Ok(())
}
use mqtt_endpoint_tokio::mqtt_ep;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Connect via TLS
let tls_stream = mqtt_ep::transport::connect_helper::connect_tcp_tls(
"broker.example.com:8883",
"broker.example.com",
None,
None
).await?;
let transport = mqtt_ep::transport::TlsTransport::from_stream(tls_stream);
// ... rest of MQTT communication
Ok(())
}
use mqtt_endpoint_tokio::mqtt_ep;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Connect via QUIC
let (send_stream, recv_stream) = mqtt_ep::transport::connect_helper::connect_quic(
"127.0.0.1:14567",
"localhost",
None,
None,
None
).await?;
let transport = mqtt_ep::transport::QuicTransport::from_streams(send_stream, recv_stream);
// ... rest of MQTT communication
Ok(())
}
use mqtt_endpoint_tokio::mqtt_ep;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Connect via Unix domain socket
let unix_stream = mqtt_ep::transport::connect_helper::connect_unix(
"/tmp/mqtt.sock",
None
).await?;
let transport = mqtt_ep::transport::UnixStreamTransport::from_stream(unix_stream);
// ... rest of MQTT communication
Ok(())
}
This library is built on top of mqtt-protocol-core, which provides the Sans-I/O MQTT protocol implementation. The mqtt-endpoint-tokio layer adds:
The library supports both standard u16 packet IDs and extended u32 packet IDs for broker clustering scenarios:
use mqtt_endpoint_tokio::mqtt_ep;
// Standard u16 packet IDs
type StandardEndpoint = mqtt_ep::endpoint::Endpoint<mqtt_ep::role::Client>;
// Extended u32 packet IDs for broker clustering
type ExtendedEndpoint = mqtt_ep::endpoint::GenericEndpoint<mqtt_ep::role::Client, u32>;
Comprehensive error types provide detailed information about failures:
use mqtt_endpoint_tokio::mqtt_ep;
match endpoint.send(packet).await {
Ok(()) => println!("Packet sent"),
Err(mqtt_ep::connection_error::ConnectionError::NotConnected) => println!("Need to connect first"),
Err(mqtt_ep::connection_error::ConnectionError::Transport(e)) => println!("Network error: {e}"),
Err(mqtt_ep::connection_error::ConnectionError::Mqtt(e)) => println!("Protocol error: {e:?}"),
Err(e) => println!("Other error: {e}"),
}
This project is licensed under the MIT License - see the LICENSE file for details.
Contributions are welcome! Please feel free to submit a Pull Request.