Crates.io | rmqtt-net |
lib.rs | rmqtt-net |
version | 0.2.0 |
created_at | 2025-04-17 14:49:32.361194+00 |
updated_at | 2025-09-14 10:29:02.76058+00 |
description | Basic Implementation of MQTT Server |
homepage | |
repository | https://github.com/rmqtt/rmqtt/tree/master/rmqtt-net |
max_upload_size | |
id | 1637933 |
size | 122,840 |
🔌 rmqtt-net provides a foundational implementation of an MQTT server network layer, supporting MQTT v3.1.1 and v5.0 protocols over TCP, TLS, and WebSocket transports. It is designed for flexibility, performance, and easy integration into custom broker logic.
rustls
] backend (AWS-LC or ring, depending on platform)Builder
-based API for configuration and extensibilityuse rmqtt_net::{Builder, ListenerType};
use std::net::SocketAddr;
#[tokio::main]
async fn main() -> Result<()> {
SimpleLogger::new().with_level(log::LevelFilter::Info).init()?;
let tcp_listener =
Builder::new().name("external/tcp").laddr(([0, 0, 0, 0], 1883).into()).bind()?.tcp()?;
let tcp = async {
loop {
match tcp_listener.accept().await {
Ok(a) => {
tokio::spawn(async move {
log::info!("tcp {:?}", a.remote_addr);
let d = match a.tcp() {
Ok(d) => d,
Err(e) => {
log::warn!("Failed to mqtt(tcp) accept, {:?}", e);
return;
}
};
match d.mqtt().await {
Ok(MqttStream::V3(s)) => {
if let Err(e) = process_v3(s).await {
log::warn!("Failed to process mqtt v3, {:?}", e);
}
}
Ok(MqttStream::V5(s)) => {
if let Err(e) = process_v5(s).await {
log::warn!("Failed to process mqtt v5, {:?}", e);
}
}
Err(e) => {
log::warn!("Failed to probe MQTT version, {:?}", e);
}
}
});
}
Err(e) => {
log::warn!("Failed to accept TCP socket connection, {:?}", e);
sleep(Duration::from_millis(300)).await;
}
}
}
};
tcp.await;
Ok(())
}
async fn process_v3<Io>(mut s: MqttStreamV3<Io>) -> Result<()>
where
Io: AsyncRead + AsyncWrite + Unpin,
{
...
Ok(())
}
async fn process_v5<Io>(mut s: MqttStreamV5<Io>) -> Result<()>
where
Io: AsyncRead + AsyncWrite + Unpin,
{
...
Ok(())
}
Add rmqtt-net
to your Cargo.toml
, with optional TLS/WebSocket support:
[dependencies]
rmqtt-net = { version = "0.1", features = ["tls", "ws"] }
tls
: Enables TLS support using rustls
ws
: Enables WebSocket transportBuilder
/ Listener
– Configure and bind MQTT listenersMqttStream
– Abstract stream wrapper supporting v3/v5 logicMqttError
– Common error type for network operationstls_provider
aliasaws-lc-rs
as TLS backendring
as TLS backend