| Crates.io | mockforge-kafka |
| lib.rs | mockforge-kafka |
| version | 0.3.31 |
| created_at | 2025-10-16 20:59:40.758757+00 |
| updated_at | 2026-01-04 23:44:38.515453+00 |
| description | Kafka protocol support for MockForge |
| homepage | https://mockforge.dev |
| repository | https://github.com/SaaSy-Solutions/mockforge |
| max_upload_size | |
| id | 1886758 |
| size | 309,268 |
Kafka protocol support for MockForge with full broker simulation, topic management, and consumer group coordination.
This crate provides comprehensive Kafka mocking capabilities, allowing you to simulate Apache Kafka brokers for testing event-driven applications. Perfect for testing Kafka producers, consumers, and stream processing applications without requiring a full Kafka cluster.
Start a mock broker:
use mockforge_kafka::KafkaMockBroker;
use mockforge_core::config::KafkaConfig;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create broker configuration
let config = KafkaConfig {
host: "127.0.0.1".to_string(),
port: 9092,
..Default::default()
};
// Initialize and start broker
let broker = KafkaMockBroker::new(config).await?;
broker.start().await?;
Ok(())
}
Then produce messages to it with a standard Kafka client such as rdkafka:
use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Connect to MockForge Kafka broker
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", "localhost:9092")
.set("message.timeout.ms", "5000")
.create()?;
// Produce a message
let delivery_status = producer
.send(
FutureRecord::to("test-topic")
.payload("Hello from MockForge!")
.key("test-key"),
Duration::from_secs(0),
)
.await;
match delivery_status {
Ok((partition, offset)) => {
println!("Message delivered to partition {} at offset {}", partition, offset);
}
Err((e, _)) => println!("Failed to deliver message: {}", e),
}
Ok(())
}
KafkaMockBroker is the main broker implementation and handles all Kafka protocol operations:
use mockforge_kafka::KafkaMockBroker;
use mockforge_core::config::KafkaConfig;
let config = KafkaConfig {
host: "0.0.0.0".to_string(),
port: 9092,
auto_create_topics: true,
default_partitions: 3,
..Default::default()
};
let broker = KafkaMockBroker::new(config).await?;
broker.start().await?;
Create and manage Kafka topics dynamically:
use mockforge_kafka::topics::{Topic, TopicConfig};
// Create a topic with specific configuration
let topic_config = TopicConfig {
name: "user-events".to_string(),
partitions: 3,
replication_factor: 1,
retention_ms: Some(604800000), // 7 days
};
let topic = Topic::new(topic_config);
// Topics are automatically created when first accessed
// or can be pre-created through the broker API
Handle produce requests with full protocol compliance:
use mockforge_kafka::partitions::KafkaMessage;
// Create messages for production
let messages = vec![
KafkaMessage {
key: Some(b"user-123".to_vec()),
value: b"{\"action\": \"login\", \"user_id\": 123}".to_vec(),
timestamp: Some(chrono::Utc::now().timestamp_millis()),
headers: None,
},
KafkaMessage {
key: Some(b"user-456".to_vec()),
value: b"{\"action\": \"logout\", \"user_id\": 456}".to_vec(),
timestamp: Some(chrono::Utc::now().timestamp_millis()),
headers: None,
},
];
// Messages are automatically routed to appropriate partitions
// based on key hashing (if key provided) or round-robin
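Routing follows the same idea as Kafka's default partitioner: keyed messages are hashed onto a partition, unkeyed messages rotate round-robin. A minimal sketch of that rule (Rust's DefaultHasher is used purely for illustration; real Kafka clients use murmur2, and this is not the crate's actual code):

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Pick a partition for a message: hash the key when present, otherwise round-robin.
fn choose_partition(key: Option<&[u8]>, num_partitions: u32, round_robin: &mut u32) -> u32 {
    match key {
        Some(k) => {
            let mut hasher = DefaultHasher::new();
            k.hash(&mut hasher);
            (hasher.finish() % num_partitions as u64) as u32
        }
        None => {
            let p = *round_robin % num_partitions;
            *round_robin = round_robin.wrapping_add(1);
            p
        }
    }
}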
Simulate consumer group behavior and coordination:
use mockforge_kafka::consumer_groups::{ConsumerGroup, ConsumerGroupManager};
// Create consumer group manager
let group_manager = ConsumerGroupManager::new();
// Consumer groups are automatically managed when consumers join
// Partition assignment follows Kafka's standard algorithms
let group = ConsumerGroup::new(
"my-consumer-group".to_string(),
vec!["consumer-1".to_string(), "consumer-2".to_string()],
);
// Group handles partition rebalancing when members join/leave
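When membership changes, the group's partitions are redistributed across the remaining members. A simplified, hypothetical sketch of what such an assignment produces (Kafka's real assignors such as range, round-robin, and sticky are more involved, and this is not the crate's code):

// Spread partitions 0..num_partitions across the current members, round-robin style.
fn assign_partitions(members: &[String], num_partitions: u32) -> Vec<(String, Vec<u32>)> {
    let mut assignment: Vec<(String, Vec<u32>)> =
        members.iter().map(|m| (m.clone(), Vec::new())).collect();
    if assignment.is_empty() {
        return assignment;
    }
    for partition in 0..num_partitions {
        let idx = partition as usize % assignment.len();
        assignment[idx].1.push(partition);
    }
    assignment
}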
Define message templates and auto-production rules using YAML:
# kafka-fixture.yaml
topics:
  - name: "user-events"
    partitions: 3
    config:
      retention.ms: "604800000" # 7 days
  - name: "order-events"
    partitions: 2
fixtures:
  - topic: "user-events"
    key_template: "{{uuid}}"
    value_template: |
      {
        "user_id": "{{uuid}}",
        "action": "{{random_element 'login' 'logout' 'signup' 'update_profile'}}",
        "timestamp": "{{now}}",
        "metadata": {
          "source": "web",
          "version": "1.0"
        }
      }
    headers:
      content-type: "application/json"
auto_produce:
  - topic: "user-events"
    rate_per_second: 5
    duration_seconds: 300 # 5 minutes
    key_template: "{{uuid}}"
    value_template: |
      {
        "event_type": "heartbeat",
        "service": "user-service",
        "timestamp": "{{now}}"
      }
  - topic: "order-events"
    rate_per_second: 2
    duration_seconds: 600 # 10 minutes
    key_template: "order-{{sequence}}"
    value_template: |
      {
        "order_id": "{{sequence}}",
        "user_id": "{{uuid}}",
        "amount": {{float_range 10.0 1000.0}},
        "items": {{int_range 1 10}},
        "status": "created",
        "created_at": "{{now}}"
      }
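Each auto_produce entry behaves like a timed production loop: rate_per_second: 5 with duration_seconds: 300 means one rendered message roughly every 200 ms for five minutes. Conceptually it amounts to the following sketch (an illustration only, not the crate's implementation; the emit callback stands in for rendering the templates and appending to the topic):

use std::time::Duration;
use tokio::time::{interval, Instant};

// Emit one message per tick at `rate_per_second` until `duration_seconds` has elapsed.
async fn auto_produce_loop(rate_per_second: u64, duration_seconds: u64, mut emit: impl FnMut()) {
    let mut ticker = interval(Duration::from_millis(1000 / rate_per_second.max(1)));
    let deadline = Instant::now() + Duration::from_secs(duration_seconds);
    while Instant::now() < deadline {
        ticker.tick().await;
        emit(); // render key_template / value_template and append to the topic here
    }
}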
Load fixtures into the broker at runtime:
use mockforge_kafka::{KafkaMockBroker, KafkaSpecRegistry};
// Create broker with fixture support
let spec_registry = KafkaSpecRegistry::new();
let broker = KafkaMockBroker::with_registry(config, spec_registry).await?;
// Load fixtures from file
broker.load_fixtures_from_file("kafka-fixture.yaml").await?;
// Or create fixtures programmatically
use mockforge_kafka::fixtures::{KafkaFixture, AutoProduceConfig};
let fixture = KafkaFixture {
topics: vec![/* ... */],
fixtures: vec![/* ... */],
auto_produce: vec![/* ... */],
};
broker.add_fixture(fixture).await?;
MockForge Kafka implements the Kafka protocol APIs needed for producing, consuming, topic metadata, and consumer group coordination.
Comprehensive metrics are exported in Prometheus format:
use mockforge_kafka::metrics::MetricsExporter;
// Create metrics exporter
let exporter = MetricsExporter::new();
// Export current metrics
let metrics = exporter.export_prometheus().await?;
println!("{}", metrics);
// Sample metrics:
// kafka_requests_total{api="produce"} 150
// kafka_messages_produced_total{topic="user-events"} 1000
// kafka_consumer_groups_total 5
// kafka_connections_active 12
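In tests you can assert directly on the exported text; the metric name below comes from the sample output above:

use mockforge_kafka::metrics::MetricsExporter;

let exporter = MetricsExporter::new();
let metrics = exporter.export_prometheus().await?;
// Check that produce activity has been recorded.
assert!(metrics
    .lines()
    .any(|line| line.starts_with("kafka_messages_produced_total")));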
Broker behavior is configured through KafkaConfig:
use mockforge_core::config::KafkaConfig;
let config = KafkaConfig {
host: "0.0.0.0".to_string(),
port: 9092,
auto_create_topics: true,
default_partitions: 3,
default_replication_factor: 1,
log_retention_hours: 168, // 7 days
max_message_size: 1048576, // 1MB
num_threads: 4,
..Default::default()
};
The same settings can be supplied through environment variables:
# Server configuration
export KAFKA_HOST=0.0.0.0
export KAFKA_PORT=9092
# Topic defaults
export KAFKA_AUTO_CREATE_TOPICS=true
export KAFKA_DEFAULT_PARTITIONS=3
# Performance
export KAFKA_MAX_MESSAGE_SIZE=1048576
export KAFKA_NUM_THREADS=4
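If you need to map these variables onto a KafkaConfig yourself (an assumption here; the crate may already read them for you), a minimal sketch:

use mockforge_core::config::KafkaConfig;

// Build a KafkaConfig from environment variables, falling back to the defaults.
fn kafka_config_from_env() -> KafkaConfig {
    let mut config = KafkaConfig::default();
    if let Ok(host) = std::env::var("KAFKA_HOST") {
        config.host = host;
    }
    if let Ok(port) = std::env::var("KAFKA_PORT") {
        config.port = port.parse().unwrap_or(config.port);
    }
    if let Ok(partitions) = std::env::var("KAFKA_DEFAULT_PARTITIONS") {
        config.default_partitions = partitions.parse().unwrap_or(config.default_partitions);
    }
    config
}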
Testing a producer against the mock broker:
use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;
#[tokio::test]
async fn test_kafka_producer() {
// Start MockForge Kafka broker in background
let broker = KafkaMockBroker::new(KafkaConfig::default()).await.unwrap();
tokio::spawn(async move { broker.start().await.unwrap() });
// Give broker time to start
tokio::time::sleep(Duration::from_millis(100)).await;
// Test producer
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", "localhost:9092")
.set("message.timeout.ms", "5000")
.create()
.unwrap();
// Send test message
let result = producer
.send(
FutureRecord::to("test-topic")
.payload("test message")
.key("test-key"),
Duration::from_secs(5),
)
.await;
assert!(result.is_ok());
let (partition, offset) = result.unwrap();
assert!(partition >= 0);
assert!(offset >= 0);
}
Testing a consumer:
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::Message;
use futures::StreamExt;
#[tokio::test]
async fn test_kafka_consumer() {
// Start broker and produce test messages
// ... setup code ...
// Create consumer
let consumer: StreamConsumer = ClientConfig::new()
.set("bootstrap.servers", "localhost:9092")
.set("group.id", "test-group")
.set("auto.offset.reset", "earliest")
.create()
.unwrap();
consumer.subscribe(&["test-topic"]).unwrap();
// Consume messages
let mut message_stream = consumer.stream();
let message = message_stream.next().await.unwrap().unwrap();
let payload = message.payload().unwrap();
assert_eq!(std::str::from_utf8(payload).unwrap(), "test message");
}
Testing consumer group coordination across multiple consumers:
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
#[tokio::test]
async fn test_consumer_groups() {
// Start broker
// ... setup code ...
// Create multiple consumers in same group
let mut consumers = vec![];
for i in 0..3 {
let consumer: StreamConsumer = ClientConfig::new()
.set("bootstrap.servers", "localhost:9092")
.set("group.id", "test-group")
.set("client.id", &format!("consumer-{}", i))
.create()
.unwrap();
consumer.subscribe(&["test-topic"]).unwrap();
consumers.push(consumer);
}
// Verify partition assignment: consumers should automatically balance partitions.
// Assignments are only populated once the group rebalance has completed,
// which happens as the consumers are polled.
for consumer in consumers {
let assignment = consumer.assignment().unwrap();
// assignment is a TopicPartitionList; count() gives the number of assigned partitions.
assert!(assignment.count() > 0);
}
}
MockForge Kafka is optimized for testing scenarios and integrates with the rest of the MockForge ecosystem.
Common troubleshooting areas include connection refused errors, messages not being consumed, high latency, and protocol errors.
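For connection refused errors, a quick sanity check that the broker is actually listening (independent of any Kafka client) is a plain TCP probe:

use std::net::{SocketAddr, TcpStream};
use std::time::Duration;

// Returns true if something accepts TCP connections at `addr` within one second.
fn broker_is_reachable(addr: &str) -> bool {
    addr.parse::<SocketAddr>()
        .ok()
        .and_then(|a| TcpStream::connect_timeout(&a, Duration::from_secs(1)).ok())
        .is_some()
}

// Example: assert!(broker_is_reachable("127.0.0.1:9092"));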
See the examples directory in the repository for complete working examples.
mockforge-core: Core mocking functionality
rdkafka: Kafka client library for testing
Licensed under MIT OR Apache-2.0