mockforge-mqtt

Crates.iomockforge-mqtt
lib.rsmockforge-mqtt
version0.3.31
created_at2025-10-16 20:28:19.21112+00
updated_at2026-01-04 23:39:39.877889+00
descriptionMQTT protocol support for MockForge
homepagehttps://mockforge.dev
repositoryhttps://github.com/SaaSy-Solutions/mockforge
max_upload_size
id1886725
size385,989
Ray Clanan (rclanan)

documentation

https://docs.rs/mockforge

README

MockForge MQTT

MQTT protocol support for MockForge with full broker simulation, topic management, and QoS handling.

This crate provides comprehensive MQTT mocking capabilities for IoT applications, pub/sub systems, and message queue testing. Perfect for testing MQTT clients, brokers, and IoT device communication without requiring external MQTT infrastructure.

Features

  • Full MQTT Broker: Complete MQTT 3.1.1 and 5.0 protocol support
  • Topic Management: Hierarchical topic structure with wildcards
  • QoS Levels: Support for QoS 0, 1, and 2 message delivery
  • Session Management: Persistent sessions and clean session handling
  • Retained Messages: Store and deliver retained messages
  • Will Messages: Last will and testament message handling
  • Authentication: Configurable client authentication
  • Metrics & Monitoring: Comprehensive MQTT metrics collection
  • Fixture System: YAML-based message templates and auto-publishing

Quick Start

Basic MQTT Broker

use mockforge_mqtt::{MqttBroker, MqttConfig};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create broker configuration
    let config = MqttConfig {
        host: "127.0.0.1".to_string(),
        port: 1883,
        ..Default::default()
    };

    // Initialize broker
    let spec_registry = Arc::new(MqttSpecRegistry::new());
    let broker = MqttBroker::new(config, spec_registry);

    // Start the broker (this would typically run in a separate task)
    // broker.start().await?;

    Ok(())
}

Testing with MQTT Clients

use rumqttc::{AsyncClient, MqttOptions, QoS};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to MockForge MQTT broker
    let mut mqtt_options = MqttOptions::new("test-client", "localhost", 1883);
    mqtt_options.set_keep_alive(Duration::from_secs(5));

    let (client, mut eventloop) = AsyncClient::new(mqtt_options, 10);

    // Subscribe to a topic
    client.subscribe("sensors/temperature", QoS::AtMostOnce).await?;

    // Publish a message
    client.publish("sensors/temperature", QoS::AtLeastOnce, false, "23.5").await?;

    // Handle events
    loop {
        match eventloop.poll().await {
            Ok(notification) => {
                println!("Received: {:?}", notification);
            }
            Err(e) => {
                println!("Error: {:?}", e);
                break;
            }
        }
    }

    Ok(())
}

Core Components

MqttBroker

The main broker implementation handling all MQTT protocol operations:

use mockforge_mqtt::{MqttBroker, MqttConfig, MqttSpecRegistry};

let config = MqttConfig {
    host: "0.0.0.0".to_string(),
    port: 1883,
    max_connections: 1000,
    max_packet_size: 1024 * 1024, // 1MB
    keep_alive_secs: 60,
    version: MqttVersion::V5_0,
};

let spec_registry = Arc::new(MqttSpecRegistry::new());
let broker = MqttBroker::new(config, spec_registry);

Topic Management

Hierarchical topic structure with wildcard support:

use mockforge_mqtt::topics::TopicTree;

// Create topic tree
let topic_tree = TopicTree::new();

// Topics support wildcards:
// + (single level) and # (multi-level)
topic_tree.subscribe("client/sensor/+/temperature", qos);
topic_tree.subscribe("home/+/status", qos);
topic_tree.subscribe("iot/devices/#", qos);

QoS Handling

Support for all MQTT Quality of Service levels:

use mockforge_mqtt::qos::{QoSHandler, MessageState};

// QoS 0: At most once (fire and forget)
let qos_0 = QoSHandler::publish_at_most_once(&message);

// QoS 1: At least once (acknowledged delivery)
let qos_1 = QoSHandler::publish_at_least_once(&message).await?;

// QoS 2: Exactly once (two-phase commit)
let qos_2 = QoSHandler::publish_exactly_once(&message).await?;

Session Management

Persistent sessions for reliable messaging:

use mockforge_mqtt::broker::ClientSession;

// Clean session (default)
let clean_session = ClientSession {
    client_id: "client-1".to_string(),
    subscriptions: HashMap::new(),
    clean_session: true,
    connected_at: now,
    last_seen: now,
};

