| Crates.io | stinger-mqtt-trait |
| lib.rs | stinger-mqtt-trait |
| version | 0.5.1 |
| created_at | 2025-10-24 14:50:00.859588+00 |
| updated_at | 2025-10-31 20:36:09.742997+00 |
| description | Rust trait for defining an interface for a MQTT client |
| homepage | |
| repository | https://github.com/pearmaster/stinger-mqtt-trait |
| max_upload_size | |
| id | 1898533 |
| size | 152,916 |
A Rust minimal trait abstraction for MQTT pub/sub operations, providing a clean interface for publishing and subscribing to MQTT topics without being tied to a specific MQTT library. It allows libraries to perform MQTT pub/sub operations using a client connection managed by the application.
It is part of the "Stinger" suite of inter-process communication tools, but has no requirements on the suite and can easily be used elsewhere. See documentation
stinger-mqtt-trait is intentionally minimal and focused on pub/sub operations. The trait defines the essential pub/sub operations that MQTT clients must support, while deliberately avoiding connection lifecycle management and implementation details that would constrain implementers.
Application manages connections, libraries use pub/sub:
Mqtt5PubSub to provide pub/sub operations on an already-connected clientMqtt5PubSub work with any implementation - swap MQTT backends without changing application codemock_client feature)This is not a batteries-included MQTT framework or full client abstraction. If you need:
...these belong in your concrete implementation or application code. The trait simply ensures your implementation's pub/sub operations can be used anywhere Mqtt5PubSub is accepted.
Mqtt5PubSub trait with any MQTT clientMqttMessage struct supports MQTT v5 properties (content type, correlation data, response topic, user properties)publish() - Await until broker acknowledges (QoS-aware: sent/PUBACK/PUBCOMP)publish_nowait() - Fire-and-forget, returns immediately after queuing (synchronous, no async overhead)publish_noblock() - Returns oneshot receiver for concurrent operationsbroadcast::Sender enables flexible message routing:
Sync + Send, and make it much easier to use with context than callbacks which must have 'static lifetimeswatch::Receiver for tracking connection lifecycle (read-only from library perspective)get_client_id() method for identifying the MQTT client instanceMqttMessageBuildermock_client)validation)Connection lifecycle varies dramatically between MQTT implementations and deployment scenarios:
The solution: Applications create and connect their MQTT client implementation, then pass it to libraries that use Mqtt5PubSub for pub/sub operations. This separation of concerns provides maximum flexibility while maintaining trait compatibility.
broadcast::Sender for Subscriptions?The subscribe() method requires a broadcast::Sender<MqttMessage> parameter. While other channel types are viable options, tokio::sync::broadcast was selected because of the wide support for tokio and ability to support multiple consumers. This design choice maximizes flexibility:
Application controls the channel: You create the broadcast channel with your desired capacity and lagging strategy, then pass the sender to subscribe(). This means:
Each method serves a semantically distinct use case:
publish() - "I need confirmation" - Most common case, blocks until QoS acknowledgment receivedpublish_noblock() - "I need to do other work" - Returns immediately with oneshot receiver for concurrent workflowspublish_nowait() - "Fire and forget" - Synchronous method with no async overhead, ideal for high-frequency telemetry or non-critical messagesThese are not redundant - they represent fundamentally different control flow patterns and have distinctly different return types and semantics.
Why not: Connection management is inherently application-specific and implementation-dependent. Different MQTT libraries have completely different connection APIs, configuration options, and lifecycle models. The current design recognizes this by:
The solution: Applications create and manage their MQTT client (with whatever connection logic they need), then use it via the Mqtt5PubSub trait.
Why channels are better for a trait:
If you prefer callbacks, write an adapter function.
Why they're not in the trait:
Where they belong: In your concrete implementation or as extension traits. The base trait ensures interoperability; implementations provide richness.
Add this to your Cargo.toml:
[dependencies]
stinger-mqtt-trait = "0.2"
To use the validation suite for testing your implementation:
[dev-dependencies]
stinger-mqtt-trait = { version = "0.2", features = ["validation"] }
use stinger_mqtt_trait::{MqttMessage, MqttMessageBuilder, QoS};
let msg = MqttMessageBuilder::default()
.topic("sensor/data")
.qos(QoS::ExactlyOnce)
.retain(false)
.payload(Payload::from_serializable(&my_struct)?)
.content_type(Some("application/json".to_string()))
.response_topic(Some("sensor/response".to_string()))
.user_property("PropertyKey", "PropertyValue")
.build()?;
Each consuming library will have their own style of messages, and I recommend the libraries simplify the builder pattern with static functions to create messages like this:
pub fn request_message<T: Serialize>(topic: &str, payload: &T, correlation_id: Uuid, response_topic: String) -> Result<MqttMessage> {
MqttMessageBuilder::default()
.topic(topic)
.object_payload(payload)?
.qos(QoS::ExactlyOnce)
.retain(false)
.correlation_data(Bytes::from(correlation_id.to_string()))
.response_topic(response_topic)
.build()?
}
use stinger_mqtt_trait::{Mqtt5PubSub, MqttMessage, Mqtt5PubSubError, MqttConnectionState, MqttPublishSuccess, QoS};
use async_trait::async_trait;
use tokio::sync::{broadcast, oneshot, watch};
struct MyMqtt5PubSubClient {
// Your implementation fields (wrapping an actual MQTT client)
// Note: Your client should already be connected before being used via this trait
}
#[async_trait]
impl Mqtt5PubSub for MyMqtt5PubSubClient {
fn get_client_id(&self) -> String {
// Return the MQTT client ID
todo!()
}
fn get_state(&self) -> watch::Receiver<MqttConnectionState> {
// Return watch receiver for connection state monitoring
todo!()
}
async fn subscribe(
&mut self,
topic: String,
qos: QoS,
tx: broadcast::Sender<MqttMessage>
) -> Result<u32, Mqtt5PubSubError> {
// Your subscribe logic, return subscription ID
todo!()
}
async fn unsubscribe(&mut self, topic: String) -> Result<(), Mqtt5PubSubError> {
// Your unsubscribe logic
todo!()
}
async fn publish(&mut self, message: MqttMessage) -> Result<MqttPublishSuccess, Mqtt5PubSubError> {
// Your publish logic (awaits completion based on QoS)
todo!()
}
async fn publish_noblock(&mut self, message: MqttMessage)
-> oneshot::Receiver<Result<MqttPublishSuccess, Mqtt5PubSubError>> {
// Your non-blocking publish logic (returns immediately with oneshot receiver)
todo!()
}
fn publish_nowait(&mut self, message: MqttMessage) -> Result<MqttPublishSuccess, Mqtt5PubSubError> {
// Your fire-and-forget publish logic (synchronous, no async overhead)
todo!()
}
}
The validation feature provides a comprehensive test suite for validating Mqtt5PubSub implementations:
use stinger_mqtt_trait::validation::run_full_pubsub_validation_suite;
#[tokio::test]
async fn test_my_implementation() {
// Application connects the client first
let mut client = MyMqtt5PubSubClient::new();
// ... connect your client here ...
// Test pub/sub operations
run_full_pubsub_validation_suite(&mut client)
.await
.unwrap();
}
See src/validation/README.md for more details on the validation suite.
A: This is a valid point - MQTT brokers can grant a different QoS than requested. A future version could return both the subscription ID and granted QoS, and this is an area for improvement.
A: Absolutely! The trait defines the minimum interface. Your implementation can (and should) add:
publish_json, subscribe_many)A: The trait is designed to work with both:
MqttMessage includes v5.0 properties (optional to use)A: Connection management is intentionally outside the scope of the Mqtt5PubSub trait. Your application should:
Mqtt5PubSub for pub/sub operationsThis separation allows maximum flexibility in how you handle connections, reconnections, and lifecycle management.
MIT