| Crates.io | celers-kombu |
| lib.rs | celers-kombu |
| version | 0.1.0 |
| created_at | 2026-01-18 14:59:41.12898+00 |
| updated_at | 2026-01-18 14:59:41.12898+00 |
| description | Broker abstraction layer for CeleRS (inspired by Python Kombu) |
| homepage | |
| repository | https://github.com/cool-japan/celers |
| max_upload_size | |
| id | 2052449 |
| size | 752,164 |
Broker abstraction layer for CeleRS, inspired by Python's Kombu library. Provides unified traits for message broker implementations.
Production-ready broker abstraction with:
┌─────────────────────────────────────┐
│             Broker Trait            │
│      (Full producer + consumer)     │
└──────────────────┬──────────────────┘
                   │
       ┌───────────┴───────────┐
       │                       │
┌──────▼───────┐        ┌──────▼───────┐
│   Producer   │        │   Consumer   │
│ (Publishing) │        │  (Consuming) │
└──────┬───────┘        └──────┬───────┘
       │                       │
       └───────────┬───────────┘
                   │
            ┌──────▼───────┐
            │  Transport   │
            │ (Connection) │
            └──────┬───────┘
                   │
    ┌──────────────┼──────────────┐
    │              │              │
┌───▼──────┐   ┌───▼──────┐   ┌───▼──────┐
│  Redis   │   │   AMQP   │   │   SQS    │
└──────────┘   └──────────┘   └──────────┘
use celers_kombu::{Transport, Producer, Consumer, Broker, Message, Envelope, QueueMode, Result};
use async_trait::async_trait;
use std::time::Duration;
struct MyBroker {
    connected: bool,
}
impl MyBroker {
    fn new() -> Self {
        Self { connected: false }
    }
}
#[async_trait]
impl Transport for MyBroker {
async fn connect(&mut self) -> Result<()> {
self.connected = true;
Ok(())
}
async fn disconnect(&mut self) -> Result<()> {
self.connected = false;
Ok(())
}
fn is_connected(&self) -> bool {
self.connected
}
fn name(&self) -> &str {
"my-broker"
}
}
#[async_trait]
impl Producer for MyBroker {
async fn publish(&mut self, queue: &str, message: Message) -> Result<()> {
// Implement message publishing
Ok(())
}
async fn publish_with_routing(
&mut self,
exchange: &str,
routing_key: &str,
message: Message,
) -> Result<()> {
// Implement routing
Ok(())
}
}
#[async_trait]
impl Consumer for MyBroker {
async fn consume(&mut self, queue: &str, timeout: Duration) -> Result<Option<Envelope>> {
// Implement message consumption
Ok(None)
}
async fn ack(&mut self, delivery_tag: &str) -> Result<()> {
// Implement acknowledgment
Ok(())
}
async fn reject(&mut self, delivery_tag: &str, requeue: bool) -> Result<()> {
// Implement rejection
Ok(())
}
async fn queue_size(&mut self, queue: &str) -> Result<usize> {
// Implement queue size check
Ok(0)
}
}
#[async_trait]
impl Broker for MyBroker {
async fn purge(&mut self, queue: &str) -> Result<usize> {
// Implement queue purge
Ok(0)
}
async fn create_queue(&mut self, queue: &str, mode: QueueMode) -> Result<()> {
// Implement queue creation
Ok(())
}
async fn delete_queue(&mut self, queue: &str) -> Result<()> {
// Implement queue deletion
Ok(())
}
async fn list_queues(&mut self) -> Result<Vec<String>> {
// Implement queue listing
Ok(vec![])
}
}
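Because Broker pulls in Producer, Consumer, and Transport as supertraits, application code can be written once against the generic traits and reused with any backend. A small sketch (the ensure_queue helper is illustrative, not part of the crate):
use celers_kombu::{Broker, QueueMode, Result};
/// Hypothetical helper: connect if needed and make sure a queue exists.
async fn ensure_queue(broker: &mut impl Broker, queue: &str) -> Result<()> {
    // Transport methods are available through the Broker bound
    if !broker.is_connected() {
        broker.connect().await?;
    }
    // Broker-level queue management
    let queues = broker.list_queues().await?;
    if !queues.iter().any(|q| q == queue) {
        broker.create_queue(queue, QueueMode::Fifo).await?;
    }
    Ok(())
}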
Low-level connection management:
#[async_trait]
pub trait Transport: Send + Sync {
/// Connect to the broker
async fn connect(&mut self) -> Result<()>;
/// Disconnect from the broker
async fn disconnect(&mut self) -> Result<()>;
/// Check if connected
fn is_connected(&self) -> bool;
/// Get transport name ("redis", "amqp", "sqs")
fn name(&self) -> &str;
}
Usage:
let mut transport = MyBroker::new();
transport.connect().await?;
assert!(transport.is_connected());
println!("Connected to: {}", transport.name());
transport.disconnect().await?;
Message publishing interface:
#[async_trait]
pub trait Producer: Transport {
/// Publish a message to a queue
async fn publish(&mut self, queue: &str, message: Message) -> Result<()>;
/// Publish a message with routing key (AMQP-style)
async fn publish_with_routing(
&mut self,
exchange: &str,
routing_key: &str,
message: Message,
) -> Result<()>;
}
Usage:
use celers_protocol::Message;
use uuid::Uuid;
let message = Message::new("task".to_string(), Uuid::new_v4(), vec![]);
// Simple publish
producer.publish("celery", message.clone()).await?;
// Routing (AMQP-style)
producer.publish_with_routing("tasks", "high_priority", message).await?;
Message consumption interface:
#[async_trait]
pub trait Consumer: Transport {
/// Consume a message from a queue (blocking with timeout)
async fn consume(&mut self, queue: &str, timeout: Duration) -> Result<Option<Envelope>>;
/// Acknowledge a message
async fn ack(&mut self, delivery_tag: &str) -> Result<()>;
/// Reject a message (requeue or send to DLQ)
async fn reject(&mut self, delivery_tag: &str, requeue: bool) -> Result<()>;
/// Get queue size
async fn queue_size(&mut self, queue: &str) -> Result<usize>;
}
Usage:
use std::time::Duration;
// Consume with timeout
if let Some(envelope) = consumer.consume("celery", Duration::from_secs(5)).await? {
println!("Received: {:?}", envelope.message);
// Process message
match process_message(&envelope.message) {
Ok(_) => {
// Acknowledge success
consumer.ack(&envelope.delivery_tag).await?;
}
Err(_) => {
// Reject and requeue
consumer.reject(&envelope.delivery_tag, true).await?;
}
}
}
// Check queue size
let size = consumer.queue_size("celery").await?;
println!("Queue has {} messages", size);
Full broker interface combining producer and consumer:
#[async_trait]
pub trait Broker: Producer + Consumer + Transport {
/// Purge a queue (remove all messages)
async fn purge(&mut self, queue: &str) -> Result<usize>;
/// Create a queue
async fn create_queue(&mut self, queue: &str, mode: QueueMode) -> Result<()>;
/// Delete a queue
async fn delete_queue(&mut self, queue: &str) -> Result<()>;
/// List all queues
async fn list_queues(&mut self) -> Result<Vec<String>>;
}
Usage:
// Create queues
broker.create_queue("celery", QueueMode::Fifo).await?;
broker.create_queue("priority", QueueMode::Priority).await?;
// List queues
let queues = broker.list_queues().await?;
println!("Queues: {:?}", queues);
// Purge queue
let removed = broker.purge("celery").await?;
println!("Removed {} messages", removed);
// Delete queue
broker.delete_queue("old_queue").await?;
Consumed messages arrive wrapped in an Envelope, which carries delivery metadata:
pub struct Envelope {
/// The actual message
pub message: Message,
/// Delivery tag (for acknowledgment)
pub delivery_tag: String,
/// Redelivery flag
pub redelivered: bool,
}
Usage:
if let Some(envelope) = consumer.consume("celery", timeout).await? {
// Access message
let task_name = &envelope.message.headers.task;
// Check if redelivered (retry)
if envelope.redelivered {
println!("This is a retry");
}
// Acknowledge using delivery tag
consumer.ack(&envelope.delivery_tag).await?;
}
Queues operate in one of two modes:
pub enum QueueMode {
/// First-In-First-Out
Fifo,
/// Priority-based
Priority,
}
FIFO Mode: messages are delivered in the order they were published (first in, first out).
Priority Mode: messages are delivered according to priority rather than arrival order.
Usage:
// FIFO queue (default)
broker.create_queue("celery", QueueMode::Fifo).await?;
// Priority queue
broker.create_queue("priority", QueueMode::Priority).await?;
Queue behavior can be tuned in detail with QueueConfig:
pub struct QueueConfig {
/// Queue name
pub name: String,
/// Queue mode (FIFO or Priority)
pub mode: QueueMode,
/// Durable (survive broker restart)
pub durable: bool,
/// Auto-delete (delete when no consumers)
pub auto_delete: bool,
/// Maximum message size
pub max_message_size: Option<usize>,
/// Message TTL (time-to-live)
pub message_ttl: Option<Duration>,
}
Builder pattern:
use std::time::Duration;
let config = QueueConfig::new("celery".to_string())
.with_mode(QueueMode::Priority)
.with_ttl(Duration::from_secs(3600));
assert_eq!(config.name, "celery");
assert_eq!(config.mode, QueueMode::Priority);
assert!(config.durable);
Errors are reported through the BrokerError enum:
pub enum BrokerError {
/// Connection error
Connection(String),
/// Serialization error
Serialization(String),
/// Queue not found
QueueNotFound(String),
/// Message not found
MessageNotFound(Uuid),
/// Timeout waiting for message
Timeout,
/// Invalid configuration
Configuration(String),
/// Broker operation failed
OperationFailed(String),
}
Usage:
use celers_kombu::BrokerError;
match consumer.consume("celery", timeout).await {
Ok(Some(envelope)) => { /* process */ }
Ok(None) => println!("No messages"),
Err(BrokerError::Timeout) => println!("Timed out"),
Err(BrokerError::QueueNotFound(q)) => eprintln!("Queue {} not found", q),
Err(BrokerError::Connection(e)) => eprintln!("Connection error: {}", e),
Err(e) => eprintln!("Error: {}", e),
}
| Broker | Crate | Transport Type | Features |
|---|---|---|---|
| Redis | celers-broker-redis | In-memory | Fast, simple, FIFO/Priority |
| PostgreSQL | celers-broker-postgres | Database | Durable, transactional |
| RabbitMQ | celers-broker-amqp | Message broker | Advanced routing, exchanges |
| AWS SQS | celers-broker-sqs | Cloud queue | Managed, scalable |
| SQL | celers-broker-sql | Database | Generic SQL support |
use celers_broker_redis::RedisBroker;
use celers_kombu::{Broker, Producer, Consumer, Transport, Message};
use std::time::Duration;
use uuid::Uuid;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut broker = RedisBroker::new("redis://localhost:6379", "celery")?;
broker.connect().await?;
// Publish message
let message = Message::new("task".to_string(), Uuid::new_v4(), vec![]);
broker.publish("celery", message).await?;
// Consume message
if let Some(envelope) = broker.consume("celery", Duration::from_secs(5)).await? {
println!("Received: {:?}", envelope.message);
broker.ack(&envelope.delivery_tag).await?;
}
Ok(())
}
use celers_kombu::{Producer, Message, Result};
use uuid::Uuid;
async fn enqueue_tasks(producer: &mut impl Producer) -> Result<()> {
for i in 0..10 {
let message = Message::new(
"process_data".to_string(),
Uuid::new_v4(),
serde_json::to_vec(&i)?,
);
producer.publish("celery", message).await?;
}
Ok(())
}
use celers_kombu::{Consumer, Result};
use std::time::Duration;
async fn worker_loop(consumer: &mut impl Consumer) -> Result<()> {
loop {
match consumer.consume("celery", Duration::from_secs(5)).await? {
Some(envelope) => {
// Process message
println!("Processing: {:?}", envelope.message);
// Acknowledge
consumer.ack(&envelope.delivery_tag).await?;
}
None => {
// No messages, continue polling
}
}
}
}
if let Some(envelope) = consumer.consume("celery", timeout).await? {
consumer.ack(&envelope.delivery_tag).await?;
// Process after ack (at-most-once delivery)
process_message(&envelope.message);
}
Pros: lower latency, simpler. Cons: message loss if processing fails.
if let Some(envelope) = consumer.consume("celery", timeout).await? {
// Process first
match process_message(&envelope.message) {
Ok(_) => {
// Acknowledge only on success (at-least-once delivery)
consumer.ack(&envelope.delivery_tag).await?;
}
Err(e) => {
// Reject and requeue on failure
consumer.reject(&envelope.delivery_tag, true).await?;
}
}
}
Pros: no message loss. Cons: possible duplicates, higher latency.
const MAX_RETRIES: u32 = 3;
if let Some(envelope) = consumer.consume("celery", timeout).await? {
let retry_count = envelope.message.headers.retries.unwrap_or(0);
match process_message(&envelope.message) {
Ok(_) => {
consumer.ack(&envelope.delivery_tag).await?;
}
Err(_) if retry_count < MAX_RETRIES => {
// Requeue for retry
consumer.reject(&envelope.delivery_tag, true).await?;
}
Err(_) => {
// Max retries reached, don't requeue (send to DLQ)
consumer.reject(&envelope.delivery_tag, false).await?;
}
}
}
// Simple queue name routing
producer.publish("celery", message.clone()).await?;
// Routing key pattern: "tasks.high_priority"
producer.publish_with_routing("tasks", "high_priority", message.clone()).await?;
// Routing key pattern: "logs.error"
producer.publish_with_routing("logs", "error", message.clone()).await?;
// Publish to exchange, all queues receive
producer.publish_with_routing("broadcast", "*", message).await?;
The crate provides a powerful middleware system for message transformation, validation, security, and reliability.
use celers_kombu::{MiddlewareChain, ValidationMiddleware, LoggingMiddleware};
let chain = MiddlewareChain::new()
.with_middleware(Box::new(ValidationMiddleware::new()))
.with_middleware(Box::new(LoggingMiddleware::new("MyApp")));
// Use with producer
producer.publish_with_middleware("celery", message, &chain).await?;
// Use with consumer
if let Some(envelope) = consumer.consume_with_middleware("celery", timeout, &chain).await? {
// Process validated and logged message
}
Validates message structure and size limits:
use celers_kombu::ValidationMiddleware;
let validator = ValidationMiddleware::new()
.with_max_body_size(10 * 1024 * 1024) // 10MB limit
.with_require_task_name(true);
let chain = MiddlewareChain::new()
.with_middleware(Box::new(validator));
Logs message events for debugging:
use celers_kombu::LoggingMiddleware;
let logger = LoggingMiddleware::new("MyApp")
.with_body_logging(); // Enable detailed logging
let chain = MiddlewareChain::new()
.with_middleware(Box::new(logger));
Collects message statistics:
use celers_kombu::{MetricsMiddleware, BrokerMetrics};
use std::sync::{Arc, Mutex};
let metrics = Arc::new(Mutex::new(BrokerMetrics::new()));
let metrics_mw = MetricsMiddleware::new(metrics.clone());
let chain = MiddlewareChain::new()
.with_middleware(Box::new(metrics_mw));
// Later, get metrics snapshot
let snapshot = metrics.lock().unwrap().clone();
println!("Published: {}, Consumed: {}",
snapshot.messages_published,
snapshot.messages_consumed);
Enforces maximum retry count:
use celers_kombu::RetryLimitMiddleware;
let retry_limiter = RetryLimitMiddleware::new(3); // Max 3 retries
let chain = MiddlewareChain::new()
.with_middleware(Box::new(retry_limiter));
Controls message publishing rate using token bucket algorithm:
use celers_kombu::RateLimitingMiddleware;
let rate_limiter = RateLimitingMiddleware::new(100.0); // 100 messages/sec
let chain = MiddlewareChain::new()
.with_middleware(Box::new(rate_limiter));
// Automatically enforces rate limit on publish
producer.publish_with_middleware("celery", message, &chain).await?;
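For intuition, a token bucket admits a message only when a token is available, and tokens refill continuously at the configured rate. A standalone sketch of the idea (not the middleware's actual implementation):
use std::time::Instant;
/// Illustrative token bucket: `rate` tokens per second, bursts up to `capacity`.
struct TokenBucket {
    capacity: f64,
    tokens: f64,
    rate: f64,
    last_refill: Instant,
}
impl TokenBucket {
    fn new(rate: f64, capacity: f64) -> Self {
        Self { capacity, tokens: capacity, rate, last_refill: Instant::now() }
    }
    /// Returns true if a message may be published now.
    fn try_acquire(&mut self) -> bool {
        let elapsed = self.last_refill.elapsed().as_secs_f64();
        self.last_refill = Instant::now();
        // Refill proportionally to elapsed time, capped at capacity
        self.tokens = (self.tokens + elapsed * self.rate).min(self.capacity);
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }
}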
Prevents duplicate message processing:
use celers_kombu::DeduplicationMiddleware;
let dedup = DeduplicationMiddleware::new(10_000); // Track 10K message IDs
// Or use default: let dedup = DeduplicationMiddleware::with_default_cache();
let chain = MiddlewareChain::new()
.with_middleware(Box::new(dedup));
// Rejects duplicate messages based on task_id
consumer.consume_with_middleware("celery", timeout, &chain).await?;
Enforces message processing timeouts:
use celers_kombu::TimeoutMiddleware;
use std::time::Duration;
let timeout = TimeoutMiddleware::new(Duration::from_secs(30));
let chain = MiddlewareChain::new()
.with_middleware(Box::new(timeout));
// Timeout metadata injected into message headers
Selectively processes messages based on custom predicates:
use celers_kombu::FilterMiddleware;
let filter = FilterMiddleware::new(|msg| {
msg.headers.task.starts_with("high_priority")
});
let chain = MiddlewareChain::new()
.with_middleware(Box::new(filter));
// Only processes messages matching the predicate
Statistical message sampling for monitoring/testing:
use celers_kombu::SamplingMiddleware;
let sampler = SamplingMiddleware::new(0.1); // Sample 10% of messages
let chain = MiddlewareChain::new()
.with_middleware(Box::new(sampler));
Custom message content transformation:
use celers_kombu::TransformationMiddleware;
let transformer = TransformationMiddleware::new(|msg| {
// Transform message body
msg.body = transform_body(&msg.body);
Ok(())
});
let chain = MiddlewareChain::new()
.with_middleware(Box::new(transformer));
Distributed tracing support with automatic trace ID propagation:
use celers_kombu::TracingMiddleware;
let tracer = TracingMiddleware::new("my-service");
let chain = MiddlewareChain::new()
.with_middleware(Box::new(tracer));
// Injects trace IDs, span IDs, and timestamps for latency analysis
Automatic message batching hints for batch-aware consumers:
use celers_kombu::BatchingMiddleware;
use std::time::Duration;
let batcher = BatchingMiddleware::new(100, Duration::from_secs(5));
let chain = MiddlewareChain::new()
.with_middleware(Box::new(batcher));
// Suggests batching metadata (size: 100, timeout: 5s)
Comprehensive audit logging for compliance:
use celers_kombu::AuditMiddleware;
let auditor = AuditMiddleware::new("audit-system")
.with_body_logging(); // Include message body in audit trail
let chain = MiddlewareChain::new()
.with_middleware(Box::new(auditor));
// Generates unique audit IDs and tracks all operations
Hard deadline enforcement (absolute time-based):
use celers_kombu::DeadlineMiddleware;
use std::time::Duration;
let deadline = DeadlineMiddleware::new(Duration::from_secs(300)); // 5 min deadline
let chain = MiddlewareChain::new()
.with_middleware(Box::new(deadline));
// Rejects messages that exceed their absolute deadline
Content type validation and conversion:
use celers_kombu::ContentTypeMiddleware;
let validator = ContentTypeMiddleware::new()
.with_allowed_types(vec!["application/json", "application/msgpack"])
.with_default_type("application/json");
let chain = MiddlewareChain::new()
.with_middleware(Box::new(validator));
// Validates and enforces content types
Dynamic routing key assignment:
use celers_kombu::RoutingKeyMiddleware;
// From task name
let router = RoutingKeyMiddleware::from_task_name();
// From task and priority
let router = RoutingKeyMiddleware::from_task_and_priority();
// Custom routing logic
let router = RoutingKeyMiddleware::new(|msg| {
format!("tasks.{}.{}", msg.headers.task, msg.headers.priority)
});
let chain = MiddlewareChain::new()
.with_middleware(Box::new(router));
Exactly-once message processing guarantee:
use celers_kombu::IdempotencyMiddleware;
let idempotency = IdempotencyMiddleware::new(10_000); // Track 10K message IDs
// Or use default: IdempotencyMiddleware::with_default_cache();
let chain = MiddlewareChain::new()
.with_middleware(Box::new(idempotency));
// Tracks processed messages to prevent duplicate processing on retries
// Sets x-already-processed header to true for duplicates
Automatic retry backoff calculation with jitter:
use celers_kombu::BackoffMiddleware;
use std::time::Duration;
let backoff = BackoffMiddleware::new(
Duration::from_secs(1), // Initial delay
Duration::from_secs(300), // Max delay (5 min)
2.0, // Multiplier
);
// Or use defaults: BackoffMiddleware::with_defaults();
let chain = MiddlewareChain::new()
.with_middleware(Box::new(backoff));
// Calculates exponential backoff with 0-25% jitter
// Injects x-backoff-delay and x-next-retry headers
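The injected delay follows the stated formula; a standalone sketch (using the rand crate for jitter, not the middleware's own code):
use rand::Rng;
use std::time::Duration;
/// Illustrative: initial * multiplier^retries, capped at max, plus 0-25% jitter.
fn backoff_delay(retries: u32, initial: Duration, max: Duration, multiplier: f64) -> Duration {
    let base = initial.as_secs_f64() * multiplier.powi(retries as i32);
    let capped = base.min(max.as_secs_f64());
    let jitter = 1.0 + rand::thread_rng().gen_range(0.0..0.25);
    // e.g. with a 1s initial delay and 2.0 multiplier, retry 3 gives an 8s base,
    // jittered into roughly 8 to 10 seconds
    Duration::from_secs_f64(capped * jitter)
}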
The following middleware require enabling feature flags in Cargo.toml:
Compresses message bodies (requires compression feature):
[dependencies]
celers-kombu = { version = "0.1", features = ["compression"] }
#[cfg(feature = "compression")]
use celers_kombu::CompressionMiddleware;
#[cfg(feature = "compression")]
use celers_protocol::compression::CompressionType;
#[cfg(feature = "compression")]
{
let compressor = CompressionMiddleware::new(CompressionType::Gzip)
.with_min_size(1024) // Only compress messages > 1KB
.with_level(6); // Compression level 1-9
let chain = MiddlewareChain::new()
.with_middleware(Box::new(compressor));
}
Signs messages with HMAC-SHA256 (requires signing feature):
[dependencies]
celers-kombu = { version = "0.1", features = ["signing"] }
#[cfg(feature = "signing")]
use celers_kombu::SigningMiddleware;
#[cfg(feature = "signing")]
{
let secret_key = b"your-secret-key-min-32-bytes-long!!!";
let signer = SigningMiddleware::new(secret_key);
let chain = MiddlewareChain::new()
.with_middleware(Box::new(signer));
}
Encrypts messages with AES-256-GCM (requires encryption feature):
[dependencies]
celers-kombu = { version = "0.1", features = ["encryption"] }
#[cfg(feature = "encryption")]
use celers_kombu::EncryptionMiddleware;
#[cfg(feature = "encryption")]
{
let encryption_key = b"32-byte-secret-key-for-aes-256!!"; // Must be 32 bytes
let encryptor = EncryptionMiddleware::new(encryption_key)?;
let chain = MiddlewareChain::new()
.with_middleware(Box::new(encryptor));
}
Or enable everything at once:
[dependencies]
celers-kombu = { version = "0.1", features = ["full"] }
Create a complete middleware pipeline:
use celers_kombu::*;
use std::sync::{Arc, Mutex};
let metrics = Arc::new(Mutex::new(BrokerMetrics::new()));
let chain = MiddlewareChain::new()
// Validation first
.with_middleware(Box::new(ValidationMiddleware::new()))
// Rate limiting
.with_middleware(Box::new(RateLimitingMiddleware::new(100.0)))
// Deduplication
.with_middleware(Box::new(DeduplicationMiddleware::with_default_cache()))
// Logging
.with_middleware(Box::new(LoggingMiddleware::new("MyApp")))
// Metrics collection
.with_middleware(Box::new(MetricsMiddleware::new(metrics.clone())))
// Retry limit
.with_middleware(Box::new(RetryLimitMiddleware::new(3)));
// Optional: Add compression, signing, encryption (if features enabled)
#[cfg(feature = "compression")]
let chain = chain.with_middleware(Box::new(
CompressionMiddleware::new(CompressionType::Gzip)
));
#[cfg(feature = "signing")]
let chain = chain.with_middleware(Box::new(
SigningMiddleware::new(b"secret-key")
));
#[cfg(feature = "encryption")]
let chain = chain.with_middleware(Box::new(
EncryptionMiddleware::new(b"32-byte-encryption-key-here!!!!!")?
));
Handle failed messages with automatic retry tracking:
use celers_kombu::{DlqConfig, DeadLetterQueue};
// Configure DLQ
let dlq_config = DlqConfig::new("my-queue-dlq")
.with_max_retries(3)
.with_ttl(Duration::from_secs(86400)) // 24 hours
.with_metadata("reason", "processing_failed");
// Send to DLQ
broker.send_to_dlq(&message, &dlq_config).await?;
// Retrieve from DLQ
if let Some(msg) = broker.get_from_dlq("my-queue-dlq", None).await? {
println!("Found failed message: {:?}", msg);
}
// Retry from DLQ
broker.retry_from_dlq("my-queue-dlq", &message_id, "my-queue").await?;
// Get DLQ statistics
let stats = broker.dlq_stats("my-queue-dlq").await?;
println!("DLQ has {} messages, oldest: {}s",
stats.message_count,
stats.oldest_message_age_secs().unwrap_or(0));
// Purge DLQ
broker.purge_dlq("my-queue-dlq").await?;
ACID guarantees for message operations:
use celers_kombu::{MessageTransaction, IsolationLevel};
// Begin transaction with isolation level
let tx_id = broker.begin_transaction(IsolationLevel::ReadCommitted).await?;
// Publish within transaction
broker.publish_transactional(&tx_id, "queue", message1).await?;
broker.publish_transactional(&tx_id, "queue", message2).await?;
// Consume within transaction
if let Some(env) = broker.consume_transactional(&tx_id, "queue", timeout).await? {
// Process message
}
// Commit or rollback
if success {
broker.commit_transaction(&tx_id).await?;
} else {
broker.rollback_transaction(&tx_id).await?;
}
// Check transaction state
let state = broker.transaction_state(&tx_id).await?;
Isolation Levels:
- ReadUncommitted: Dirty reads allowed
- ReadCommitted: Only committed data visible
- RepeatableRead: Consistent snapshot
- Serializable: Full isolation
Delay message delivery with flexible timing:
use celers_kombu::{ScheduleConfig, MessageScheduler};
// Schedule with delay
let schedule = ScheduleConfig::delay(Duration::from_secs(3600)); // 1 hour delay
// Schedule at absolute time
let schedule = ScheduleConfig::at(SystemTime::now() + Duration::from_secs(7200));
// Schedule with execution window
let schedule = ScheduleConfig::delay(Duration::from_secs(60))
.with_window(Duration::from_secs(30)); // Allow ±30s variance
// Schedule message
let scheduled_id = broker.schedule_message("queue", message, &schedule).await?;
// Cancel scheduled message
broker.cancel_scheduled(&scheduled_id).await?;
// List scheduled messages
let scheduled = broker.list_scheduled("queue").await?;
for msg in scheduled {
println!("Message {} scheduled for {:?}", msg.message_id, msg.scheduled_time);
}
// Check if ready for delivery
if schedule.is_ready() {
println!("Message is ready for delivery");
}
Load-balanced distributed consumption:
use celers_kombu::{ConsumerGroup, ConsumerGroupConfig};
// Configure consumer group
let config = ConsumerGroupConfig::new("my-group")
.with_max_consumers(10)
.with_heartbeat_interval(Duration::from_secs(30))
.with_rebalance_timeout(Duration::from_secs(60));
// Join group
let consumer_id = broker.join_group(&config).await?;
println!("Joined group as: {}", consumer_id);
// Send heartbeat (keep membership alive)
broker.heartbeat(&consumer_id).await?;
// Consume with automatic load balancing
if let Some(envelope) = broker.consume_from_group("queue", &consumer_id, timeout).await? {
println!("Received: {:?}", envelope);
}
// Get group members
let members = broker.group_members("my-group").await?;
println!("Group has {} consumers", members.len());
// Leave group
broker.leave_group(&consumer_id).await?;
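In practice a group worker alternates heartbeats with consumption. A sketch built from the calls above (the interval bookkeeping and timeouts are illustrative):
let config = ConsumerGroupConfig::new("my-group")
    .with_heartbeat_interval(Duration::from_secs(30));
let consumer_id = broker.join_group(&config).await?;
let mut last_heartbeat = std::time::Instant::now();
loop {
    // Keep membership alive at the configured interval
    if last_heartbeat.elapsed() >= Duration::from_secs(30) {
        broker.heartbeat(&consumer_id).await?;
        last_heartbeat = std::time::Instant::now();
    }
    // Short consume timeout so heartbeats are not starved
    if let Some(envelope) = broker.consume_from_group("queue", &consumer_id, Duration::from_secs(5)).await? {
        // process, then acknowledge
        broker.ack(&envelope.delivery_tag).await?;
    }
}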
Debug and recover with historical message replay:
use celers_kombu::{ReplayConfig, MessageReplay};
// Replay last hour
let config = ReplayConfig::from_duration(Duration::from_secs(3600));
// Replay from specific timestamp
let config = ReplayConfig::from_timestamp(SystemTime::now() - Duration::from_secs(7200));
// Replay with limits
let config = ReplayConfig::from_duration(Duration::from_secs(3600))
.with_max_messages(1000)
.with_speed(2.0); // 2x speed
// Begin replay session
let session_id = broker.begin_replay("queue", &config).await?;
// Replay messages
loop {
match broker.replay_next(&session_id).await? {
Some(envelope) => {
println!("Replaying: {:?}", envelope);
}
None => break,
}
}
// Track progress
let progress = broker.replay_progress(&session_id).await?;
println!("Replay {}% complete", progress.completion_percent());
// Stop replay
broker.stop_replay(&session_id).await?;
Resource limits with flexible enforcement:
use celers_kombu::{QuotaConfig, QuotaManager, QuotaEnforcement};
// Configure quotas
let quota = QuotaConfig::new()
.with_max_messages(10_000)
.with_max_bytes(100 * 1024 * 1024) // 100 MB
.with_max_rate(100.0) // 100 messages/sec
.with_max_per_consumer(50)
.with_enforcement(QuotaEnforcement::Throttle);
// Set quota
broker.set_quota("queue", quota).await?;
// Check quota before operation
match broker.check_quota("queue", message_size).await {
    Ok(_) => {
        broker.publish("queue", message).await?;
    }
    Err(e) => println!("Quota exceeded: {}", e),
}
// Get quota usage
let usage = broker.quota_usage("queue").await?;
println!("Message quota: {}%", usage.message_usage_percent());
println!("Byte quota: {}%", usage.byte_usage_percent());
println!("Rate quota: {}%", usage.rate_usage_percent());
if usage.is_message_quota_exceeded() {
println!("WARNING: Message quota exceeded!");
}
// Reset quota
broker.reset_quota("queue").await?;
Enforcement Policies:
- Reject: Reject operations exceeding quota
- Throttle: Slow down operations
- Warn: Log warnings but allow
Automatic flow control based on queue metrics:
use celers_kombu::BackpressureConfig;
let backpressure = BackpressureConfig::new()
.with_max_pending(1000)
.with_max_queue_size(10_000)
.with_high_watermark(0.8) // 80%
.with_low_watermark(0.2); // 20%
// Check if backpressure should be applied
if backpressure.should_apply_backpressure(pending, queue_size) {
println!("Applying backpressure - queue is full");
tokio::time::sleep(Duration::from_millis(100)).await;
}
// Check if backpressure can be released
if backpressure.should_release_backpressure(pending, queue_size) {
println!("Releasing backpressure - queue drained");
}
Prevent infinite retry loops:
use celers_kombu::PoisonMessageDetector;
let detector = PoisonMessageDetector::new()
.with_max_failures(5)
.with_failure_window(Duration::from_secs(300)); // 5 min window
// Record failure
detector.record_failure(&message_id);
// Check if poison
if detector.is_poison(&message_id) {
println!("Poison message detected! Sending to DLQ...");
broker.send_to_dlq(&message, &dlq_config).await?;
detector.clear_failures(&message_id);
} else {
let failures = detector.failure_count(&message_id);
println!("Message has {} failures", failures);
}
// Clear all tracking
detector.clear_all();
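A consume loop typically wires the detector together with the DLQ flow shown earlier. A sketch, where process_message, message_id, and dlq_config are placeholders as in the snippets above:
if let Some(envelope) = broker.consume("celery", timeout).await? {
    match process_message(&envelope.message) {
        Ok(_) => {
            broker.ack(&envelope.delivery_tag).await?;
        }
        Err(_) => {
            detector.record_failure(&message_id);
            if detector.is_poison(&message_id) {
                // Stop retrying: park the message in the DLQ and drop it from the queue
                broker.send_to_dlq(&envelope.message, &dlq_config).await?;
                broker.reject(&envelope.delivery_tag, false).await?;
                detector.clear_failures(&message_id);
            } else {
                // Requeue for another attempt
                broker.reject(&envelope.delivery_tag, true).await?;
            }
        }
    }
}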
47 helper functions for broker operations, optimization, and monitoring:
use celers_kombu::utils;
// Calculate optimal batch size
let batch_size = utils::calculate_optimal_batch_size(
1024, // avg_message_size
10_000, // target_throughput
100, // processing_time_ms
);
// Calculate optimal workers
let workers = utils::calculate_optimal_workers(
10_000, // queue_size
100, // processing_rate per worker
3600, // target_drain_time_secs
);
// Analyze broker performance
let (success_rate, error_rate, ack_rate) = utils::analyze_broker_performance(&metrics);
println!("Success: {:.1}%, Errors: {:.1}%", success_rate, error_rate);
// Calculate throughput
let throughput = utils::calculate_throughput(messages_count, duration_secs);
println!("Throughput: {:.1} msgs/sec", throughput);
// Analyze circuit breaker
let action = utils::analyze_circuit_breaker(&stats, &config);
println!("Recommended action: {:?}", action);
// Analyze queue health
let health = utils::analyze_queue_health(queue_size, high_watermark, low_watermark);
println!("Queue health: {:?}", health); // Healthy, Warning, Critical
// Estimate drain time
let drain_time = utils::estimate_drain_time(queue_size, consumption_rate);
println!("Queue will drain in {:.1} seconds", drain_time);
// Estimate memory usage
let memory_mb = utils::estimate_queue_memory(queue_size, avg_message_size);
println!("Queue using ~{:.1} MB", memory_mb);
// Anomaly detection
let (is_anomaly, severity, description) = utils::detect_anomalies(
&current_rates,
&baseline_rates,
2.0, // threshold_multiplier
);
// SLA compliance
let (compliance_pct, violations, avg_time) = utils::calculate_sla_compliance(
&processing_times,
target_ms,
);
// Error budget tracking
let (budget_remaining, errors_allowed, hours_to_exhaustion) = utils::calculate_error_budget(
99.9, // sla_target
total_requests,
failed_requests,
requests_per_hour,
);
// Cost estimation
let monthly_cost = utils::estimate_infrastructure_cost(
messages_per_day,
cost_per_million,
30,
);
// Queue saturation prediction
let (hours_to_saturation, growth_rate) = utils::predict_queue_saturation(
&queue_sizes,
max_capacity,
hours_per_sample,
);
See examples for comprehensive usage: cargo run --example monitoring and cargo run --example operational_excellence.
Handle consume timeouts without failing the worker:
use celers_kombu::BrokerError;
use std::time::Duration;
let timeout = Duration::from_secs(5);
match consumer.consume("celery", timeout).await {
Ok(Some(envelope)) => { /* process */ }
Ok(None) => println!("No messages, continuing..."),
Err(BrokerError::Timeout) => println!("Timed out, retrying..."),
Err(e) => return Err(e),
}
Shut down workers cleanly on Ctrl-C:
use celers_kombu::{Consumer, Result};
use std::time::Duration;
use tokio::signal;
async fn worker_with_shutdown(consumer: &mut impl Consumer) -> Result<()> {
loop {
tokio::select! {
result = consumer.consume("celery", Duration::from_secs(5)) => {
if let Some(envelope) = result? {
process_message(&envelope.message)?;
consumer.ack(&envelope.delivery_tag).await?;
}
}
_ = signal::ctrl_c() => {
println!("Shutting down gracefully...");
break;
}
}
}
Ok(())
}
// Create connection pool
let pool = create_broker_pool(config)?;
// Acquire from pool
let mut broker = pool.acquire().await?;
broker.publish("celery", message).await?;
// Return to pool (automatic)
drop(broker);
Monitor queue depth to detect backlogs:
async fn monitor_queues(consumer: &mut impl Consumer) -> Result<()> {
let size = consumer.queue_size("celery").await?;
if size > 1000 {
println!("WARNING: Queue backlog detected ({} messages)", size);
// Trigger alert or scale workers
}
Ok(())
}
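The check can be driven from a periodic background task; a sketch (the 30-second period is arbitrary):
use std::time::Duration;
// Run the backlog check every 30 seconds
let mut interval = tokio::time::interval(Duration::from_secs(30));
loop {
    interval.tick().await;
    monitor_queues(&mut consumer).await?;
}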
Route work across dedicated queues by priority:
// High-priority queue
broker.create_queue("high_priority", QueueMode::Fifo).await?;
// Normal queue
broker.create_queue("celery", QueueMode::Fifo).await?;
// Low-priority queue
broker.create_queue("low_priority", QueueMode::Fifo).await?;
// Publish to appropriate queue
if is_urgent {
producer.publish("high_priority", message).await?;
} else {
producer.publish("celery", message).await?;
}
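On the consuming side, the same layout can be drained in priority order by polling the queues from most to least urgent. A sketch using short per-queue timeouts:
use std::time::Duration;
// Poll queues from most to least urgent; stop at the first message found
// so high-priority work is always drained before the lower tiers.
for queue in ["high_priority", "celery", "low_priority"] {
    if let Some(envelope) = consumer.consume(queue, Duration::from_millis(100)).await? {
        println!("Processing from {}: {:?}", queue, envelope.message);
        consumer.ack(&envelope.delivery_tag).await?;
        break;
    }
}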
Mock the traits to unit test broker-dependent code:
#[cfg(test)]
mod tests {
use super::*;
struct MockBroker {
connected: bool,
messages: Vec<Message>,
}
#[async_trait]
impl Transport for MockBroker {
async fn connect(&mut self) -> Result<()> {
self.connected = true;
Ok(())
}
async fn disconnect(&mut self) -> Result<()> {
self.connected = false;
Ok(())
}
fn is_connected(&self) -> bool {
self.connected
}
fn name(&self) -> &str {
"mock"
}
}
#[tokio::test]
async fn test_connection() {
let mut broker = MockBroker {
connected: false,
messages: vec![],
};
assert!(!broker.is_connected());
broker.connect().await.unwrap();
assert!(broker.is_connected());
}
}
The crate includes 11 comprehensive examples demonstrating all features:
# Complete broker implementation
cargo run --example basic_broker
# Middleware usage patterns
cargo run --example middleware_usage
# Batch operations
cargo run --example batch_operations
# Dead Letter Queue (DLQ)
cargo run --example dlq_usage
# Message transactions with ACID guarantees
cargo run --example transactions
# Scheduling, consumer groups, replay, quotas
cargo run --example advanced_features
# Backpressure, poison detection, timeout, filter
cargo run --example flow_control
# Circuit breaker, connection pooling, health checks
cargo run --example circuit_breaker
# 47 utility functions showcase
cargo run --example utilities_showcase
# Production monitoring and observability
cargo run --example monitoring
# Idempotency, backoff, anomaly detection, SLA tracking, error budgets
cargo run --example operational_excellence
Each example includes detailed comments and demonstrates best practices for production use.
- celers-protocol - Message format definitions
- celers-broker-redis, celers-broker-postgres, etc. - Broker implementations
- celers-core - Task execution
License: MIT OR Apache-2.0