| Crates.io | celers-broker-amqp |
| lib.rs | celers-broker-amqp |
| version | 0.1.0 |
| created_at | 2026-01-18 15:22:21.761837+00 |
| updated_at | 2026-01-18 15:22:21.761837+00 |
| description | RabbitMQ/AMQP broker implementation for CeleRS |
| homepage | |
| repository | https://github.com/cool-japan/celers |
| max_upload_size | |
| id | 2052483 |
| size | 853,820 |
RabbitMQ/AMQP broker implementation for CeleRS, providing a full-featured message broker with exchange/queue topology management, publisher confirms, and advanced features like priority queues, dead letter exchanges, and transactions.
Add to your Cargo.toml:
[dependencies]
celers-broker-amqp = "0.1"
celers-protocol = "0.1"
celers-kombu = "0.1"
use celers_broker_amqp::{AmqpBroker, AmqpConfig};
use celers_kombu::{Transport, Producer, Consumer};
use celers_protocol::builder::MessageBuilder;
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create and connect to broker
let mut broker = AmqpBroker::new("amqp://localhost:5672", "my_queue").await?;
broker.connect().await?;
// Publish a message
let message = MessageBuilder::new("tasks.process")
.args(vec![serde_json::json!({"data": "hello"})])
.build()?;
broker.publish("my_queue", message).await?;
// Consume messages
if let Ok(Some(envelope)) = broker.consume("my_queue", Duration::from_secs(5)).await {
println!("Received: {:?}", envelope.message);
broker.ack(&envelope.delivery_tag).await?;
}
broker.disconnect().await?;
Ok(())
}
The examples/ directory contains 16 comprehensive examples demonstrating various features:
# Basic publish/consume workflow
cargo run --example basic_publish_consume
# High-throughput batch operations
cargo run --example batch_publish
# Priority-based message processing
cargo run --example priority_queue
# Dead Letter Exchange configuration
cargo run --example dead_letter_exchange
# AMQP transaction support
cargo run --example transaction
# Async streaming consumer pattern
cargo run --example streaming_consumer
# RabbitMQ Management API usage
cargo run --example management_api
# Modern queue features (quorum, stream, lazy mode)
cargo run --example modern_queue_features
# Advanced monitoring & batch consumption (v4)
cargo run --example advanced_monitoring
# Monitoring & utility functions demo (v5)
cargo run --example monitoring_utilities
# v6 features: circuit breaker, retry, compression, topology, tracing, consumer groups
cargo run --example v6_features_demo
# Production patterns: complete integration of all v6 features
cargo run --example production_patterns
# v7 features: rate limiting, bulkhead, scheduling, metrics export
cargo run --example v7_features_demo
# v8 features: hooks, DLX analytics, adaptive batching, profiling
cargo run --example v8_features_demo
# v9 features: backpressure, poison detection, routing, optimization
cargo run --example v9_features_demo
# v9 production integration: complete production-ready integration of all v9 features (RECOMMENDED)
cargo run --example v9_production_integration
Note: Most examples require a running RabbitMQ instance. See the setup guide below.
The easiest way to get started is using Docker:
# Start RabbitMQ with management plugin
docker run -d --name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
rabbitmq:3-management
# Access management UI at http://localhost:15672
# Default credentials: guest/guest
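With the container above running, the quickstart only needs the default guest credentials in the connection URL. A minimal sketch, assuming the AmqpBroker API shown in the quickstart:
use celers_broker_amqp::AmqpBroker;
use celers_kombu::Transport;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to the dockerized RabbitMQ with the default guest account
    let mut broker = AmqpBroker::new("amqp://guest:guest@localhost:5672", "my_queue").await?;
    broker.connect().await?;
    println!("Connected to RabbitMQ at localhost:5672");
    broker.disconnect().await?;
    Ok(())
}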
# Add RabbitMQ repository
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.deb.sh | sudo bash
# Install RabbitMQ
sudo apt-get install rabbitmq-server
# Enable and start service
sudo systemctl enable rabbitmq-server
sudo systemctl start rabbitmq-server
# Enable management plugin
sudo rabbitmq-plugins enable rabbitmq_management
# Install via Homebrew
brew install rabbitmq
# Start service
brew services start rabbitmq
Create /etc/rabbitmq/rabbitmq.conf:
# Memory threshold (60% of available memory)
vm_memory_high_watermark.relative = 0.6
# Disk free space threshold (50GB)
disk_free_limit.absolute = 50GB
# Heartbeat timeout
heartbeat = 60
# Maximum number of channels
channel_max = 2047
# Queue master location strategy (relevant for clustered deployments)
queue_master_locator = min-masters
# Log level
log.console.level = info
Create isolated environments for different applications:
# Create virtual host
sudo rabbitmqctl add_vhost production
# Create user
sudo rabbitmqctl add_user myapp secretpassword
# Set permissions
sudo rabbitmqctl set_permissions -p production myapp ".*" ".*" ".*"
# Use in your application
let broker = AmqpBroker::with_config(
"amqp://myapp:secretpassword@localhost:5672",
"my_queue",
AmqpConfig::default().with_vhost("production")
).await?;
Best for distributing tasks among multiple workers with load balancing.
use celers_broker_amqp::{AmqpBroker, AmqpConfig, QueueConfig};
use celers_kombu::{Transport, Producer};
async fn setup_work_queue() -> Result<(), Box<dyn std::error::Error>> {
let mut broker = AmqpBroker::new("amqp://localhost:5672", "tasks").await?;
broker.connect().await?;
// Configure queue with prefetch for fair dispatch
let config = AmqpConfig::default()
.with_prefetch(1) // One message per worker at a time
.with_exchange("tasks")
.with_exchange_type(celers_broker_amqp::AmqpExchangeType::Direct);
// Workers will automatically round-robin messages
Ok(())
}
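A worker then drains the queue with the same consume/ack calls as the quickstart; with_prefetch(1) above keeps each worker to one unacknowledged message at a time. A sketch under those assumptions:
use celers_broker_amqp::AmqpBroker;
use celers_kombu::Consumer;
use std::time::Duration;

async fn run_worker(mut broker: AmqpBroker) -> Result<(), Box<dyn std::error::Error>> {
    loop {
        // Wait up to 5 seconds for the next task
        if let Ok(Some(envelope)) = broker.consume("tasks", Duration::from_secs(5)).await {
            // Process the task, then ack so RabbitMQ dispatches the next one to this worker
            println!("Processing: {:?}", envelope.message);
            broker.ack(&envelope.delivery_tag).await?;
        }
    }
}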
Broadcast messages to all consumers.
use celers_broker_amqp::{AmqpBroker, AmqpExchangeType, QueueConfig};
async fn setup_pubsub() -> Result<(), Box<dyn std::error::Error>> {
let mut broker = AmqpBroker::new("amqp://localhost:5672", "notifications").await?;
broker.connect().await?;
// Declare fanout exchange
broker.declare_exchange("notifications", AmqpExchangeType::Fanout).await?;
// Each consumer gets its own queue
broker.declare_queue_with_config("email_notifications", &QueueConfig::new()).await?;
broker.declare_queue_with_config("sms_notifications", &QueueConfig::new()).await?;
// Bind queues to exchange
broker.bind_queue("email_notifications", "notifications", "").await?;
broker.bind_queue("sms_notifications", "notifications", "").await?;
Ok(())
}
Route messages based on routing key patterns.
use celers_broker_amqp::{AmqpBroker, AmqpExchangeType};
async fn setup_routing() -> Result<(), Box<dyn std::error::Error>> {
let mut broker = AmqpBroker::new("amqp://localhost:5672", "logs").await?;
broker.connect().await?;
// Declare topic exchange
broker.declare_exchange("logs", AmqpExchangeType::Topic).await?;
// Bind with patterns
broker.bind_queue("error_logs", "logs", "*.error").await?;
broker.bind_queue("all_logs", "logs", "*").await?;
broker.bind_queue("kernel_logs", "logs", "kernel.*").await?;
Ok(())
}
Process high-priority messages first.
use celers_broker_amqp::{AmqpBroker, QueueConfig};
use celers_protocol::builder::MessageBuilder;
use celers_kombu::Producer;
async fn setup_priority_queue() -> Result<(), Box<dyn std::error::Error>> {
let mut broker = AmqpBroker::new("amqp://localhost:5672", "priority_tasks").await?;
broker.connect().await?;
// Declare priority queue (max priority: 10)
let config = QueueConfig::new().with_max_priority(10);
broker.declare_queue_with_config("priority_tasks", &config).await?;
// Publish with priority
let urgent_msg = MessageBuilder::new("urgent.task")
.priority(9) // High priority
.build()?;
let normal_msg = MessageBuilder::new("normal.task")
.priority(5) // Normal priority
.build()?;
broker.publish("priority_tasks", urgent_msg).await?;
broker.publish("priority_tasks", normal_msg).await?;
Ok(())
}
Handle failed messages automatically.
use celers_broker_amqp::{AmqpBroker, QueueConfig, DlxConfig};
async fn setup_dlx() -> Result<(), Box<dyn std::error::Error>> {
let mut broker = AmqpBroker::new("amqp://localhost:5672", "main_queue").await?;
broker.connect().await?;
// Setup dead letter exchange and queue
broker.declare_dlx("failed_exchange", "failed_queue").await?;
// Configure main queue with DLX
let dlx = DlxConfig::new("failed_exchange").with_routing_key("failed_queue");
let config = QueueConfig::new()
.with_dlx(dlx)
.with_message_ttl(60000); // 60 second TTL
broker.declare_queue_with_config("main_queue", &config).await?;
// Failed/expired messages will automatically go to failed_queue
Ok(())
}
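Dead-lettered and expired messages can then be drained from failed_queue with the same consume/ack calls, for inspection or re-publishing. A minimal sketch reusing the quickstart API:
use celers_broker_amqp::AmqpBroker;
use celers_kombu::Consumer;
use std::time::Duration;

async fn drain_failed(broker: &mut AmqpBroker) -> Result<(), Box<dyn std::error::Error>> {
    // Pull whatever has been dead-lettered so far
    while let Ok(Some(envelope)) = broker.consume("failed_queue", Duration::from_secs(1)).await {
        println!("Dead-lettered: {:?}", envelope.message);
        // Ack to drop it from failed_queue (or re-publish it to the main queue first)
        broker.ack(&envelope.delivery_tag).await?;
    }
    Ok(())
}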
Schedule tasks for future execution.
use celers_broker_amqp::AmqpBroker;
use celers_protocol::builder::MessageBuilder;
use celers_kombu::Producer;
use std::time::Duration;
async fn schedule_delayed_task(broker: &mut AmqpBroker) -> Result<(), Box<dyn std::error::Error>> {
// Schedule task for 5 minutes from now
let message = MessageBuilder::new("delayed.task")
.countdown(300) // 300 seconds
.build()?;
broker.publish("delayed_queue", message).await?;
Ok(())
}
Publish multiple messages efficiently:
use celers_broker_amqp::AmqpBroker;
use celers_protocol::builder::MessageBuilder;
async fn batch_publish(broker: &mut AmqpBroker) -> Result<(), Box<dyn std::error::Error>> {
let messages: Vec<_> = (0..100)
.map(|i| {
MessageBuilder::new("batch.task")
.args(vec![serde_json::json!(i)])
.build()
.unwrap()
})
.collect();
// Publish all at once with confirms
let count = broker.publish_batch("my_queue", messages).await?;
println!("Published {} messages", count);
Ok(())
}
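If you would rather propagate builder errors than unwrap, collecting into a Result first is a small standard-Rust variation on the same batch publish (sketch):
use celers_broker_amqp::AmqpBroker;
use celers_protocol::builder::MessageBuilder;

async fn batch_publish_checked(broker: &mut AmqpBroker) -> Result<(), Box<dyn std::error::Error>> {
    // Build all messages up front, propagating any builder error with `?`
    let messages = (0..100)
        .map(|i| {
            MessageBuilder::new("batch.task")
                .args(vec![serde_json::json!(i)])
                .build()
        })
        .collect::<Result<Vec<_>, _>>()?;
    let count = broker.publish_batch("my_queue", messages).await?;
    println!("Published {} messages", count);
    Ok(())
}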
Control throughput with pipeline depth:
async fn pipeline_publish(broker: &mut AmqpBroker) -> Result<(), Box<dyn std::error::Error>> {
let messages = vec![/* ... */];
// Send 50 messages before waiting for confirms
let count = broker.publish_pipeline("my_queue", messages, 50).await?;
Ok(())
}
High-throughput async consumption:
use celers_broker_amqp::AmqpBroker;
use futures_lite::StreamExt;
async fn stream_consume(broker: &mut AmqpBroker) -> Result<(), Box<dyn std::error::Error>> {
let mut consumer = broker.start_consumer("my_queue", "consumer-1").await?;
while let Some(delivery) = consumer.next().await {
let delivery = delivery?;
// Process message
println!("Received: {:?}", delivery.data);
// Acknowledge
delivery.ack(lapin::options::BasicAckOptions::default()).await?;
}
Ok(())
}
Ensure atomic operations:
async fn transactional_publish(broker: &mut AmqpBroker) -> Result<(), Box<dyn std::error::Error>> {
// Start transaction
broker.start_transaction().await?;
// Publish multiple messages
for i in 0..5 {
let msg = MessageBuilder::new("task").args(vec![serde_json::json!(i)]).build()?;
broker.publish("my_queue", msg).await?;
}
// Commit all or rollback
let success = true; // placeholder: replace with your own success check
if success {
broker.commit_transaction().await?;
} else {
broker.rollback_transaction().await?;
}
Ok(())
}
Monitor broker connection health:
async fn monitor_health(broker: &AmqpBroker) {
let status = broker.health_status();
if !status.is_healthy() {
println!("Broker unhealthy!");
println!("Connected: {}", status.connected);
println!("Channel open: {}", status.channel_open);
}
// Get metrics
let metrics = broker.channel_metrics();
println!("Published: {}", metrics.messages_published);
println!("Consumed: {}", metrics.messages_consumed);
println!("Errors: {}", metrics.publish_errors);
// Publisher confirm stats
let confirm_stats = broker.publisher_confirm_stats();
println!("Avg latency: {}μs", confirm_stats.avg_confirm_latency_us);
}
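For continuous monitoring, the same calls can run on a timer with tokio::time::interval. A sketch, assuming the health_status() and channel_metrics() accessors shown above:
use celers_broker_amqp::AmqpBroker;
use std::time::Duration;

async fn monitor_loop(broker: &AmqpBroker) {
    let mut ticker = tokio::time::interval(Duration::from_secs(30));
    loop {
        ticker.tick().await;
        let status = broker.health_status();
        let metrics = broker.channel_metrics();
        println!(
            "healthy={} published={} consumed={} errors={}",
            status.is_healthy(),
            metrics.messages_published,
            metrics.messages_consumed,
            metrics.publish_errors
        );
    }
}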
Improve concurrency with connection pooling:
let config = AmqpConfig::default()
.with_connection_pool_size(10) // 10 connections
.with_channel_pool_size(100); // 100 channels per connection
let broker = AmqpBroker::with_config(
"amqp://localhost:5672",
"my_queue",
config
).await?;
Prevent duplicate processing:
let config = AmqpConfig::default()
.with_deduplication(true)
.with_deduplication_config(10000, Duration::from_secs(3600)); // Cache 10k IDs for 1 hour
let mut broker = AmqpBroker::with_config(
"amqp://localhost:5672",
"my_queue",
config
).await?;
// Duplicate messages with same ID will be automatically skipped
Protect your system from cascading failures with automatic circuit breaking:
use celers_broker_amqp::circuit_breaker::{CircuitBreaker, CircuitBreakerConfig};
use std::time::Duration;
async fn with_circuit_breaker() -> Result<(), Box<dyn std::error::Error>> {
let config = CircuitBreakerConfig {
failure_threshold: 5, // Open after 5 failures
success_threshold: 2, // Close after 2 successes
timeout: Duration::from_secs(60), // Try again after 60s
half_open_max_calls: 3, // Test with 3 calls in half-open state
};
let mut circuit = CircuitBreaker::new(config);
// Execute operation with circuit breaker protection
match circuit.call(|| async {
// Your operation here
Ok(())
}).await {
Ok(result) => println!("Success: {:?}", result),
Err(e) => println!("Circuit open or operation failed: {:?}", e),
}
// Monitor circuit state
let metrics = circuit.metrics();
println!("State: {:?}", metrics.state);
println!("Failures: {}", metrics.failure_count);
Ok(())
}
Implement sophisticated retry logic with exponential backoff and jitter:
use celers_broker_amqp::retry::{
ExponentialBackoff, Jitter, RetryStrategy, RetryExecutor
};
use std::time::Duration;
async fn with_retry() -> Result<(), Box<dyn std::error::Error>> {
// Create exponential backoff strategy
let strategy = ExponentialBackoff::new(Duration::from_millis(100))
.with_max_delay(Duration::from_secs(30))
.with_max_retries(5)
.with_jitter(Jitter::Full); // Full jitter to prevent thundering herd
// Execute with retry
let executor = RetryExecutor::new(strategy);
let result = executor.execute(|| async {
// Your operation that might fail
publish_message().await
}).await?;
Ok(())
}
async fn publish_message() -> Result<(), Box<dyn std::error::Error>> {
// Simulated operation
Ok(())
}
Available jitter strategies:
Jitter::None - No randomization
Jitter::Full - Randomize between 0 and the calculated delay
Jitter::Equal - Half deterministic, half random
Jitter::Decorrelated - Based on the previous delay
Reduce network overhead with built-in compression:
use celers_broker_amqp::compression::{
compress_message, decompress_message, CompressionCodec,
should_compress_message, CompressionStats
};
fn compression_example() -> Result<(), Box<dyn std::error::Error>> {
let data = b"Your message data here...".repeat(100);
// Check if compression is beneficial
if should_compress_message(&data, 1024) {
// Compress with gzip
let compressed = compress_message(&data, CompressionCodec::Gzip)?;
println!("Original: {} bytes", data.len());
println!("Compressed: {} bytes", compressed.len());
// Or use zstd for better compression ratios
let zstd_compressed = compress_message(&data, CompressionCodec::Zstd)?;
println!("Zstd compressed: {} bytes", zstd_compressed.len());
// Decompress
let decompressed = decompress_message(&compressed, CompressionCodec::Gzip)?;
assert_eq!(data, decompressed.as_slice());
}
Ok(())
}
Validate your AMQP topology before deployment:
use celers_broker_amqp::topology::{
TopologyValidator, ExchangeDefinition, QueueDefinition, BindingDefinition,
validate_queue_naming, calculate_topology_complexity, analyze_topology_issues
};
use celers_broker_amqp::AmqpExchangeType;
fn validate_topology() -> Result<(), Box<dyn std::error::Error>> {
let mut validator = TopologyValidator::new();
// Define exchanges
let exchange = ExchangeDefinition {
name: "tasks".to_string(),
exchange_type: AmqpExchangeType::Topic,
durable: true,
auto_delete: false,
};
validator.add_exchange(exchange)?;
// Define queues
let queue = QueueDefinition {
name: "tasks.high".to_string(),
durable: true,
auto_delete: false,
arguments: Default::default(),
};
validator.add_queue(queue)?;
// Define bindings
let binding = BindingDefinition {
source: "tasks".to_string(),
destination: "tasks.high".to_string(),
routing_key: "tasks.high.*".to_string(),
arguments: Default::default(),
};
validator.add_binding(binding)?;
// Validate topology
let issues = validator.validate()?;
if !issues.is_empty() {
println!("Topology issues found:");
for issue in issues {
println!(" - {}", issue);
}
}
// Analyze complexity
let summary = validator.summary();
let complexity = calculate_topology_complexity(
summary.exchanges,
summary.queues,
summary.bindings
);
println!("Topology complexity score: {:.1}", complexity);
Ok(())
}
Track message flow and analyze patterns:
use celers_broker_amqp::tracing_util::{
TraceRecorder, MessageTrace, TraceEvent, MessageFlowAnalyzer
};
use uuid::Uuid;
async fn message_tracing() -> Result<(), Box<dyn std::error::Error>> {
let mut recorder = TraceRecorder::new(10000); // Track up to 10k messages
// Record message lifecycle
let msg_id = Uuid::new_v4().to_string();
recorder.record_event(&msg_id, TraceEvent::Published);
recorder.record_event(&msg_id, TraceEvent::Consumed);
recorder.record_event(&msg_id, TraceEvent::Acknowledged);
// Analyze message flow
let analyzer = MessageFlowAnalyzer::new(recorder);
let insights = analyzer.analyze();
println!("Total messages: {}", insights.total_messages);
println!("Success rate: {:.2}%", insights.success_rate * 100.0);
println!("Rejection rate: {:.2}%", insights.rejection_rate * 100.0);
println!("Avg processing time: {:.2}ms", insights.avg_processing_time_ms);
println!("Health status: {:?}", insights.health_status);
Ok(())
}
Coordinate multiple consumers with load balancing:
use celers_broker_amqp::consumer_groups::{
ConsumerGroup, ConsumerInfo, LoadBalancingStrategy
};
fn consumer_groups_example() -> Result<(), Box<dyn std::error::Error>> {
// Create consumer group with round-robin strategy
let mut group = ConsumerGroup::new(
"my-consumer-group".to_string(),
LoadBalancingStrategy::RoundRobin
);
// Add consumers
for i in 0..5 {
let consumer = ConsumerInfo::new(
format!("consumer-{}", i),
"my_queue".to_string()
);
group.add_consumer(consumer);
}
// Select next consumer for message delivery
if let Some(consumer_id) = group.next_consumer() {
println!("Routing to consumer: {}", consumer_id);
// Track message processing
group.mark_processing_started(&consumer_id);
// ... process message ...
group.mark_processing_completed(&consumer_id, true);
}
// Get group statistics
let stats = group.get_statistics();
println!("Active consumers: {}", stats.active_consumers);
println!("Total processed: {}", stats.total_messages_processed);
println!("Avg utilization: {:.2}%", stats.avg_utilization * 100.0);
Ok(())
}
Available load balancing strategies:
LoadBalancingStrategy::RoundRobin - Distribute evenly across consumers
LoadBalancingStrategy::LeastConnections - Route to the least busy consumer
LoadBalancingStrategy::Priority - Route to the highest-priority available consumer
LoadBalancingStrategy::Random - Random consumer selection
Cause: RabbitMQ is not running or not accessible.
Solutions:
# Check if RabbitMQ is running
sudo systemctl status rabbitmq-server
# Check if port is open
telnet localhost 5672
# Check RabbitMQ logs
sudo tail -f /var/log/rabbitmq/rabbit@hostname.log
# Restart RabbitMQ
sudo systemctl restart rabbitmq-server
Cause: Invalid credentials or permissions.
Solutions:
# List users
sudo rabbitmqctl list_users
# Add user
sudo rabbitmqctl add_user myuser mypassword
# Set permissions
sudo rabbitmqctl set_permissions -p / myuser ".*" ".*" ".*"
# Set admin tag
sudo rabbitmqctl set_user_tags myuser administrator
Cause: Network issues or RabbitMQ restart.
Solution: Enable auto-reconnection:
let config = AmqpConfig::default()
.with_auto_reconnect(true)
.with_auto_reconnect_config(5, Duration::from_secs(2)); // 5 retries, 2s delay
let broker = AmqpBroker::with_config(url, queue, config).await?;
Causes & Solutions:
let config = AmqpConfig::default()
.with_prefetch(50); // Increase prefetch
// Use manual ack in batches
let mut count = 0;
while let Ok(Some(envelope)) = broker.consume(queue, timeout).await {
// Process message
count += 1;
// Ack every 10 messages (batch acknowledgment; assumes the broker's ack covers earlier deliveries)
if count % 10 == 0 {
broker.ack(&envelope.delivery_tag).await?;
}
}
// Use multiple consumers with streaming
for i in 0..num_workers {
let mut consumer = broker.start_consumer(queue, &format!("worker-{}", i)).await?;
tokio::spawn(async move {
// Process messages
});
}
Causes & Solutions:
# Check queue sizes
sudo rabbitmqctl list_queues name messages
# Purge if needed
sudo rabbitmqctl purge_queue queue_name
# Enable lazy queues via a policy (queue-mode is a per-queue setting, not a rabbitmq.conf key)
sudo rabbitmqctl set_policy lazy-queues ".*" '{"queue-mode":"lazy"}' --apply-to queues
let config = QueueConfig::new()
.with_max_length(10000)
.with_max_length_bytes(1_000_000_000); // 1GB
Cause: High load or slow disk I/O.
Solutions:
// Use pipeline publishing for better throughput
broker.publish_pipeline(queue, messages, 100).await?;
// Or batch publishing
broker.publish_batch(queue, messages).await?;
Checks:
// 1. Check queue size
let size = broker.queue_size(queue).await?;
println!("Queue has {} messages", size);
// 2. Check consumer count
// Use RabbitMQ management API or CLI:
// sudo rabbitmqctl list_queues name consumers
// 3. Verify queue binding
broker.bind_queue(queue, exchange, routing_key).await?;
Possible causes:
// Increase or remove TTL
let config = QueueConfig::new()
.with_message_ttl(600000); // 10 minutes
// Increase limit or use DLX
let dlx = DlxConfig::new("overflow_exchange");
let config = QueueConfig::new()
.with_max_length(50000)
.with_dlx(dlx);
// Make queue persistent
let config = QueueConfig::new()
.durable(true)
.auto_delete(false);
Solution: Enable deduplication:
let config = AmqpConfig::default()
.with_deduplication(true);
let broker = AmqpBroker::with_config(url, queue, config).await?;
Cause: Queue configuration mismatch.
Solution: Delete and recreate queue:
sudo rabbitmqctl delete_queue queue_name
Cause: Queue is used exclusively by another connection.
Solution: Close the exclusive connection or wait for it to disconnect.
Cause: RabbitMQ is running out of resources (memory/disk).
Solutions:
# Check alarms
sudo rabbitmqctl eval 'rabbit_alarm:get_alarms().'
# Check memory
free -h
# Increase memory threshold in config
vm_memory_high_watermark.relative = 0.7
Enable detailed logging:
// Set RUST_LOG environment variable
// RUST_LOG=debug cargo run
use tracing_subscriber;
tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG)
.init();
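To pick the level up from RUST_LOG at runtime instead of hard-coding it, tracing-subscriber's EnvFilter can be used (requires the crate's env-filter feature):
use tracing_subscriber::EnvFilter;

tracing_subscriber::fmt()
    .with_env_filter(EnvFilter::from_default_env())
    .init();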
Use RabbitMQ Management UI:
http://localhost:15672
Run unit tests:
cargo test
Run integration tests (requires RabbitMQ):
# Start RabbitMQ
docker run -d --name rabbitmq -p 5672:5672 rabbitmq:3
# Run integration tests
cargo test -- --ignored
Typical performance on modest hardware (4 CPU cores, 8GB RAM):
This implementation is fully compatible with Python Celery.
Known limitation: list_queues() requires the RabbitMQ Management API (it is not available via the AMQP protocol).
License: MIT OR Apache-2.0