| Crates.io | mqtt5 |
| lib.rs | mqtt5 |
| version | 0.4.4 |
| created_at | 2025-08-04 17:12:30.63689+00 |
| updated_at | 2025-08-18 03:03:06.5251+00 |
| description | Complete MQTT v5.0 platform with high-performance async client and full-featured broker supporting TCP, TLS, WebSocket, authentication, bridging, and resource monitoring |
| homepage | |
| repository | https://github.com/fabriciobracht/mqtt-lib |
| max_upload_size | |
| id | 1780962 |
| size | 2,087,080 |
π A complete MQTT v5.0 platform featuring both high-performance client library AND full-featured broker implementation - pure Rust, zero unsafe code
This project provides everything you need for MQTT v5.0 development:
| Component | Use Case | Key Features |
|---|---|---|
| MQTT Broker | Run your own MQTT infrastructure | TLS, WebSocket, Authentication, Bridging, Monitoring |
| MQTT Client | Connect to any MQTT broker | AWS IoT compatible, Auto-reconnect, Mock testing |
[dependencies]
mqtt5 = "0.4.1"
cargo install mqttv5-cli
use mqtt5::broker::{BrokerConfig, MqttBroker};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create broker with default configuration
let mut broker = MqttBroker::bind("0.0.0.0:1883").await?;
println!("π MQTT broker running on port 1883");
// Run until shutdown
broker.run().await?;
Ok(())
}
use mqtt5::{MqttClient, QoS};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = MqttClient::new("my-device-001");
// Connect to your broker (or any MQTT broker)
client.connect("mqtt://localhost:1883").await?;
// Subscribe with callback
client.subscribe("sensors/+/data", |msg| {
println!("π§ {}: {}", msg.topic, String::from_utf8_lossy(&msg.payload));
}).await?;
// Publish a message
client.publish("sensors/temp/data", b"25.5Β°C").await?;
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
Ok(())
}
Superior CLI tool that replaces mosquitto_pub, mosquitto_sub, and mosquitto with unified ergonomics:
# Install from crates.io
cargo install mqttv5-cli
# Or build from source
git clone https://github.com/fabriciobracht/mqtt-lib
cd mqtt-lib
cargo build --release -p mqttv5-cli
# Start a broker (replaces mosquitto daemon)
mqttv5 broker --host 0.0.0.0:1883
# Publish a message (replaces mosquitto_pub)
mqttv5 pub --topic "sensors/temperature" --message "23.5"
# Subscribe to topics (replaces mosquitto_sub)
mqttv5 sub --topic "sensors/+" --verbose
# Smart prompting when arguments are missing
mqttv5 pub
# ? MQTT topic βΊ sensors/
# ? Message content βΊ Hello World!
# ? Quality of Service level βΊ β 0 (At most once)
--topic instead of -t, with short aliases available(packet_id, qos) like Python paho-mqttMqttClientTrait enables testing without real brokersuse mqtt_v5::broker::{BrokerConfig, TlsConfig, WebSocketConfig};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = BrokerConfig::default()
// TCP on port 1883
.with_bind_address("0.0.0.0:1883".parse()?)
// TLS on port 8883
.with_tls(
TlsConfig::new("certs/server.crt".into(), "certs/server.key".into())
.with_ca_file("certs/ca.crt".into())
.with_bind_address("0.0.0.0:8883".parse()?)
)
// WebSocket on port 8080
.with_websocket(
WebSocketConfig::default()
.with_bind_address("0.0.0.0:8080".parse()?)
.with_path("/mqtt")
)
.with_max_clients(10_000);
let mut broker = MqttBroker::with_config(config).await?;
println!("π Multi-transport MQTT broker running:");
println!(" π‘ TCP: mqtt://localhost:1883");
println!(" π TLS: mqtts://localhost:8883");
println!(" π WebSocket: ws://localhost:8080/mqtt");
broker.run().await?;
Ok(())
}
use mqtt_v5::broker::{BrokerConfig, AuthConfig, AuthMethod};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let auth_config = AuthConfig {
allow_anonymous: false,
password_file: Some("users.txt".into()),
auth_method: AuthMethod::Password,
auth_data: None,
};
let config = BrokerConfig::default()
.with_bind_address("0.0.0.0:1883".parse()?)
.with_auth(auth_config);
let mut broker = MqttBroker::with_config(config).await?;
broker.run().await?;
Ok(())
}
use mqtt_v5::broker::bridge::{BridgeConfig, BridgeDirection};
use mqtt_v5::QoS;
// Connect two brokers together
let bridge_config = BridgeConfig::new("edge-to-cloud", "cloud-broker:1883")
// Forward sensor data from edge to cloud
.add_topic("sensors/+/data", BridgeDirection::Out, QoS::AtLeastOnce)
// Receive commands from cloud to edge
.add_topic("commands/+/device", BridgeDirection::In, QoS::AtLeastOnce)
// Bidirectional health monitoring
.add_topic("health/+/status", BridgeDirection::Both, QoS::AtMostOnce);
// Add bridge to broker (broker handles connection management)
// broker.add_bridge(bridge_config).await?;
use mqtt_v5::{MockMqttClient, MqttClientTrait, PublishResult, QoS};
#[tokio::test]
async fn test_my_iot_function() {
// Create mock client
let mock = MockMqttClient::new("test-device");
// Configure mock responses
mock.set_connect_response(Ok(())).await;
mock.set_publish_response(Ok(PublishResult::QoS1Or2 { packet_id: 123 })).await;
// Test your function that accepts MqttClientTrait
my_iot_function(&mock).await.unwrap();
// Verify the calls
let calls = mock.get_calls().await;
assert_eq!(calls.len(), 2); // connect + publish
}
// Your production code uses the trait
async fn my_iot_function<T: MqttClientTrait>(client: &T) -> Result<(), Box<dyn std::error::Error>> {
client.connect("mqtt://broker").await?;
client.publish_qos1("telemetry", b"data").await?;
Ok(())
}
The client library includes AWS IoT compatibility features:
use mqtt_v5::{MqttClient, ConnectOptions};
use std::time::Duration;
// AWS IoT endpoint detection and connection handling
let client = MqttClient::new("aws-iot-device-12345");
// Connect to AWS IoT endpoint (automatically detects AWS IoT and optimizes connection)
client.connect("mqtts://abcdef123456.iot.us-east-1.amazonaws.com:8883").await?;
// Subscribe returns (packet_id, qos) tuple for compatibility
let (packet_id, qos) = client.subscribe("$aws/things/device-123/shadow/update/accepted", |msg| {
println!("Shadow update accepted: {:?}", msg.payload);
}).await?;
// AWS IoT topic validation prevents publishing to reserved topics
use mqtt_v5::validation::namespace::NamespaceValidator;
let validator = NamespaceValidator::aws_iot().with_device_id("device-123");
// This will succeed - device can update its own shadow
client.publish("$aws/things/device-123/shadow/update", shadow_data).await?;
// This will be rejected - device cannot publish to shadow response topics
// client.publish("$aws/things/device-123/shadow/update/accepted", data).await?; // Error!
Key AWS IoT features:
(packet_id, qos) tuple like other AWS SDKscargo install cargo-make)# Clone the repository
git clone https://github.com/fabriciobracht/mqtt-lib.git
cd mqtt-lib
# Install development tools and git hooks
./scripts/install-hooks.sh
# Run all CI checks locally (MUST pass before pushing)
cargo make ci-verify
# Development
cargo make build # Build the project
cargo make build-release # Build optimized release version
cargo make test # Run all tests
cargo make fmt # Format code
cargo make clippy # Run linter
# CI/CD
cargo make ci-verify # Run ALL CI checks (must pass before push)
cargo make pre-commit # Run before committing (fmt + clippy + test)
# Examples (use raw cargo for specific targets)
cargo run --example simple_broker # Start basic broker
cargo run --example broker_with_tls # TLS-enabled broker
cargo run --example broker_with_websocket # WebSocket-enabled broker
cargo run --example broker_bridge_demo # Broker bridging demo
# Benchmarks (use raw cargo for specific targets)
cargo bench --bench broker_performance # Broker performance tests
cargo bench --bench mqtt_benchmarks # Core MQTT benchmarks
# Generate test certificates (required for TLS tests)
./scripts/generate_test_certs.sh
# Run unit tests (fast)
cargo make test-fast
# Run all tests including integration tests
cargo make test
# Run specific test suites (use raw cargo for specific targets)
cargo test --test broker_performance_tests
cargo test --test connection_pool_performance
This project follows modern Rust async patterns:
Arc<RwLock<T>> (no message passing)The broker is designed for high performance:
Security is built-in, not bolted-on:
This project is licensed under either of
at your option.
We welcome contributions! Please see CONTRIBUTING.md for guidelines.
Built with β€οΈ in Rust. One reliable state machine.