| Crates.io | convoy |
| lib.rs | convoy |
| version | 0.1.0 |
| created_at | 2025-10-12 21:14:30.520862+00 |
| updated_at | 2025-10-12 21:14:30.520862+00 |
| description | A reliable MQTT bridge with SQLite message caching - for edge devices with patchy connectivity |
| homepage | https://github.com/yammpio/edge-convoy |
| repository | https://github.com/ammpio/edge-convoy |
| max_upload_size | |
| id | 1879710 |
| size | 169,258 |
A reliable MQTT bridge with SQLite message caching - for edge devices with patchy connectivity.
This is an opinionated implementation. Currently it has the following limitations:
native-tls and SQLite support via system library - to reduce binary size for edge environments+, #)cargo build --release
Copy the example configuration and edit it:
cp config.example.toml config.toml
# Edit config.toml with your broker settings
./target/release/convoy --config config.toml
Or with debug logging:
./target/release/convoy --config config.toml --log-level debug
Add to your Cargo.toml (no default features suppresses default CLI dependencies):
[dependencies]
convoy = { version = "0.1", default-features = false }
Example usage (see examples/programmatic.rs for a complete example):
use convoy::{Bridge, BridgeConfig, BrokerConfig, CacheConfig, CacheManager, ForwardRule, TlsConfig};
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Configure bridge programmatically
let bridge_config = BridgeConfig {
local: BrokerConfig {
addr: "127.0.0.1:1883".to_string(),
client_id: "convoy-local".to_string(),
keep_alive_secs: 30,
clean_session: false,
max_inflight: 100,
username: None,
password: None,
tls: None,
},
remote: BrokerConfig {
addr: "mqtt.example.com:8883".to_string(),
client_id: "convoy-remote".to_string(),
keep_alive_secs: 30,
clean_session: false,
max_inflight: 100,
username: Some("user".to_string()),
password: Some("pass".to_string()),
tls: Some(TlsConfig {
ca_file: Some("/etc/ssl/certs/ca-certificates.crt".into()),
client_cert: None,
client_password: None,
danger_accept_invalid_certs: false,
}),
},
state_topic: "bridge/state".to_string(),
state_online_payload: "1".to_string(),
state_offline_payload: "0".to_string(),
forward: vec![ForwardRule {
local_filter: "sensors/#".to_string(),
remote_prefix: "devices/edge1/".to_string(),
qos: 1,
}],
subscribe: vec![],
};
// Configure cache with defaults
let cache_config = CacheConfig {
sqlite_path: "/tmp/convoy-cache.db".into(),
..Default::default()
};
// Create cache and bridge
let cache = Arc::new(CacheManager::new(cache_config)?);
let bridge = Bridge::new(bridge_config, cache).await?;
// Run bridge
bridge.run().await?;
Ok(())
}
Run the example:
cargo run --example programmatic
See config.example.toml for a fully documented configuration template. Key sections:
[bridge]
# Local broker (no auth/TLS)
local_addr = "127.0.0.1:1883"
local_client_id = "convoy-local"
# Remote broker (TLS + auth)
remote_addr = "mqtt.example.com:8883"
remote_client_id = "convoy-remote"
remote_username = "device01"
remote_password = "secret"
# Bridge state topic (published to remote)
state_topic = "bridge/convoy/state"
state_online_payload = "1"
state_offline_payload = "0"
Messages are cached if remote is unavailable:
[[bridge.forward]]
local_filter = "sensors/#"
remote_prefix = "devices/edge1/"
qos = 1
Example: Local sensors/temp → Remote devices/edge1/sensors/temp
Messages are NOT cached (real-time only):
[[bridge.subscribe]]
remote_filter = "commands/edge1/#"
remote_prefix = "commands/edge1/"
qos = 1
Example: Remote commands/edge1/restart → Local restart
[cache]
sqlite_path = "/var/lib/convoy/cache.sqlite"
max_rows = 500000 # Maximum cached messages
eviction = "drop_oldest" # or "reject_new"
flush_batch = 1000 # Messages per replay batch
flush_interval_ms = 100 # Replay interval
Local → Remote (with caching):
Remote → Local (no caching):
Wildcards:
+ matches single level: sensors/+/temp matches sensors/room1/temp# matches multiple levels: data/# matches data/sensor/temp/valuePrefix mapping:
remote_prefix to local topicremote_prefix from remote topicstate_topic (retained)state_topic┌──────────────────┐
│ Local Broker │ (localhost:1883, no auth)
│ (Mosquitto) │
└────────┬─────────┘
│
┌────▼────┐
│ Bridge │ (rumqttc clients)
│ Client │
└────┬────┘
│
┌────▼────┐
│ SQLite │ (cache A→B only)
│ Cache │
└────┬────┘
│
┌────────▼─────────┐
│ Remote Broker │ (TLS, port 8883)
│ (e.g. AWS IoT) │
└──────────────────┘
cli (default): Enables the command-line interface with TOML config file support
clap, toml, tracing-subscriberdefault-features = false)Run unit tests:
cargo test
For integration testing with actual MQTT brokers, see SPECS.md section 6.
convoy [OPTIONS]
Options:
-c, --config <CONFIG> Path to config file [default: config.toml]
-l, --log-level <LOG_LEVEL> Log level (trace|debug|info|warn|error) [default: info]
-h, --help Print help
See SPECS.md for design documentation and acceptance criteria.