Crates.io | rmqtt |
lib.rs | rmqtt |
version | 0.15.0 |
created_at | 2022-10-26 08:04:59.959785+00 |
updated_at | 2025-08-28 09:16:44.733741+00 |
description | MQTT Server for v3.1, v3.1.1 and v5.0 protocols |
homepage | |
repository | https://github.com/rmqtt/rmqtt.git |
max_upload_size | |
id | 697622 |
size | 599,296 |
A high-performance, asynchronous MQTT server library built with Tokio. rmqtt-server
is designed for flexibility, allowing you to configure multiple listeners with different protocols and security settings. Ideal for building custom or embedded MQTT services in Rust.
A basic MQTT server with RMQTT.
[dependencies]
rmqtt = "0.15"
tokio = { version = "1", features = ["full"] }
simple_logger = "5"
log = "0.4"
Then, in your main.rs:
use rmqtt::{context::ServerContext, net::Builder, server::MqttServer, Result};
use simple_logger::SimpleLogger;
/// Starts an MQTT server with a single TCP listener.
///
/// Initializes `simple_logger` at `Info` level, builds a `ServerContext`
/// with no extra options, and runs a server whose only listener is named
/// "external/tcp" and bound to 0.0.0.0:1883.
///
/// # Errors
///
/// Propagates (via `?`) any error from logger initialization, from binding
/// the listener, or from running the server.
#[tokio::main]
async fn main() -> Result<()> {
    // Configure the logger, then initialize it.
    let logger = SimpleLogger::new().with_level(log::LevelFilter::Info);
    logger.init()?;

    let scx = ServerContext::new().build().await;

    // Assemble the server; the listener is bound while the chain is evaluated.
    let server = MqttServer::new(scx)
        .listener(
            Builder::new()
                .name("external/tcp")
                .laddr(([0, 0, 0, 0], 1883).into())
                .bind()?
                .tcp()?,
        )
        .build();

    server.run().await?;
    Ok(())
}
A flexible MQTT server with multi-protocol and multi-listener support using RMQTT.
Make sure you include the rmqtt crate with the required features in your Cargo.toml:
[dependencies]
rmqtt = { version = "0.15", features = ["ws", "tls"] }
tokio = { version = "1", features = ["full"] }
simple_logger = "5"
log = "0.4"
use rmqtt::{context::ServerContext, net::Builder, server::MqttServer, Result};
use simple_logger::SimpleLogger;
/// Starts an MQTT server with five listeners covering TCP, TLS, WS and WSS.
///
/// Listeners, in the order they are bound:
/// - "external/tcp" on 0.0.0.0:1883 (`tcp`)
/// - "internal/tcp" on 0.0.0.0:11883 (`tcp`)
/// - "external/tls" on 0.0.0.0:8883 (`tls`)
/// - "external/ws" on 0.0.0.0:8080 (`ws`)
/// - "external/wss" on 0.0.0.0:8443 (`wss`)
///
/// The TLS and WSS listeners are both given the key `./rmqtt-bin/rmqtt.key`
/// and the certificate `./rmqtt-bin/rmqtt.pem`.
///
/// # Errors
///
/// Propagates (via `?`) any error from logger initialization, from binding
/// or configuring a listener, or from running the server.
#[tokio::main]
async fn main() -> Result<()> {
    // Configure the logger, then initialize it.
    let logger = SimpleLogger::new().with_level(log::LevelFilter::Info);
    logger.init()?;

    let scx = ServerContext::new().build().await;

    let server = MqttServer::new(scx)
        // Plain TCP listeners.
        .listener(
            Builder::new()
                .name("external/tcp")
                .laddr(([0, 0, 0, 0], 1883).into())
                .bind()?
                .tcp()?,
        )
        .listener(
            Builder::new()
                .name("internal/tcp")
                .laddr(([0, 0, 0, 0], 11883).into())
                .bind()?
                .tcp()?,
        )
        // TLS listener.
        .listener(
            Builder::new()
                .name("external/tls")
                .laddr(([0, 0, 0, 0], 8883).into())
                .tls_key(Some("./rmqtt-bin/rmqtt.key"))
                .tls_cert(Some("./rmqtt-bin/rmqtt.pem"))
                .bind()?
                .tls()?,
        )
        // WebSocket listener.
        .listener(
            Builder::new()
                .name("external/ws")
                .laddr(([0, 0, 0, 0], 8080).into())
                .bind()?
                .ws()?,
        )
        // WebSocket-over-TLS listener; same key/cert files as "external/tls".
        .listener(
            Builder::new()
                .name("external/wss")
                .laddr(([0, 0, 0, 0], 8443).into())
                .tls_key(Some("./rmqtt-bin/rmqtt.key"))
                .tls_cert(Some("./rmqtt-bin/rmqtt.pem"))
                .bind()?
                .wss()?,
        )
        .build();

    server.run().await?;
    Ok(())
}
An MQTT server with plugin support using RMQTT.
Make sure you include the rmqtt crate with the required features in your Cargo.toml:
[dependencies]
rmqtt = { version = "0.15", features = ["plugin"] }
rmqtt-plugins = { version = "0.15", features = ["full"] }
tokio = { version = "1", features = ["full"] }
simple_logger = "5"
log = "0.4"
use rmqtt::{context::ServerContext, net::Builder, server::MqttServer, Result};
use simple_logger::SimpleLogger;
/// Starts an MQTT server with the ACL, HTTP API and retainer plugins
/// registered from the `rmqtt_plugins` crate.
///
/// The `ServerContext` is pointed at the plugin configuration directory
/// "rmqtt-plugins/". The server has one TCP listener, "external/tcp", on
/// 0.0.0.0:1883 with `allow_anonymous(false)`.
///
/// # Errors
///
/// Propagates (via `?`) any error from logger initialization, plugin
/// registration, binding the listener, or running the server.
#[tokio::main]
async fn main() -> Result<()> {
    // Configure the logger, then initialize it.
    let logger = SimpleLogger::new().with_level(log::LevelFilter::Info);
    logger.init()?;

    let scx = ServerContext::new()
        .plugins_config_dir("rmqtt-plugins/")
        .build()
        .await;

    // Register the plugins in this order: ACL, HTTP API, retainer.
    // NOTE(review): the meaning of the `true, false` flags is not visible
    // here - confirm against the rmqtt-plugins documentation.
    rmqtt_plugins::acl::register(&scx, true, false).await?;
    rmqtt_plugins::http_api::register(&scx, true, false).await?;
    rmqtt_plugins::retainer::register(&scx, true, false).await?;

    let server = MqttServer::new(scx)
        .listener(
            Builder::new()
                .name("external/tcp")
                .laddr(([0, 0, 0, 0], 1883).into())
                .allow_anonymous(false)
                .bind()?
                .tcp()?,
        )
        .build();

    server.run().await?;
    Ok(())
}
or
[dependencies]
rmqtt = { version = "0.15", features = ["plugin"] }
rmqtt-plugins = { version = "0.15", features = ["full"] }
tokio = { version = "1", features = ["full"] }
simple_logger = "5"
log = "0.4"
use rmqtt::{context::ServerContext, net::Builder, server::MqttServer, Result};
use simple_logger::SimpleLogger;
/// Starts an MQTT server with plugins, supplying the ACL plugin's
/// configuration inline as a string.
///
/// The `ServerContext` is given the plugin configuration directory
/// "rmqtt-plugins/" and, in addition, a configuration entry keyed
/// "rmqtt-acl" containing the rule set defined below. The ACL, HTTP API and
/// retainer plugins are then registered, and the server runs with one TCP
/// listener, "external/tcp", on 0.0.0.0:1883 with `allow_anonymous(false)`.
///
/// # Errors
///
/// Propagates (via `?`) any error from logger initialization, plugin
/// registration, binding the listener, or running the server.
#[tokio::main]
async fn main() -> Result<()> {
    // Configure the logger, then initialize it.
    let logger = SimpleLogger::new().with_level(log::LevelFilter::Info);
    logger.init()?;

    // Inline ACL rule set, passed to the context under the "rmqtt-acl" key.
    let acl_rules = r###"rules = [
["allow", { user = "dashboard" }, "subscribe", ["$SYS/#"]],
["allow", { ipaddr = "127.0.0.1" }, "pubsub", ["$SYS/#", "#"]],
["deny", "all", "subscribe", ["$SYS/#", { eq = "#" }]],
["allow", "all"]
]"###;

    let context_builder = ServerContext::new()
        .plugins_config_dir("rmqtt-plugins/")
        .plugins_config_map_add("rmqtt-acl", acl_rules);
    let scx = context_builder.build().await;

    // Register the plugins in this order: ACL, HTTP API, retainer.
    // NOTE(review): the meaning of the `true, false` flags is not visible
    // here - confirm against the rmqtt-plugins documentation.
    rmqtt_plugins::acl::register(&scx, true, false).await?;
    rmqtt_plugins::http_api::register(&scx, true, false).await?;
    rmqtt_plugins::retainer::register(&scx, true, false).await?;

    let server = MqttServer::new(scx)
        .listener(
            Builder::new()
                .name("external/tcp")
                .laddr(([0, 0, 0, 0], 1883).into())
                .allow_anonymous(false)
                .bind()?
                .tcp()?,
        )
        .build();

    server.run().await?;
    Ok(())
}
or
[dependencies]
rmqtt = { version = "0.15", features = ["plugin"] }
rmqtt-acl = "0.15"
rmqtt-retainer = "0.15"
rmqtt-http-api = "0.15"
rmqtt-web-hook = "0.15"
tokio = { version = "1", features = ["full"] }
simple_logger = "5"
log = "0.4"
use rmqtt::{context::ServerContext, net::Builder, server::MqttServer, Result};
use simple_logger::SimpleLogger;
/// Starts an MQTT server with plugins pulled in as individual crates
/// (`rmqtt_acl`, `rmqtt_retainer`, `rmqtt_http_api`, `rmqtt_web_hook`).
///
/// The `ServerContext` is pointed at the plugin configuration directory
/// "rmqtt-plugins/". The server has one TCP listener, "external/tcp", on
/// 0.0.0.0:1883.
///
/// # Errors
///
/// Propagates (via `?`) any error from logger initialization, plugin
/// registration, binding the listener, or running the server.
#[tokio::main]
async fn main() -> Result<()> {
    // Configure the logger, then initialize it.
    let logger = SimpleLogger::new().with_level(log::LevelFilter::Info);
    logger.init()?;

    // Plugin config files (for example rmqtt_web_hook.toml) go in ./rmqtt-plugins/ .
    let scx = ServerContext::new()
        .plugins_config_dir("rmqtt-plugins/")
        .build()
        .await;
    // Alternatively, supply the plugin configuration from strings:
    // let scx = ServerContext::new().plugins_config_map(load_config()).build().await;

    // Register the plugins in this order: ACL, retainer, HTTP API, web hook.
    // NOTE(review): the meaning of the `true, false` flags is not visible
    // here - confirm against each plugin crate's documentation.
    rmqtt_acl::register(&scx, true, false).await?;
    rmqtt_retainer::register(&scx, true, false).await?;
    rmqtt_http_api::register(&scx, true, false).await?;
    rmqtt_web_hook::register(&scx, true, false).await?;

    let server = MqttServer::new(scx)
        .listener(
            Builder::new()
                .name("external/tcp")
                .laddr(([0, 0, 0, 0], 1883).into())
                .bind()?
                .tcp()?,
        )
        .build();

    server.run().await?;
    Ok(())
}
use ahash::HashMapExt;
/// Builds a plugin configuration map from inline strings.
///
/// Returns a map with a single entry: the key "rmqtt_web_hook" mapped to a
/// three-line configuration snippet setting `worker_threads`,
/// `queue_capacity` and `concurrency_limit`.
fn load_config() -> rmqtt::types::HashMap<String, String> {
    // The map type is fully qualified because no bare `HashMap` is imported
    // in this example. `new()` is presumably provided by the
    // `ahash::HashMapExt` trait imported alongside this function - confirm.
    let mut config = rmqtt::types::HashMap::new();
    config.insert(
        "rmqtt_web_hook".to_owned(),
        r#"worker_threads = 3
queue_capacity = 300_000
concurrency_limit = 128"#
            .to_owned(),
    );
    config
}
More examples can be found here. For a larger "real world" example, see the rmqtt repository.