// Persistent session
let persistent_session = ClientSession {
    client_id: "client-2".to_string(),
    subscriptions: HashMap::new(),
    clean_session: false, // Session persists across connections
    connected_at: now,
    last_seen: now,
};

Fixture System

Define message templates and auto-publishing rules using YAML:

# mqtt-fixture.yaml
topics:
  - name: "sensors/temperature"
    retained: false
  - name: "devices/status"
    retained: true

fixtures:
  - topic: "sensors/temperature"
    payload: '{"sensor_id": "temp-001", "value": 23.5, "unit": "celsius"}'
    qos: 1
    retain: false

  - topic: "devices/status"
    payload: '{"device_id": "dev-001", "status": "online", "battery": 85}'
    qos: 0
    retain: true

auto_publish:
  - topic: "sensors/temperature"
    payload_template: '{"sensor_id": "temp-{{sensor_id}}", "value": {{temperature}}, "timestamp": "{{now}}"}'
    qos: 1
    interval_seconds: 30
    duration_seconds: 300
    variables:
      sensor_id: "001"
      temperature: "22.5"

  - topic: "iot/heartbeat"
    payload_template: '{"service": "{{service_name}}", "status": "alive", "uptime": {{uptime}}}'
    qos: 0
    interval_seconds: 60
    variables:
      service_name: "mockforge-mqtt"
      uptime: 3600

Loading Fixtures

use mockforge_mqtt::{MqttBroker, MqttSpecRegistry};

// Create broker with fixture support
let spec_registry = Arc::new(MqttSpecRegistry::new());
let broker = MqttBroker::new(config, spec_registry);

// Load fixtures from file
broker.load_fixtures_from_file("mqtt-fixture.yaml").await?;

// Or create fixtures programmatically
use mockforge_mqtt::fixtures::{MqttFixture, AutoPublishConfig};

let fixture = MqttFixture {
    topics: vec![/* ... */],
    fixtures: vec![/* ... */],
    auto_publish: vec![/* ... */],
};

broker.add_fixture(fixture).await?;

Supported MQTT Features

Protocol Versions

  • MQTT 3.1.1: Legacy protocol support
  • MQTT 5.0: Latest protocol with enhanced features

Message Types

  • CONNECT: Client connection establishment
  • CONNACK: Connection acknowledgment
  • PUBLISH: Message publication
  • PUBACK/PUBREC/PUBREL/PUBCOMP: QoS flow control
  • SUBSCRIBE: Topic subscription
  • SUBACK: Subscription acknowledgment
  • UNSUBSCRIBE: Topic unsubscription
  • UNSUBACK: Unsubscription acknowledgment
  • PINGREQ/PINGRESP: Keep-alive handling
  • DISCONNECT: Clean disconnection

Advanced Features

  • Will Messages: Last will and testament
  • Retained Messages: Persistent topic messages
  • Topic Aliases: Bandwidth optimization (MQTT 5.0)
  • Subscription Identifiers: Subscription tracking (MQTT 5.0)
  • User Properties: Custom metadata (MQTT 5.0)

Configuration

MqttConfig

use mockforge_mqtt::{MqttConfig, MqttVersion};

let config = MqttConfig {
    host: "0.0.0.0".to_string(),
    port: 1883,
    max_connections: 1000,
    max_packet_size: 1024 * 1024, // 1MB
    keep_alive_secs: 60,
    version: MqttVersion::V5_0,
};

Environment Variables

# Server configuration
export MQTT_HOST=0.0.0.0
export MQTT_PORT=1883

# Connection limits
export MQTT_MAX_CONNECTIONS=1000
export MQTT_MAX_PACKET_SIZE=1048576

# Protocol settings
export MQTT_KEEP_ALIVE_SECS=60
export MQTT_VERSION=v5

Testing Examples

Publisher Testing

use rumqttc::{AsyncClient, MqttOptions, QoS};
use std::time::Duration;

#[tokio::test]
async fn test_mqtt_publisher() {
    // Start MockForge MQTT broker in background
    let broker = MqttBroker::new(MqttConfig::default(), Arc::new(MqttSpecRegistry::new()));
    tokio::spawn(async move { broker.start().await.unwrap() });

    // Give broker time to start
    tokio::time::sleep(Duration::from_millis(100)).await;

    // Test publisher
    let mut mqtt_options = MqttOptions::new("test-publisher", "localhost", 1883);
    let (client, mut eventloop) = AsyncClient::new(mqtt_options, 10);

    // Publish test message
    client
        .publish("test/topic", QoS::AtLeastOnce, false, "Hello MQTT!")
        .await
        .unwrap();

    // Verify message was published (check broker state)
    // ... verification logic ...
}

