| Crates.io | mecha10-messaging |
| lib.rs | mecha10-messaging |
| version | 0.1.46 |
| created_at | 2025-11-24 17:44:27.868079+00 |
| updated_at | 2026-01-25 23:00:13.876065+00 |
| description | Redis Streams messaging layer for Mecha10 |
| homepage | |
| repository | https://github.com/mecha10/mecha10 |
| max_upload_size | |
| id | 1948255 |
| size | 97,479 |
Redis Streams-based pub/sub messaging system for inter-node communication in the Mecha10 framework.
ack()/nack() for retry logic*/camera/rgb)[dependencies]
mecha10-messaging = { path = "../messaging" }
tokio = { version = "1.35", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
use mecha10_messaging::MessageBus;
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
struct LaserScan {
ranges: Vec<f32>,
timestamp: u64,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut bus = MessageBus::connect("redis://localhost:6379", "lidar-node").await?;
loop {
let scan = LaserScan {
ranges: vec![1.0, 2.0, 3.0, 4.0],
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?
.as_millis() as u64,
};
bus.publish("/scan", &scan).await?;
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
}
use mecha10_messaging::MessageBus;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut bus = MessageBus::connect("redis://localhost:6379", "slam-node").await?;
// Subscribe with consumer group
let mut rx = bus.subscribe::<LaserScan>("/scan", "slam-group").await?;
while let Some(msg) = rx.recv().await {
println!("Received scan from {}: {} points",
msg.publisher, msg.payload.ranges.len());
// Process the scan...
// Acknowledge successful processing
msg.ack().await?;
}
Ok(())
}
Topics are named channels for messages, following a hierarchical naming convention:
/scan # LiDAR scans
/camera/rgb # RGB camera frames
/camera/depth # Depth camera frames
/odom # Odometry
/cmd_vel # Velocity commands
Consumer groups enable load balancing - each message is delivered to only one consumer in the group:
// Node 1
let mut rx1 = bus.subscribe::<Task>("/tasks", "worker-group").await?;
// Node 2
let mut rx2 = bus.subscribe::<Task>("/tasks", "worker-group").await?;
// Each task goes to either Node 1 OR Node 2 (not both)
Different groups receive all messages:
// SLAM node
let mut rx_slam = bus.subscribe::<Scan>("/scan", "slam-group").await?;
// Logger node
let mut rx_log = bus.subscribe::<Scan>("/scan", "logger-group").await?;
// Both receive the same scans
Messages must be acknowledged after processing:
while let Some(msg) = rx.recv().await {
match process_message(&msg.payload) {
Ok(_) => {
msg.ack().await?; // Mark as successfully processed
}
Err(e) => {
eprintln!("Processing failed: {}", e);
msg.nack().await?; // Will be redelivered
}
}
}
Isolate multiple robots using the same Redis instance:
let mut bus = MessageBus::connect("redis://localhost:6379", "robot-1").await?;
bus.set_namespace("fleet-alpha");
// Messages published to "fleet-alpha:/scan"
bus.publish("/scan", &scan_data).await?;
MessageBusConnect:
let mut bus = MessageBus::connect(redis_url, node_id).await?;
Set namespace:
bus.set_namespace("my-fleet");
Publish:
bus.publish(topic, &payload).await?;
Subscribe:
let mut rx = bus.subscribe::<T>(topic, consumer_group).await?;
Subscribe with wildcard pattern:
let mut rx = bus.subscribe_pattern::<T>(pattern, consumer_group).await?;
Discover topics:
let topics = bus.discover_topics(redis_pattern).await?;
Close:
bus.close().await?;
Message<T>Fields:
pub struct Message<T> {
pub id: String, // Redis Stream ID
pub topic: String, // Topic name
pub publisher: String, // Publisher node ID
pub timestamp: u64, // Unix timestamp (ms)
pub payload: T, // Your data
}
Methods:
msg.ack().await?; // Acknowledge
msg.nack().await?; // Negative acknowledgment (retry)
Subscriber<T>Receive message:
let msg = rx.recv().await;
Get topic:
let topic = rx.topic();
Subscribe to multiple topics at once using wildcard patterns. This is essential for remote nodes that aggregate data from multiple robots:
use mecha10_messaging::MessageBus;
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
struct CameraFrame {
data: Vec<u8>,
width: u32,
height: u32,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create a vision processing node that receives camera feeds from ALL robots
let mut bus = MessageBus::connect("redis://localhost:6379", "vision-processor").await?;
// Subscribe to camera/rgb topic from all robots using wildcard pattern
let mut rx = bus.subscribe_pattern::<CameraFrame>("*/camera/rgb", "vision-group").await?;
while let Some(msg) = rx.recv().await {
println!("Received frame from robot: {} (publisher: {})",
msg.topic, msg.publisher);
// Process the frame (e.g., run YOLO detection)
process_frame(&msg.payload).await?;
msg.ack().await?;
}
Ok(())
}
Pattern Syntax:
*/camera/rgb - Matches robot-1/camera/rgb, robot-2/camera/rgb, etc.robot-*/scan - Matches robot-1/scan, robot-alpha/scan, etc.*/sensor/* - Matches any topic with "sensor" in the middle segmentUse Cases:
Load Balancing with Wildcards:
// Multiple vision processors share the workload
// Node 1
let mut rx1 = bus.subscribe_pattern::<Frame>("*/camera/rgb", "vision-group").await?;
// Node 2
let mut rx2 = bus.subscribe_pattern::<Frame>("*/camera/rgb", "vision-group").await?;
// Messages from all robots are load-balanced across Node 1 and Node 2
let mut bus = MessageBus::connect("redis://localhost:6379", "multi-node").await?;
// Subscribe to multiple topics
let mut rx_scan = bus.subscribe::<LaserScan>("/scan", "processing").await?;
let mut rx_odom = bus.subscribe::<Odometry>("/odom", "processing").await?;
let mut rx_cmd = bus.subscribe::<Twist>("/cmd_vel", "execution").await?;
// Use tokio::select! to handle multiple streams
loop {
tokio::select! {
Some(msg) = rx_scan.recv() => {
process_scan(&msg.payload).await?;
msg.ack().await?;
}
Some(msg) = rx_odom.recv() => {
process_odom(&msg.payload).await?;
msg.ack().await?;
}
Some(msg) = rx_cmd.recv() => {
process_cmd(&msg.payload).await?;
msg.ack().await?;
}
}
}
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
struct RobotState {
position: (f32, f32, f32),
velocity: (f32, f32, f32),
battery: f32,
mode: String,
}
let state = RobotState {
position: (1.0, 2.0, 0.5),
velocity: (0.5, 0.0, 0.1),
battery: 87.5,
mode: "autonomous".to_string(),
};
bus.publish("/state", &state).await?;
cargo test
Start Redis:
docker run -d -p 6379:6379 redis:latest
Run tests:
cargo test -- --ignored
Tests included:
test_pub_sub() - Basic publish/subscribetest_multiple_subscribers() - Consumer group behavioruse mecha10_messaging::MessagingError;
match bus.publish("/scan", &data).await {
Ok(_) => println!("Published"),
Err(MessagingError::Redis(e)) => eprintln!("Redis error: {}", e),
Err(MessagingError::Serialization(e)) => eprintln!("Serialization error: {}", e),
Err(MessagingError::Connection(msg)) => eprintln!("Connection error: {}", msg),
Err(e) => eprintln!("Error: {}", e),
}
The messaging system includes built-in resilience features to handle transient failures automatically:
When creating a subscription, the framework automatically retries on failure with exponential backoff:
// Automatic retry - no code changes needed!
let mut rx = bus.subscribe::<LaserScan>("/scan", "processing").await?;
// ⚠️ If consumer group creation fails, automatically retries up to 5 times
// ✅ Succeeds on first retry that works
// ❌ Returns error after all retries fail
What you'll see in logs:
⚠️ Failed to create consumer group 'slam--scan' for topic 'mecha10:/scan' (attempt 1/5): Connection reset. Retrying in 100ms...
⚠️ Failed to create consumer group 'slam--scan' for topic 'mecha10:/scan' (attempt 2/5): Connection reset. Retrying in 200ms...
✅ Created consumer group 'slam--scan' for topic 'mecha10:/scan'
The background subscription task automatically recovers from consumer group failures:
// Your subscription keeps working even if:
// - Redis gets flushed (FLUSHALL)
// - Consumer group gets manually deleted
// - Redis restarts
while let Some(msg) = rx.recv().await {
process(&msg.payload).await?;
msg.ack().await?;
}
// Background task automatically recovers and continues delivering messages
What you'll see in logs:
❌ XREAD failed for topic 'mecha10:/scan' group 'slam--scan': NOGROUP No such consumer group
🔄 Consumer group 'slam--scan' missing, attempting to recreate...
✅ Recreated consumer group 'slam--scan' for topic 'mecha10:/scan'
subscribe() to handle permanent failuresIf subscriptions fail even with retries, check:
redis-cli -h <host> -p <port> pingUse these Redis commands to inspect subscriptions:
# List all consumer groups for a topic
redis-cli XINFO GROUPS "mecha10:/scan"
# Check pending messages in a group
redis-cli XPENDING "mecha10:/scan" "slam--scan"
# List consumers in a group
redis-cli XINFO CONSUMERS "mecha10:/scan" "slam--scan"
┌─────────────┐
│ Node A │
│ (Publisher) │
└──────┬──────┘
│ publish("/scan")
▼
┌─────────────────────┐
│ Redis Streams │
│ Topic: /scan │
└──────┬──────┬───────┘
│ │
▼ ▼
┌──────────┐ ┌──────────┐
│ Node B │ │ Node C │
│ (SLAM) │ │ (Logger) │
└──────────┘ └──────────┘
See QUICKSTART.md for complete examples.
MIT