Crates.io | mqttier |
lib.rs | mqttier |
version | 0.3.0 |
created_at | 2025-09-24 15:04:32.99578+00 |
updated_at | 2025-09-24 15:04:32.99578+00 |
description | A Rust MQTT client library providing an abstracted interface around rumqttc. |
homepage | https://github.com/stinger-ipc/mqttier |
repository | https://github.com/stinger-ipc/mqttier |
id | 1853254 |
size | 103,593 |
A Rust MQTT client library providing an abstracted interface around rumqttc.
Add this to your Cargo.toml (use the current crate version):
[dependencies]
mqttier = "0.3"
This example has been abbreviated to focus on library-specific calls.
// Create a new MQTT client
let client = MqttierClient::new("localhost", 1883, Some("mqttier_example".to_string())).unwrap();
// Start the run loop (spawned background tasks). Call once per client.
client.run_loop().await.unwrap();
// Create mpsc channel for receiving messages
let (message_tx, mut message_rx) = mpsc::channel::<ReceivedMessage>(64);
// Subscribe to a topic. We pass the tx-side of the channel that will receive messages
// for this subscription. The function returns a subscription id (`usize`).
let subscription_id = client.subscribe("test/topic".to_string(), 0, message_tx).await.unwrap();
// Start a task to handle incoming messages
tokio::spawn(async move {
    while let Some(message) = message_rx.recv().await {
        if let Ok(test_msg) = serde_json::from_slice::<TestMessage>(&message.payload) {
            println!("Parsed message: {:?}", test_msg);
        }
    }
});
// Publish a serializable structure and wait for its publish completion
let completion_rx = client.publish_structure("test/topic".to_string(), &test_message).await.unwrap();
match completion_rx.await {
    Ok(result) => println!("Publish result: {:?}", result),
    Err(_) => println!("Publish completion channel closed"),
}
new(hostname: &str, port: u16, client_id: Option<String>) -> Result<Self>
Creates a new MQTT client.
hostname: The MQTT broker hostname
port: The MQTT broker port
client_id: Optional client ID. If None, a random UUID is generated
run_loop() -> Result<()>
Starts the background run loop for connections and publishing. This should be called once per client. If already running, this method does nothing.
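The "does nothing if already running" behaviour can be illustrated with an atomic flag. This is only a sketch of the general idea using the standard library; Runner and start are hypothetical names, and nothing here is taken from mqttier's actual implementation.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical stand-in for a client; not mqttier's actual type.
struct Runner {
    running: AtomicBool,
}

impl Runner {
    fn new() -> Self {
        Runner { running: AtomicBool::new(false) }
    }

    /// Returns true if this call started the loop, false if it was already running.
    fn start(&self) -> bool {
        // `swap` returns the previous value, so only the first caller sees `false`.
        let was_running = self.running.swap(true, Ordering::SeqCst);
        !was_running
    }
}

fn main() {
    let runner = Runner::new();
    println!("first call started the loop: {}", runner.start());
    println!("second call started the loop: {}", runner.start());
}
```

A real client would spawn its background tasks where the first call returns true; later calls simply return without doing any work.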
subscribe(topic: String, qos: u8, message_tx: mpsc::Sender<ReceivedMessage>) -> Result<usize>
Subscribes to a topic and returns a subscription ID (usize).
topic: The MQTT topic to subscribe to
qos: The Quality of Service level (0, 1, or 2)
message_tx: The sender channel for delivering received messages (bounded channel with capacity you choose)
Returns: The subscription ID (usize)
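Because the capacity of the message channel is the caller's choice, it helps to see what a bounded channel does once it is full. The sketch below uses std::sync::mpsc::sync_channel as a stand-in for the tokio channel used with this library (an assumption made so the example needs no external crates); accepted_before_full is a hypothetical helper. How mqttier itself reacts to a full channel is not stated here.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Tries to push `attempts` messages into a bounded channel of the given
/// capacity without ever receiving, and returns how many were accepted.
fn accepted_before_full(capacity: usize, attempts: usize) -> usize {
    // `_rx` is kept alive so the channel stays connected for the whole loop.
    let (tx, _rx) = sync_channel::<Vec<u8>>(capacity);
    let mut accepted = 0;
    for i in 0..attempts {
        match tx.try_send(vec![i as u8]) {
            Ok(()) => accepted += 1,
            // The buffer is full: a slow consumer would cause this.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}

fn main() {
    println!("capacity 4, 10 attempts: {} accepted", accepted_before_full(4, 10));
    println!("capacity 64, 10 attempts: {} accepted", accepted_before_full(64, 10));
}
```

A larger capacity absorbs bursts at the cost of memory; a smaller one surfaces a slow consumer sooner.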
publish_structure<T: Serialize>(topic: String, payload: T) -> Result<oneshot::Receiver<PublishResult>>
Publishes a serializable struct as JSON to a topic and returns a oneshot::Receiver<PublishResult> which will resolve with the publish outcome.
topic: The MQTT topic to publish to
payload: The struct to publish (must implement Serialize)
publish_request<T: Serialize>(topic: String, payload: T, response_topic: String, correlation_id: Vec<u8>) -> Result<oneshot::Receiver<PublishResult>>
Publishes a request message with response topic and correlation ID.
topic: The MQTT topic to publish to
payload: The struct to publish
response_topic: The topic for responses
correlation_id: Correlation ID for matching responses (Vec<u8>)
publish_response<T: Serialize>(topic: String, payload: T, correlation_id: Vec<u8>) -> Result<oneshot::Receiver<PublishResult>>
Publishes a response message with correlation ID.
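To illustrate what the correlation ID is for: a requester typically remembers each outstanding request under its correlation ID and looks it up again when a response carrying the same bytes arrives. The following is a minimal standard-library sketch of that bookkeeping; PendingRequests, register and resolve are hypothetical names and are not part of mqttier.

```rust
use std::collections::HashMap;

/// Hypothetical bookkeeping for in-flight requests, keyed by correlation ID.
struct PendingRequests {
    pending: HashMap<Vec<u8>, String>,
}

impl PendingRequests {
    fn new() -> Self {
        PendingRequests { pending: HashMap::new() }
    }

    /// Remember a request description under its correlation ID.
    fn register(&mut self, correlation_id: Vec<u8>, description: &str) {
        self.pending.insert(correlation_id, description.to_string());
    }

    /// Find and remove the request that a response belongs to.
    /// Returns None for unknown or already-answered correlation IDs.
    fn resolve(&mut self, correlation_id: &[u8]) -> Option<String> {
        self.pending.remove(correlation_id)
    }
}

fn main() {
    let mut requests = PendingRequests::new();
    requests.register(vec![0x01, 0x02], "get temperature");

    // A response arrives carrying the same correlation ID.
    match requests.resolve(&[0x01, 0x02]) {
        Some(description) => println!("response matches request: {}", description),
        None => println!("no matching request"),
    }
}
```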
topic: The MQTT topic to publish to
payload: The struct to publish
correlation_id: Correlation ID for matching requests
publish_state<T: Serialize>(topic: String, payload: T, state_version: u32) -> Result<oneshot::Receiver<PublishResult>>
Publishes a state message with a version property.
topic: The MQTT topic to publish to
payload: The struct to publish
state_version: Version number for the state
The client automatically reconnects when the connection is lost, with a 5-second delay between attempts.
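A fixed-delay reconnect policy like the one described can be sketched with a simple retry loop. This is an illustration under stated assumptions, not the library's code: connect_with_retry is a hypothetical helper, the connection attempt is faked with a closure, and the delay is shortened from the 5 seconds mentioned above so the example finishes quickly.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Calls `connect` until it succeeds, sleeping `delay` after each failure.
/// Returns the total number of attempts made, including the successful one.
fn connect_with_retry<F>(mut connect: F, delay: Duration) -> u32
where
    F: FnMut() -> Result<(), String>,
{
    let mut attempts = 0;
    loop {
        attempts += 1;
        match connect() {
            Ok(()) => return attempts,
            Err(reason) => {
                eprintln!("attempt {} failed: {}; retrying in {:?}", attempts, reason, delay);
                sleep(delay);
            }
        }
    }
}

fn main() {
    // A fake connection that fails twice before succeeding.
    let mut failures_left = 2;
    let attempts = connect_with_retry(
        || {
            if failures_left > 0 {
                failures_left -= 1;
                Err("broker unreachable".to_string())
            } else {
                Ok(())
            }
        },
        // Shortened for the example; the text above states a 5-second delay.
        Duration::from_millis(10),
    );
    println!("connected after {} attempts", attempts);
}
```

A constant delay keeps the behaviour predictable; it never gives up, so the caller decides whether to keep waiting.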
When disconnected:
The library emits tracing events using the tracing crate. Initialize a tracing subscriber in your application, for example:
tracing_subscriber::fmt().with_env_filter("info").init();
See examples/basic_usage.rs for a complete example.
Run the example with:
cargo run --example basic_usage
This project is licensed under the LGPLv2 License. (Not v3!)