mecha10-messaging

version: 0.1.46
created_at: 2025-11-24 17:44:27.868079+00
updated_at: 2026-01-25 23:00:13.876065+00
description: Redis Streams messaging layer for Mecha10
repository: https://github.com/mecha10/mecha10
id: 1948255
size: 97,479
Peter C (PeterChauYEG)

@mecha10/messaging

Redis Streams-based pub/sub messaging system for inter-node communication in the Mecha10 framework.

Features

  • Redis Streams Backend - Reliable message delivery with persistence
  • Type-Safe Messaging - Generic types for compile-time safety
  • Consumer Groups - Load balancing across multiple nodes
  • Message Acknowledgment - ack()/nack() for retry logic
  • Auto-Reconnection - Automatic connection recovery
  • Resilient Subscriptions - Automatic retry with exponential backoff
  • Self-Healing - Auto-recovery from consumer group failures
  • Namespace Support - Multi-robot fleet isolation
  • Wildcard Subscriptions - Subscribe to multiple topics with patterns (e.g., */camera/rgb)

Installation

[dependencies]
# Inside the Mecha10 workspace:
mecha10-messaging = { path = "../messaging" }
# From crates.io, use instead: mecha10-messaging = "0.1.46"
tokio = { version = "1.35", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }

Quick Start

Publisher

use mecha10_messaging::MessageBus;
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
struct LaserScan {
    ranges: Vec<f32>,
    timestamp: u64,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut bus = MessageBus::connect("redis://localhost:6379", "lidar-node").await?;

    loop {
        let scan = LaserScan {
            ranges: vec![1.0, 2.0, 3.0, 4.0],
            timestamp: std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)?
                .as_millis() as u64,
        };

        bus.publish("/scan", &scan).await?;

        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
    }
}

Subscriber

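use mecha10_messaging::MessageBus;
use serde::{Deserialize, Serialize};

// Same message type as in the publisher example.
#[derive(Debug, Serialize, Deserialize)]
struct LaserScan {
    ranges: Vec<f32>,
    timestamp: u64,
}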

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut bus = MessageBus::connect("redis://localhost:6379", "slam-node").await?;

    // Subscribe with consumer group
    let mut rx = bus.subscribe::<LaserScan>("/scan", "slam-group").await?;

    while let Some(msg) = rx.recv().await {
        println!("Received scan from {}: {} points",
            msg.publisher, msg.payload.ranges.len());

        // Process the scan...

        // Acknowledge successful processing
        msg.ack().await?;
    }

    Ok(())
}

Core Concepts

Topics

Topics are named channels for messages, following a hierarchical naming convention:

/scan           # LiDAR scans
/camera/rgb     # RGB camera frames
/camera/depth   # Depth camera frames
/odom           # Odometry
/cmd_vel        # Velocity commands

Consumer Groups

Consumer groups enable load balancing - each message is delivered to only one consumer in the group:

// Node 1
let mut rx1 = bus.subscribe::<Task>("/tasks", "worker-group").await?;

// Node 2
let mut rx2 = bus.subscribe::<Task>("/tasks", "worker-group").await?;

// Each task goes to either Node 1 OR Node 2 (not both)

Different groups receive all messages:

// SLAM node
let mut rx_slam = bus.subscribe::<Scan>("/scan", "slam-group").await?;

// Logger node
let mut rx_log = bus.subscribe::<Scan>("/scan", "logger-group").await?;

// Both receive the same scans
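
The two delivery rules above (one consumer per group, every group sees every message) can be illustrated without Redis. The following is a minimal in-memory sketch, not the crate's implementation: `Group` and `deliver` are hypothetical names, and round-robin selection is an assumption made for determinism (Redis hands each entry to whichever consumer in the group reads next).

```rust
use std::collections::BTreeMap;

/// Hypothetical in-memory stand-in for one consumer group.
struct Group {
    consumers: Vec<String>,
    next: usize,
}

impl Group {
    fn new(consumers: &[&str]) -> Self {
        Group {
            consumers: consumers.iter().map(|c| c.to_string()).collect(),
            next: 0,
        }
    }

    /// Pick exactly one consumer for a message (round-robin in this sketch).
    /// Returns None if the group has no consumers.
    fn pick(&mut self) -> Option<String> {
        if self.consumers.is_empty() {
            return None;
        }
        let chosen = self.consumers[self.next % self.consumers.len()].clone();
        self.next += 1;
        Some(chosen)
    }
}

/// Deliver one message: every group receives it, but only one consumer
/// inside each group does. Returns (group name, chosen consumer) pairs
/// in group-name order.
fn deliver(groups: &mut BTreeMap<String, Group>) -> Vec<(String, String)> {
    groups
        .iter_mut()
        .filter_map(|(name, group)| group.pick().map(|c| (name.clone(), c)))
        .collect()
}
```

With a two-consumer "slam-group" and a one-consumer "logger-group", two successive calls to `deliver` alternate between the two SLAM consumers, while the logger receives both messages.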

Message Acknowledgment

Messages must be acknowledged after processing:

while let Some(msg) = rx.recv().await {
    match process_message(&msg.payload) {
        Ok(_) => {
            msg.ack().await?; // Mark as successfully processed
        }
        Err(e) => {
            eprintln!("Processing failed: {}", e);
            msg.nack().await?; // Will be redelivered
        }
    }
}
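
In Redis Streams, an entry delivered to a consumer group stays in that group's pending entries list until it is acknowledged with XACK. The sketch below models only that bookkeeping. `Pending` is a hypothetical type, and the `nack` behaviour (leave the entry pending so it can be delivered again) is an assumption; this README does not specify how the crate implements redelivery.

```rust
use std::collections::BTreeMap;

/// Hypothetical model of a consumer group's pending entries list.
#[derive(Default)]
struct Pending {
    /// Message ID -> number of times it has been delivered.
    entries: BTreeMap<String, u32>,
}

impl Pending {
    /// Record a delivery (first or repeated) of `id`.
    fn deliver(&mut self, id: &str) {
        *self.entries.entry(id.to_string()).or_insert(0) += 1;
    }

    /// ack: processing succeeded, so the entry leaves the pending list.
    /// Returns false if the ID was not pending.
    fn ack(&mut self, id: &str) -> bool {
        self.entries.remove(id).is_some()
    }

    /// nack (assumed behaviour): the entry stays pending for redelivery.
    /// Returns true if the ID is still awaiting acknowledgment.
    fn nack(&self, id: &str) -> bool {
        self.entries.contains_key(id)
    }

    /// IDs still awaiting acknowledgment, in ID order.
    fn awaiting(&self) -> Vec<&str> {
        self.entries.keys().map(|k| k.as_str()).collect()
    }
}
```

In this model, a message that is never acked stays in `awaiting()` indefinitely, which is one reason to ack every message that was processed successfully.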

Namespaces

Isolate multiple robots using the same Redis instance:

let mut bus = MessageBus::connect("redis://localhost:6379", "robot-1").await?;
bus.set_namespace("fleet-alpha");

// Messages published to "fleet-alpha:/scan"
bus.publish("/scan", &scan_data).await?;
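
The comment above shows the namespace joined to the topic with a colon. A minimal sketch of that key layout follows, assuming the stream key is exactly `<namespace>:<topic>`; `stream_key` and `split_key` are hypothetical helpers for illustration, not functions exported by the crate.

```rust
/// Hypothetical helper: build the Redis stream key for a topic in a namespace.
/// Assumes the layout "<namespace>:<topic>".
fn stream_key(namespace: &str, topic: &str) -> String {
    format!("{}:{}", namespace, topic)
}

/// Hypothetical inverse: split a stream key at the first colon into
/// (namespace, topic). Returns None if the key has no colon.
fn split_key(key: &str) -> Option<(&str, &str)> {
    key.split_once(':')
}
```

Under this layout, two fleets publishing to "/scan" write to different streams ("fleet-alpha:/scan" and, say, "fleet-beta:/scan"), so they can share one Redis instance without seeing each other's messages.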

API Reference

MessageBus

Connect:

let mut bus = MessageBus::connect(redis_url, node_id).await?;

Set namespace:

bus.set_namespace("my-fleet");

Publish:

bus.publish(topic, &payload).await?;

Subscribe:

let mut rx = bus.subscribe::<T>(topic, consumer_group).await?;

Subscribe with wildcard pattern:

let mut rx = bus.subscribe_pattern::<T>(pattern, consumer_group).await?;

Discover topics:

let topics = bus.discover_topics(redis_pattern).await?;

Close:

bus.close().await?;

Message<T>

Fields:

pub struct Message<T> {
    pub id: String,          // Redis Stream ID
    pub topic: String,       // Topic name
    pub publisher: String,   // Publisher node ID
    pub timestamp: u64,      // Unix timestamp (ms)
    pub payload: T,          // Your data
}

Methods:

msg.ack().await?;   // Acknowledge
msg.nack().await?;  // Negative acknowledgment (retry)
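
The `id` field holds a Redis Stream ID. IDs generated by Redis have the form `<milliseconds>-<sequence>`, so they can be parsed when numeric ordering matters. The helper below is a hypothetical sketch, not part of the crate's API.

```rust
/// Hypothetical helper: parse a Redis Stream ID of the form "<ms>-<seq>"
/// into (milliseconds, sequence). Returns None for anything else.
fn parse_stream_id(id: &str) -> Option<(u64, u64)> {
    let (ms, seq) = id.split_once('-')?;
    Some((ms.parse().ok()?, seq.parse().ok()?))
}
```

Comparing the parsed tuples gives the stream order. Comparing the raw strings does not: "5-10" sorts before "5-9" as text, but after it as an ID.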

Subscriber<T>

Receive message:

let msg = rx.recv().await;

Get topic:

let topic = rx.topic();

Advanced Usage

Wildcard Subscriptions

Subscribe to multiple topics at once using wildcard patterns. This is essential for remote nodes that aggregate data from multiple robots:

use mecha10_messaging::MessageBus;
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
struct CameraFrame {
    data: Vec<u8>,
    width: u32,
    height: u32,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create a vision processing node that receives camera feeds from ALL robots
    let mut bus = MessageBus::connect("redis://localhost:6379", "vision-processor").await?;

    // Subscribe to camera/rgb topic from all robots using wildcard pattern
    let mut rx = bus.subscribe_pattern::<CameraFrame>("*/camera/rgb", "vision-group").await?;

    while let Some(msg) = rx.recv().await {
        println!("Received frame from robot: {} (publisher: {})",
            msg.topic, msg.publisher);

        // Process the frame (e.g., run YOLO detection)
        process_frame(&msg.payload).await?;

        msg.ack().await?;
    }

    Ok(())
}

Pattern Syntax:

  • */camera/rgb - Matches robot-1/camera/rgb, robot-2/camera/rgb, etc.
  • robot-*/scan - Matches robot-1/scan, robot-alpha/scan, etc.
  • */sensor/* - Matches any topic with "sensor" in the middle segment
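
The pattern examples above can be checked with a small glob matcher. This is a sketch under an assumption: `*` matches any run of characters (including none, and including `/`), which is how Redis-style globs behave. The crate's actual matching rules may differ, and `matches_pattern` is a hypothetical name.

```rust
/// Returns true if `topic` matches `pattern`, where `*` matches any run of
/// characters (possibly empty). No other metacharacters are supported.
fn matches_pattern(pattern: &str, topic: &str) -> bool {
    let p: Vec<char> = pattern.chars().collect();
    let t: Vec<char> = topic.chars().collect();
    let (mut pi, mut ti) = (0, 0);
    // Index of the most recent `*` in the pattern, and the topic index
    // from which that `*` started absorbing characters.
    let mut star: Option<(usize, usize)> = None;

    while ti < t.len() {
        if pi < p.len() && p[pi] == '*' {
            // Remember the star and first try matching it against nothing.
            star = Some((pi, ti));
            pi += 1;
        } else if pi < p.len() && p[pi] == t[ti] {
            pi += 1;
            ti += 1;
        } else if let Some((sp, st)) = star {
            // Mismatch: backtrack and let the last `*` absorb one more char.
            pi = sp + 1;
            ti = st + 1;
            star = Some((sp, st + 1));
        } else {
            return false;
        }
    }
    // The topic is consumed; only trailing `*`s may remain in the pattern.
    p[pi..].iter().all(|&c| c == '*')
}
```

Because `*` is allowed to cross `/` in this sketch, `*/camera/rgb` would also match a deeper topic such as `site-a/robot-1/camera/rgb`.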

Use Cases:

  • Centralized Vision Processing: One node processes camera feeds from entire fleet
  • Fleet-wide Logging: Single logger consumes logs from all robots
  • Cross-Robot Monitoring: Dashboard aggregates metrics from multiple robots
  • Multi-Robot Coordination: Coordinator receives state updates from all robots

Load Balancing with Wildcards:

// Multiple vision processors share the workload
// Node 1
let mut rx1 = bus.subscribe_pattern::<Frame>("*/camera/rgb", "vision-group").await?;

// Node 2
let mut rx2 = bus.subscribe_pattern::<Frame>("*/camera/rgb", "vision-group").await?;

// Messages from all robots are load-balanced across Node 1 and Node 2

Multiple Subscribers

let mut bus = MessageBus::connect("redis://localhost:6379", "multi-node").await?;

// Subscribe to multiple topics
let mut rx_scan = bus.subscribe::<LaserScan>("/scan", "processing").await?;
let mut rx_odom = bus.subscribe::<Odometry>("/odom", "processing").await?;
let mut rx_cmd = bus.subscribe::<Twist>("/cmd_vel", "execution").await?;

// Use tokio::select! to handle multiple streams
loop {
    tokio::select! {
        Some(msg) = rx_scan.recv() => {
            process_scan(&msg.payload).await?;
            msg.ack().await?;
        }
        Some(msg) = rx_odom.recv() => {
            process_odom(&msg.payload).await?;
            msg.ack().await?;
        }
        Some(msg) = rx_cmd.recv() => {
            process_cmd(&msg.payload).await?;
            msg.ack().await?;
        }
        // Every subscription has closed: exit instead of letting select! panic.
        else => break,
    }
}

Custom Message Types

use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
struct RobotState {
    position: (f32, f32, f32),
    velocity: (f32, f32, f32),
    battery: f32,
    mode: String,
}

let state = RobotState {
    position: (1.0, 2.0, 0.5),
    velocity: (0.5, 0.0, 0.1),
    battery: 87.5,
    mode: "autonomous".to_string(),
};

bus.publish("/state", &state).await?;

Testing

Unit Tests

cargo test

Integration Tests (Requires Redis)

Start Redis:

docker run -d -p 6379:6379 redis:latest

Run tests:

cargo test -- --ignored

Tests included:

  • test_pub_sub() - Basic publish/subscribe
  • test_multiple_subscribers() - Consumer group behavior

Error Handling

use mecha10_messaging::MessagingError;

match bus.publish("/scan", &data).await {
    Ok(_) => println!("Published"),
    Err(MessagingError::Redis(e)) => eprintln!("Redis error: {}", e),
    Err(MessagingError::Serialization(e)) => eprintln!("Serialization error: {}", e),
    Err(MessagingError::Connection(msg)) => eprintln!("Connection error: {}", msg),
    Err(e) => eprintln!("Error: {}", e),
}

Performance Tips

  1. Batch Publishing - Publish multiple messages in quick succession
  2. Consumer Groups - Scale processing by adding more consumers
  3. Acknowledgment - Always ack messages to avoid redelivery
  4. Namespace - Isolate fleets to reduce cross-talk

Resilience & Error Recovery

The messaging system includes built-in resilience features to handle transient failures automatically:

Subscription Retry Logic

When creating a subscription, the framework automatically retries on failure with exponential backoff:

  • Up to 5 retry attempts with delays: 100ms, 200ms, 400ms, 800ms, 1600ms
  • Handles "BUSYGROUP" gracefully - recognizes when consumer group already exists
  • Detailed logging - warns on each retry attempt with error details
  • Fail-fast with clear errors - returns error after all retries exhausted

// Automatic retry - no code changes needed!
let mut rx = bus.subscribe::<LaserScan>("/scan", "processing").await?;
// ⚠️  If consumer group creation fails, automatically retries up to 5 times
// ✅ Succeeds on first retry that works
// ❌ Returns error after all retries fail
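
The delay sequence listed above is a doubling schedule starting at 100ms. A minimal sketch of how such a schedule can be computed follows; `backoff_delay` and `backoff_schedule` are hypothetical helpers, and the crate's internal retry code is not shown in this README.

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (0-based): base * 2^attempt.
/// Saturates instead of overflowing for very large attempt numbers.
fn backoff_delay(base: Duration, attempt: u32) -> Duration {
    base.saturating_mul(2u32.saturating_pow(attempt))
}

/// The full list of delays for `max_attempts` retries.
fn backoff_schedule(base: Duration, max_attempts: u32) -> Vec<Duration> {
    (0..max_attempts).map(|a| backoff_delay(base, a)).collect()
}
```

With a 100ms base and 5 attempts this yields 100ms, 200ms, 400ms, 800ms, and 1600ms, for a total of 3.1 seconds of waiting if every delay is used.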

What you'll see in logs:

⚠️  Failed to create consumer group 'slam--scan' for topic 'mecha10:/scan' (attempt 1/5): Connection reset. Retrying in 100ms...
⚠️  Failed to create consumer group 'slam--scan' for topic 'mecha10:/scan' (attempt 2/5): Connection reset. Retrying in 200ms...
✅ Created consumer group 'slam--scan' for topic 'mecha10:/scan'

Self-Healing Subscriptions

The background subscription task automatically recovers from consumer group failures:

  • Detects "NOGROUP" errors - recognizes when consumer group has been deleted
  • Auto-recreates consumer groups - attempts to recreate missing groups
  • Continues retrying - keeps trying to reconnect every 1 second
  • No manual intervention - subscriptions heal themselves

// Your subscription keeps working even if:
// - Redis gets flushed (FLUSHALL)
// - Consumer group gets manually deleted
// - Redis restarts
while let Some(msg) = rx.recv().await {
    process(&msg.payload).await?;
    msg.ack().await?;
}
// Background task automatically recovers and continues delivering messages

What you'll see in logs:

❌ XREAD failed for topic 'mecha10:/scan' group 'slam--scan': NOGROUP No such consumer group
🔄 Consumer group 'slam--scan' missing, attempting to recreate...
✅ Recreated consumer group 'slam--scan' for topic 'mecha10:/scan'
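
Both recovery paths described in this section depend on recognizing a Redis error code in the error text: BUSYGROUP when a group already exists, NOGROUP when it is missing. The sketch below shows one way to map an error message to a recovery action. `Recovery` and `classify` are hypothetical names, and the sketch assumes the raw Redis error text is available as a string.

```rust
/// Hypothetical recovery decision derived from a Redis error message.
#[derive(Debug, PartialEq)]
enum Recovery {
    /// BUSYGROUP: the consumer group already exists, so treat creation as done.
    AlreadyExists,
    /// NOGROUP: the consumer group is missing, so recreate it and resume.
    RecreateGroup,
    /// Anything else (e.g. a dropped connection): wait and try again.
    RetryLater,
}

/// Classify an error message by the Redis error code it contains.
fn classify(error_message: &str) -> Recovery {
    if error_message.contains("BUSYGROUP") {
        Recovery::AlreadyExists
    } else if error_message.contains("NOGROUP") {
        Recovery::RecreateGroup
    } else {
        Recovery::RetryLater
    }
}
```

Applied to the log lines above, "NOGROUP No such consumer group" maps to `RecreateGroup`, while "Connection reset" falls through to `RetryLater`.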

Best Practices

  1. Monitor logs - Watch for retry warnings to identify infrastructure issues
  2. Set proper timeouts - Ensure your application can tolerate brief subscription delays
  3. Handle subscription errors - Catch errors on subscribe() to handle permanent failures
  4. Don't manually delete consumer groups - Let the framework manage them

Debugging Subscription Issues

If subscriptions fail even with retries, check:

  1. Redis connectivity: redis-cli -h <host> -p <port> ping
  2. Redis version: Requires Redis 5.0+ for Streams support
  3. Redis memory: Ensure sufficient memory for consumer groups
  4. Permissions: Check Redis ACLs if using authentication

Use these Redis commands to inspect subscriptions:

# List all consumer groups for a topic
redis-cli XINFO GROUPS "mecha10:/scan"

# Check pending messages in a group
redis-cli XPENDING "mecha10:/scan" "slam--scan"

# List consumers in a group
redis-cli XINFO CONSUMERS "mecha10:/scan" "slam--scan"

Architecture

┌─────────────┐
│   Node A    │
│ (Publisher) │
└──────┬──────┘
       │ publish("/scan")
       ▼
┌─────────────────────┐
│   Redis Streams     │
│   Topic: /scan      │
└──────┬──────┬───────┘
       │      │
       ▼      ▼
┌──────────┐ ┌──────────┐
│  Node B  │ │  Node C  │
│ (SLAM)   │ │ (Logger) │
└──────────┘ └──────────┘

Examples

See QUICKSTART.md for complete examples.

License

MIT