Subscriber Testing

use rumqttc::{AsyncClient, MqttOptions, QoS, Event};
use futures::StreamExt;

#[tokio::test]
async fn test_mqtt_subscriber() {
    // Start broker and publish test message
    // ... setup code ...

    // Create subscriber
    let mut mqtt_options = MqttOptions::new("test-subscriber", "localhost", 1883);
    let (client, mut eventloop) = AsyncClient::new(mqtt_options, 10);

    // Subscribe to topic
    client.subscribe("test/topic", QoS::AtMostOnce).await.unwrap();

    // Publish a message
    client.publish("test/topic", QoS::AtLeastOnce, false, "test message").await.unwrap();

    // Receive message
    let event = eventloop.next().await.unwrap().unwrap();
    match event {
        Event::Incoming(incoming) => {
            if let rumqttc::Packet::Publish(publish) = incoming {
                let payload = std::str::from_utf8(&publish.payload).unwrap();
                assert_eq!(payload, "test message");
            }
        }
        _ => panic!("Expected publish event"),
    }
}

QoS Testing

use rumqttc::{AsyncClient, MqttOptions, QoS};

#[tokio::test]
async fn test_mqtt_qos_levels() {
    // Test QoS 0 (At most once)
    let (client, mut eventloop) = AsyncClient::new(MqttOptions::new("qos-test", "localhost", 1883), 10);
    client.subscribe("qos/test", QoS::AtMostOnce).await.unwrap();
    client.publish("qos/test", QoS::AtMostOnce, false, "QoS 0 message").await.unwrap();

    // Test QoS 1 (At least once)
    client.publish("qos/test", QoS::AtLeastOnce, false, "QoS 1 message").await.unwrap();

    // Test QoS 2 (Exactly once)
    client.publish("qos/test", QoS::ExactlyOnce, false, "QoS 2 message").await.unwrap();

    // Verify messages are received (broker should handle QoS flows)
}

Retained Messages

use rumqttc::{AsyncClient, MqttOptions, QoS};

#[tokio::test]
async fn test_retained_messages() {
    // Publish retained message
    let (publisher, _) = AsyncClient::new(MqttOptions::new("publisher", "localhost", 1883), 10);
    publisher
        .publish("retained/topic", QoS::AtLeastOnce, true, "retained message")
        .await
        .unwrap();

    // New subscriber should receive retained message immediately
    let (subscriber, mut eventloop) = AsyncClient::new(MqttOptions::new("subscriber", "localhost", 1883), 10);
    subscriber.subscribe("retained/topic", QoS::AtMostOnce).await.unwrap();

    // Should receive retained message
    let event = eventloop.next().await.unwrap().unwrap();
    match event {
        Event::Incoming(incoming) => {
            if let rumqttc::Packet::Publish(publish) = incoming {
                assert!(publish.retain);
                let payload = std::str::from_utf8(&publish.payload).unwrap();
                assert_eq!(payload, "retained message");
            }
        }
        _ => panic!("Expected retained publish event"),
    }
}

Performance

MockForge MQTT is optimized for testing scenarios:

  • In-Memory Operations: Fast message routing without persistence
  • Concurrent Connections: Handle multiple simultaneous MQTT clients
  • Low Latency: Minimal overhead for message operations
  • Scalable: Support for high-throughput IoT testing scenarios
  • Resource Efficient: Configurable connection limits and cleanup

Integration with MockForge

MockForge MQTT integrates seamlessly with the MockForge ecosystem:

  • MockForge Core: Shared configuration and logging
  • MockForge CLI: Command-line MQTT broker management
  • MockForge Data: Enhanced message generation with templates
  • MockForge Observability: Metrics and tracing integration

Troubleshooting

Common Issues

Connection refused:

  • Ensure broker is started and listening on correct port
  • Check firewall settings and port availability
  • Verify client connection parameters

Messages not received:

  • Check topic subscription patterns and wildcards
  • Verify QoS levels match between publisher and subscriber
  • Check retained message settings

QoS issues:

  • Ensure broker supports requested QoS level
  • Check network reliability for higher QoS levels
  • Verify client acknowledgment handling

Session persistence:

  • Check clean session flag settings
  • Verify client ID consistency across connections
  • Check session expiry settings

Examples

See the examples directory for complete working examples including:

  • Basic MQTT broker setup
  • Publisher/subscriber testing patterns
  • QoS level verification
  • Retained message scenarios
  • IoT device simulation
  • Load testing with multiple clients

Related Crates

License

Licensed under MIT OR Apache-2.0

Commit count: 664

cargo fmt