| Crates.io | aimdb-mqtt-connector |
| lib.rs | aimdb-mqtt-connector |
| version | 0.4.0 |
| created_at | 2025-11-06 22:23:53.79167+00 |
| updated_at | 2025-12-25 21:00:03.07085+00 |
| description | MQTT connector for AimDB - bidirectional pub/sub for Tokio and Embassy runtimes |
| homepage | https://aimdb.dev |
| repository | https://github.com/aimdb-dev/aimdb |
| max_upload_size | |
| id | 1920826 |
| size | 113,290 |
MQTT connector for AimDB - supports both std (Tokio) and no_std (Embassy) environments.
⚠️ Important: For Embassy runtime support, this connector requires a patched version of mountain-mqtt with updated Embassy dependencies.
Add to your Cargo.toml:
[dependencies]
# For Tokio runtime (std)
aimdb-mqtt-connector = { version = "0.4", features = ["tokio-runtime"] }
# For Embassy runtime (embedded)
aimdb-mqtt-connector = { version = "0.4", features = ["embassy-runtime"] }
# REQUIRED for Embassy: Patch mountain-mqtt to match Embassy versions
[patch.crates-io]
mountain-mqtt = { git = "https://github.com/aimdb-dev/mountain-mqtt.git", branch = "main" }
mountain-mqtt-embassy = { git = "https://github.com/aimdb-dev/mountain-mqtt.git", branch = "main" }
Why the patch?
Tokio runtime users: The patch is optional but recommended for consistency.
aimdb-mqtt-connector provides bidirectional MQTT publish/subscribe capabilities for AimDB records with automatic consumer registration. It works seamlessly across standard library (Tokio) and embedded (Embassy) environments.
Key Features:
Example:
use aimdb_core::{AimDbBuilder, buffer::BufferCfg};
use aimdb_tokio_adapter::{TokioAdapter, TokioRecordRegistrarExt};
use aimdb_mqtt_connector::MqttConnector;
use std::sync::Arc;
#[derive(Clone, serde::Serialize)]
struct Temperature {
celsius: f32,
sensor_id: String,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create runtime adapter
let runtime = Arc::new(TokioAdapter::new()?);
// Create MQTT connector
let mqtt = Arc::new(MqttConnector::new("mqtt://localhost:1883").await?);
// Build database with connector
let mut builder = AimDbBuilder::new()
.runtime(runtime)
.with_connector("mqtt", mqtt);
builder.configure::<Temperature>(|reg| {
reg.buffer(BufferCfg::SingleLatest)
.link("mqtt://sensors/temperature")
.with_serializer(|t| {
serde_json::to_vec(t)
.map_err(|_| aimdb_core::connector::SerializeError::InvalidData)
})
.finish();
});
builder.run().await
}
Add to your Cargo.toml:
[dependencies]
aimdb-core = { version = "0.1", default-features = false }
aimdb-embassy-adapter = { version = "0.1", default-features = false }
aimdb-mqtt-connector = { version = "0.4", default-features = false, features = ["embassy-runtime"] }
Example:
#![no_std]
#![no_main]
use aimdb_core::AimDbBuilder;
use aimdb_embassy_adapter::{EmbassyAdapter, EmbassyBufferType, EmbassyRecordRegistrarExt};
use aimdb_mqtt_connector::embassy_client::MqttConnectorBuilder;
use alloc::sync::Arc;
#[embassy_executor::main]
async fn main(spawner: Spawner) {
// Initialize network stack
let stack = /* ... */;
// Create runtime adapter with network stack access
let runtime = Arc::new(EmbassyAdapter::new_with_network(spawner, stack));
// Build database with MQTT connector - background tasks spawn automatically
let mut builder = AimDbBuilder::new()
.runtime(runtime)
.with_connector(MqttConnectorBuilder::new("mqtt://192.168.1.100:1883"));
builder.configure::<SensorData>(|reg| {
reg.buffer_sized::<4, 1>(EmbassyBufferType::SingleLatest)
.source(sensor_producer)
// Outbound: Publish to MQTT
.link_to("mqtt://data/sensor")
.with_serializer(|data| {
// Use postcard or similar no_std serializer
postcard::to_vec(data)
.map_err(|_| aimdb_core::connector::SerializeError::InvalidData)
})
.finish()
// Inbound: Subscribe from MQTT
.link_from("mqtt://commands/sensor")
.with_deserializer(|data| SensorCommand::from_bytes(data))
.finish();
});
let _db = builder.build().await.unwrap();
// Database and MQTT run in background automatically
loop {
// Main application loop
}
}
┌─────────────────────┐
│ AimDB Record │
│ (Temperature) │
└──────────┬──────────┘
│
▼
┌──────────────┐
│ Consumer │
│ (auto-reg) │
└──────┬───────┘
│
▼
┌──────────────────────┐
│ MQTT Connector │
│ - Serialize │
│ - Publish │
└──────────┬───────────┘
│
▼
MQTT Broker
// Tokio - simple broker URL
let mqtt = MqttConnector::new("mqtt://localhost:1883").await?;
// Tokio - secure connection
let mqtt = MqttConnector::new("mqtts://broker.example.com:8883").await?;
// Note: Client ID is auto-generated, credentials extracted from URL if provided
// Example with credentials: mqtt://username:password@broker:1883
// Basic link with JSON serialization
reg.link("mqtt://topic/path")
.with_serializer(|data| {
serde_json::to_vec(data)
.map_err(|_| aimdb_core::connector::SerializeError::InvalidData)
})
.finish();
// Override QoS and retain per link
reg.link("mqtt://topic/path")
.with_qos(1) // QoS 1
.with_retain(true) // Retain message
.with_serializer(|data| {
serde_json::to_vec(data)
.map_err(|_| aimdb_core::connector::SerializeError::InvalidData)
})
.finish();
// Simple topic
reg.link("mqtt://sensors/temperature")
// Nested topics
reg.link("mqtt://building/floor1/room5/temperature")
// Template-based (future feature)
reg.link("mqtt://sensors/{sensor_id}/temperature")
.with_serializer(|data| {
serde_json::to_vec(data)
.map_err(|_| aimdb_core::connector::SerializeError::InvalidData)
})
.with_serializer(|data| {
rmp_serde::to_vec(data)
.map_err(|_| aimdb_core::connector::SerializeError::InvalidData)
})
.with_serializer(|data| {
postcard::to_vec(data)
.map_err(|_| aimdb_core::connector::SerializeError::InvalidData)
})
.with_serializer(|data: &Temperature| {
let mut buf = Vec::new();
buf.extend_from_slice(&data.celsius.to_le_bytes());
buf.extend_from_slice(data.sensor_id.as_bytes());
Ok(buf)
})
MQTT Quality of Service levels are configured using integers:
.with_qos(0) // AtMostOnce - Fire and forget
.with_qos(1) // AtLeastOnce - Acknowledged delivery (default)
.with_qos(2) // ExactlyOnce - Assured delivery
Recommendations:
pub enum MqttError {
InvalidUrl(String),
ConnectionFailed(String),
PublishFailed(String),
SubscriptionFailed(String),
MissingConfig(String),
DbError(DbError),
}
Serialization errors are returned via SerializeError:
pub enum SerializeError {
InvalidData,
TypeMismatch,
}
The connector automatically handles reconnection. Serialization errors will be logged and the producer will continue operating.
## Features
```toml
[features]
tokio-runtime = ["dep:rumqttc", "dep:tokio"] # Tokio support
embassy-runtime = ["dep:mountain-mqtt"] # Embassy support
tracing = ["dep:tracing"] # Logging (std)
defmt = ["dep:defmt"] # Logging (embedded)
```
The connector automatically handles:
When broker is unavailable:
# Start MQTT broker
docker run -d -p 1883:1883 eclipse-mosquitto
# Run tests
cargo test -p aimdb-mqtt-connector --features tokio-runtime
# Cross-compile test
cargo build -p aimdb-mqtt-connector \
--target thumbv7em-none-eabihf \
--no-default-features \
--features embassy-runtime
let runtime = Arc::new(TokioAdapter::new()?);
let mqtt = Arc::new(MqttConnector::new("mqtt://localhost:1883").await?);
let mut builder = AimDbBuilder::new()
.runtime(runtime)
.with_connector("mqtt", mqtt);
builder.configure::<Temperature>(|reg| {
reg.buffer(BufferCfg::SingleLatest)
.link("mqtt://sensors/temperature")
.with_serializer(json_serializer)
.finish();
});
builder.configure::<Humidity>(|reg| {
reg.buffer(BufferCfg::SingleLatest)
.link("mqtt://sensors/humidity")
.with_serializer(json_serializer)
.finish();
});
builder.configure::<Pressure>(|reg| {
reg.buffer(BufferCfg::SingleLatest)
.link("mqtt://sensors/pressure")
.with_serializer(json_serializer)
.finish();
});
builder.configure::<HighPriorityAlert>(|reg| {
reg.buffer(BufferCfg::SingleLatest)
.link("mqtt://alerts/critical")
.with_qos(2) // QoS 2 - ExactlyOnce
.with_retain(true)
.with_serializer(json_serializer)
.finish();
});
builder.configure::<SensorTelemetry>(|reg| {
reg.buffer(BufferCfg::SpmcRing { capacity: 100 })
.link("mqtt://telemetry/raw")
.with_qos(0) // QoS 0 - AtMostOnce
.with_retain(false)
.with_serializer(json_serializer)
.finish();
});
See repository examples:
examples/tokio-mqtt-connector-demo - Full Tokio MQTT integration
examples/embassy-mqtt-connector-demo - RP2040 with WiFi MQTT
Generate API docs:
cargo doc -p aimdb-mqtt-connector --open
See LICENSE file.