rumqttc-async-std

Crates.io: rumqttc-async-std
lib.rs: rumqttc-async-std
version: 0.5.0
source: src
created_at: 2021-02-07 19:37:12.319016
updated_at: 2021-02-07 19:37:12.319016
description: An efficient and robust mqtt client for your connected devices
repository: https://github.com/Frans-Willem/rumqttc-async-std
id: 352040
size: 180,443
Frans-Willem Hardijzer (Frans-Willem)


rumqttc-async-std

A fork of the rumqttc library, changed to use async-std instead of the tokio asynchronous runtime.

A pure Rust MQTT client which strives to be robust, efficient and easy to use. This library is backed by an async (async-std) eventloop which handles all the robustness and efficiency parts of MQTT, but fits naturally into both sync and async worlds, as we'll see.

Let's jump into examples right away

A simple synchronous publish and subscribe

use rumqttc::{MqttOptions, Client, QoS};
use std::time::Duration;
use std::thread;

let mut mqttoptions = MqttOptions::new("rumqtt-sync", "test.mosquitto.org", 1883);
mqttoptions.set_keep_alive(5);

let (mut client, mut connection) = Client::new(mqttoptions, 10);
client.subscribe("hello/rumqtt", QoS::AtMostOnce).unwrap();
thread::spawn(move || {
    for i in 0..10 {
        client.publish("hello/rumqtt", QoS::AtLeastOnce, false, vec![i; i as usize]).unwrap();
        thread::sleep(Duration::from_millis(100));
    }
});

// Iterate to poll the eventloop; the connection only makes progress while this loop runs
for notification in connection.iter() {
    println!("Notification = {:?}", notification);
}

A simple asynchronous publish and subscribe

use rumqttc::{MqttOptions, AsyncClient, QoS};
use async_std::task;
use std::time::Duration;

let mut mqttoptions = MqttOptions::new("rumqtt-async", "test.mosquitto.org", 1883);
mqttoptions.set_keep_alive(5);

let (mut client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
client.subscribe("hello/rumqtt", QoS::AtMostOnce).await.unwrap();

task::spawn(async move {
    for i in 0..10 {
        client.publish("hello/rumqtt", QoS::AtLeastOnce, false, vec![i; i as usize]).await.unwrap();
        task::sleep(Duration::from_millis(100)).await;
    }
});

loop {
    let notification = eventloop.poll().await.unwrap();
    println!("Received = {:?}", notification);
}

Quick overview of features

  • Eventloop orchestrates outgoing/incoming packets concurrently and handles the state
  • Pings the broker when necessary and detects client-side half-open connections as well
  • Throttling of outgoing packets (todo)
  • Queue size based flow control on outgoing packets
  • Automatic reconnections by just continuing the eventloop.poll()/connection.iter() loop
  • Natural backpressure to client APIs during bad network
  • Immediate cancellation with client.cancel()

In short, everything necessary to maintain a robust connection
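
The queue-size-based flow control and backpressure mentioned above can be illustrated with a bounded channel from the standard library. This is only a sketch of the general idea, not the library's internals: `fill_bounded_queue` is a hypothetical helper, and the assumption is that the second argument to `Client::new`/`AsyncClient::new` (the `10` in the examples) plays the role of `cap` here.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Hypothetical helper (not part of rumqttc): tries to enqueue `attempts`
/// requests into a bounded queue of size `cap` that nobody is draining,
/// and reports how many were accepted and how many were rejected.
pub fn fill_bounded_queue(cap: usize, attempts: usize) -> (usize, usize) {
    // `_rx` stays alive until the end of the function, so the channel is
    // connected but never drained, like an eventloop that is not polled.
    let (tx, _rx) = sync_channel::<usize>(cap);
    let mut accepted = 0;
    let mut rejected = 0;
    for request in 0..attempts {
        match tx.try_send(request) {
            Ok(()) => accepted += 1,
            // Queue is full: the caller has to wait or drop the request.
            Err(TrySendError::Full(_)) => rejected += 1,
            // Receiver dropped: nothing will ever drain the queue.
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    (accepted, rejected)
}
```

With a blocking `send` instead of `try_send`, the producer would simply stall once the queue is full, which is the kind of backpressure a client API exerts when the network cannot keep up.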

Since the eventloop is polled externally (with iter()/poll() in a loop) outside the library and the Eventloop is accessible, users can

  • Distribute incoming messages based on topics
  • Stop it when required
  • Access internal state for use cases like graceful shutdown or to modify options before reconnection
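
As an illustration of distributing incoming messages by topic, the sketch below matches a topic name against MQTT topic filters with the standard `+` (single level) and `#` (remaining levels) wildcards. Both `topic_matches` and `route` are hypothetical helpers written for this example, not functions provided by the library; the sketch assumes filters are well formed (`#` only as the last level) and does not special-case topics starting with `$`.

```rust
/// Hypothetical helper (not part of rumqttc): returns true if the MQTT
/// topic `filter` matches the concrete `topic` name.
pub fn topic_matches(filter: &str, topic: &str) -> bool {
    let mut filter_levels = filter.split('/');
    let mut topic_levels = topic.split('/');
    loop {
        match (filter_levels.next(), topic_levels.next()) {
            // `#` matches the rest of the topic, including the parent level.
            (Some("#"), _) => return true,
            // `+` matches exactly one level, whatever its content.
            (Some("+"), Some(_)) => {}
            // Literal levels must be identical.
            (Some(f), Some(t)) if f == t => {}
            // Both exhausted at the same time: full match.
            (None, None) => return true,
            // Length mismatch or differing literal level.
            _ => return false,
        }
    }
}

/// Returns the label of the first route whose filter matches `topic`.
/// Each route is a `(filter, label)` pair; order decides priority.
pub fn route<'a>(routes: &[(&'a str, &'a str)], topic: &str) -> Option<&'a str> {
    routes
        .iter()
        .find(|(filter, _)| topic_matches(filter, topic))
        .map(|(_, label)| *label)
}
```

Inside the polling loop, the topic of each incoming publish could be passed to `route` and the message forwarded to whichever handler the returned label stands for.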

Important notes

  • Looping on connection.iter()/eventloop.poll() is necessary to run the event loop and make progress. It yields notifications of incoming and outgoing activity, which you can handle as you see fit.

  • Blocking inside the connection.iter()/eventloop.poll() loop will block connection progress.
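
One way to avoid blocking the loop is to hand slow work to a worker thread over a channel, so the loop itself only forwards. The sketch below uses only the standard library: the `Incoming` struct and `run_with_worker` are hypothetical stand-ins, and the `notifications` iterator takes the place of `connection.iter()`.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Stand-in for an incoming notification; the real type comes from the library.
#[derive(Debug)]
pub struct Incoming {
    pub topic: String,
    pub payload: Vec<u8>,
}

/// Drains `notifications` without doing slow work inline: each item is
/// handed to a worker thread over a channel. Returns the total number of
/// payload bytes the worker processed.
pub fn run_with_worker<I>(notifications: I) -> usize
where
    I: IntoIterator<Item = Incoming>,
{
    let (tx, rx) = mpsc::channel::<Incoming>();

    // The worker owns the slow part (database writes, HTTP calls, ...).
    let worker = thread::spawn(move || {
        let mut bytes = 0;
        for msg in rx {
            thread::sleep(Duration::from_millis(5)); // simulated slow handler
            bytes += msg.payload.len();
        }
        bytes
    });

    // The polling loop only forwards, so it gets back to polling immediately.
    for msg in notifications {
        if tx.send(msg).is_err() {
            break; // worker is gone; stop forwarding
        }
    }

    drop(tx); // closing the channel lets the worker loop end
    worker.join().expect("worker thread panicked")
}
```

The channel here is unbounded, which keeps the loop responsive but lets messages pile up if the worker falls behind; a bounded `sync_channel` would trade some of that responsiveness for a memory limit.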

FAQ

Connecting to a broker using a raw IP address doesn't work

You cannot create a TLS connection to a bare IP address with a self-signed certificate. This is a limitation of rustls. One workaround, which only works under *nix/BSD-like systems, is to add an entry to wherever your DNS resolver looks (e.g. /etc/hosts) for the bare IP address and use that name in your code.

License: Apache-2.0

Commit count: 226
