| Crates.io | celers-broker-sqs |
| lib.rs | celers-broker-sqs |
| version | 0.1.0 |
| created_at | 2026-01-18 15:32:19.452541+00 |
| updated_at | 2026-01-18 15:32:19.452541+00 |
| description | AWS SQS broker implementation for CeleRS |
| homepage | |
| repository | https://github.com/cool-japan/celers |
| max_upload_size | |
| id | 2052498 |
| size | 838,260 |
Production-ready AWS SQS broker implementation for CeleRS with batch operations, FIFO queues, CloudWatch integration, and comprehensive cost optimization.
Cloud-native message broker built on AWS SQS. Standard vs. FIFO queues at a glance:
| Feature | Standard Queue | FIFO Queue |
|---|---|---|
| Throughput | Nearly Unlimited | 300-3000 TPS |
| Ordering | Best-effort | Strict FIFO |
| Deduplication | ❌ | ✅ (5 min window) |
| Latency | ~10-100ms | ~10-100ms |
| Cost (per 1M) | $0.40 | $0.50 |
| Use Case | High throughput | Ordered processing |
use celers_broker_sqs::SqsBroker;
use celers_kombu::{Broker, Consumer, Producer, Transport};
use celers_protocol::builder::MessageBuilder;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create and connect
let mut broker = SqsBroker::new("my-queue").await?;
broker.connect().await?;
// Publish a message
let message = MessageBuilder::new("tasks.process_data")
.kwarg("user_id", serde_json::json!(123))
.build()?;
broker.publish("my-queue", message).await?;
// Consume messages
if let Some(envelope) = broker
.consume("my-queue", std::time::Duration::from_secs(20))
.await?
{
println!("Received: {:?}", envelope.message.task_name());
broker.ack(&envelope.delivery_tag).await?;
}
Ok(())
}
// Use cost-optimized preset for production
let mut broker = SqsBroker::cost_optimized("my-queue").await?;
broker.connect().await?;
// Or configure manually
use celers_broker_sqs::{AdaptivePollingConfig, PollingStrategy};
let broker = SqsBroker::new("my-queue")
.await?
.with_wait_time(20) // Enable long polling (20s max)
.with_max_messages(10) // Receive up to 10 messages at once
.with_adaptive_polling(
AdaptivePollingConfig::new(PollingStrategy::ExponentialBackoff)
);
| Operation | Individual | Batch (10) | Cost Reduction |
|---|---|---|---|
| Publish | 10 API calls | 1 API call | 90% |
| Consume | 10 API calls | 1 API call | 90% |
| Acknowledge | 10 API calls | 1 API call | 90% |
use std::time::Duration;

// Batch publish (up to 10 messages per call)
let messages: Vec<_> = (1..=10)
.map(|i| MessageBuilder::new("task").kwarg("id", serde_json::json!(i)).build())
.collect::<Result<Vec<_>, _>>()?;
broker.publish_batch("my-queue", messages).await?;
// Batch consume (up to 10 messages per call)
let envelopes = broker
.consume_batch("my-queue", 10, Duration::from_secs(20))
.await?;
// Batch acknowledge
let tags: Vec<String> = envelopes.iter().map(|e| e.delivery_tag.clone()).collect();
broker.ack_batch("my-queue", tags).await?;
// Auto-chunking for large batches
let large_batch = create_many_messages(25);
broker.publish_batch_chunked("my-queue", large_batch).await?; // Automatically chunks into 10+10+5
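The 90% column is straight arithmetic from SQS's request-based pricing: a batch call carries up to 10 messages but bills as a single request. A quick sanity check using the standard-queue price from the table above ($0.40 per 1M requests):

```rust
// Back-of-the-envelope check of the batch savings: SQS bills per request,
// and one batch request carries up to 10 messages.
fn main() {
    let messages = 1_000_000.0_f64;
    let price_per_request = 0.40 / 1_000_000.0; // standard queue

    let individual = messages * price_per_request;       // 1 request per message
    let batched = (messages / 10.0) * price_per_request; // 10 messages per request

    println!("individual: ${individual:.2}, batched: ${batched:.2}");
    println!("reduction: {:.0}%", (1.0 - batched / individual) * 100.0);
    // -> individual: $0.40, batched: $0.04, reduction: 90%
}
```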
use celers_broker_sqs::FifoConfig;
// Create FIFO queue with content-based deduplication
let fifo_config = FifoConfig::new()
.with_content_based_deduplication(true)
.with_high_throughput(true); // 3000 TPS per message group
let mut broker = SqsBroker::new("my-queue.fifo")
.await?
.with_fifo(fifo_config);
broker.connect().await?;
// Publish to FIFO queue
let message = MessageBuilder::new("tasks.ordered_processing")
.kwarg("sequence", serde_json::json!(1))
.build()?;
broker
.publish_fifo("my-queue.fifo", message, "group-1", None) // group ID "group-1"; None lets content-based deduplication derive the dedup ID
.await?;
// Get DLQ ARN
let dlq_arn = broker.get_queue_arn("my-dlq").await?;
// Configure DLQ for main queue (move to DLQ after 3 failed attempts)
broker.set_redrive_policy("my-queue", &dlq_arn, 3).await?;
// Monitor DLQ
let dlq_messages = broker.get_dlq_messages(10).await?;
println!("DLQ has {} messages", dlq_messages.len());
// Redrive messages back to the main queue
for envelope in dlq_messages {
broker.redrive_dlq_message(&envelope.delivery_tag).await?;
}
use celers_broker_sqs::CloudWatchConfig;
// Enable CloudWatch metrics
let cw_config = CloudWatchConfig::new("CeleRS/SQS")
.with_dimension("Environment", "production")
.with_dimension("Application", "my-app");
let mut broker = SqsBroker::new("my-queue")
.await?
.with_cloudwatch(cw_config);
broker.connect().await?;
// Publish metrics to CloudWatch
broker.publish_metrics("my-queue").await?;
Publishes these metrics:
- ApproximateNumberOfMessages - Messages in queue
- ApproximateNumberOfMessagesNotVisible - Messages being processed
- ApproximateNumberOfMessagesDelayed - Delayed messages
- ApproximateAgeOfOldestMessage - Age of oldest message (seconds)

use celers_broker_sqs::AlarmConfig;
// Create alarm for high queue depth
let alarm = AlarmConfig::queue_depth_alarm(
"HighQueueDepth",
"my-queue",
1000.0 // Alert when > 1000 messages
).with_alarm_action("arn:aws:sns:us-east-1:123456789012:alerts");
broker.create_alarm(alarm).await?;
// Create alarm for old messages (backlog detection)
let alarm = AlarmConfig::message_age_alarm(
"OldMessages",
"my-queue",
600.0 // Alert when oldest message > 10 minutes
).with_alarm_action("arn:aws:sns:us-east-1:123456789012:alerts");
broker.create_alarm(alarm).await?;
// List all alarms
let alarms = broker.list_alarms("my-queue").await?;
use celers_broker_sqs::{AdaptivePollingConfig, PollingStrategy};
// Exponential backoff: saves costs when queue is idle
let config = AdaptivePollingConfig::new(PollingStrategy::ExponentialBackoff)
.with_min_wait_time(1)
.with_max_wait_time(20)
.with_backoff_multiplier(2.0);
let mut broker = SqsBroker::new("my-queue")
.await?
.with_adaptive_polling(config);
broker.connect().await?;
Polling Strategies:
- Fixed - Uses the configured wait time (default)
- ExponentialBackoff - Increases wait time while the queue is empty; resets when messages arrive
- Adaptive - Decreases wait time when busy; increases after 3+ consecutive empty receives
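For intuition, here is how ExponentialBackoff would walk through wait times with the settings above (min 1 s, max 20 s, multiplier 2.0), assuming the wait doubles on each consecutive empty receive and resets once a message arrives:

```rust
// Sketch of the assumed ExponentialBackoff progression.
fn next_wait(current: u64, min: u64, max: u64, got_message: bool) -> u64 {
    if got_message {
        min // reset as soon as the queue has traffic
    } else {
        (current * 2).min(max) // multiplier 2.0, capped at max_wait_time
    }
}

fn main() {
    let (min, max) = (1, 20);
    let mut wait = min;
    for _ in 0..6 {
        print!("{wait}s ");
        wait = next_wait(wait, min, max, false);
    }
    // Prints: 1s 2s 4s 8s 16s 20s
}
```

use std::time::Duration;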
// Process up to 10 messages concurrently
let processed = broker
.consume_parallel(
"my-queue",
10,
Duration::from_secs(20),
|envelope| async move {
// Your async processing logic
process_message(&envelope.message).await?;
Ok(())
},
)
.await?;
println!("Processed {} messages in parallel", processed);
// Enable compression for messages > 10 KB
let mut broker = SqsBroker::new("my-queue")
.await?
.with_compression(10 * 1024);
broker.connect().await?;
// Large messages are automatically compressed/decompressed
let large_payload = "x".repeat(50_000); // 50 KB
let message = MessageBuilder::new("tasks.process_data")
.kwarg("data", serde_json::json!(large_payload))
.build()?;
broker.publish("my-queue", message).await?; // Automatically compressed
// Production: optimized for reliability and performance
let broker = SqsBroker::production("my-queue").await?;
// - Long polling: 20s
// - Max messages: 10 (batch receiving)
// - Message retention: 14 days
// - Visibility timeout: 30s
// Development: quick feedback, short retention
let broker = SqsBroker::development("my-queue").await?;
// - Wait time: 5s
// - Message retention: 1 hour
// - Visibility timeout: 30s
// Cost-optimized: 90%+ cost reduction
let broker = SqsBroker::cost_optimized("my-queue").await?;
// - Adaptive polling (exponential backoff)
// - Max messages: 10 (batch receiving)
// - Long polling: 20s
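Because the presets are plain constructors, choosing one per deployment environment takes a few lines. A sketch (the APP_ENV variable name is an assumption for illustration):

```rust
use celers_broker_sqs::SqsBroker;

// Pick a preset based on a deployment-environment variable.
// APP_ENV is a hypothetical name; any configuration source works.
async fn broker_for_env(queue: &str) -> Result<SqsBroker, Box<dyn std::error::Error>> {
    Ok(match std::env::var("APP_ENV").as_deref() {
        Ok("production") => SqsBroker::production(queue).await?,
        Ok("development") => SqsBroker::development(queue).await?,
        _ => SqsBroker::cost_optimized(queue).await?, // safe default
    })
}
```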
use celers_broker_sqs::SseConfig;
// SQS-managed encryption (easiest)
let sse_config = SseConfig::sqs_managed();
let mut broker = SqsBroker::new("my-queue")
.await?
.with_sse(sse_config);
// KMS encryption (more control)
let kms_config = SseConfig::kms("alias/my-key")
.with_data_key_reuse_period(300); // 5 minutes
let mut broker = SqsBroker::new("my-queue")
.await?
.with_sse(kms_config);
// Get queue size
let size = broker.queue_size("my-queue").await?;
println!("Queue has {} messages", size);
// Get detailed statistics
let stats = broker.get_queue_stats("my-queue").await?;
println!("Messages: {}", stats.approximate_message_count);
println!("In flight: {}", stats.approximate_not_visible_count);
if let Some(age) = stats.approximate_age_of_oldest_message {
println!("Oldest message: {}s", age);
}
// List all queues
let queues = broker.list_queues().await?;
// Purge queue (delete all messages)
broker.purge("my-queue").await?;
// Health check (for Kubernetes probes)
let is_healthy = broker.health_check("my-queue").await?;
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"sqs:SendMessage",
"sqs:SendMessageBatch",
"sqs:ReceiveMessage",
"sqs:DeleteMessage",
"sqs:DeleteMessageBatch",
"sqs:ChangeMessageVisibility",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes"
],
"Resource": "arn:aws:sqs:*:*:my-queue*"
}
]
}
For queue management:
{
  "Effect": "Allow",
  "Action": [
    "sqs:CreateQueue",
    "sqs:DeleteQueue",
    "sqs:PurgeQueue",
    "sqs:ListQueues",
    "sqs:SetQueueAttributes",
    "sqs:TagQueue",
    "sqs:UntagQueue"
  ]
}
For CloudWatch:
{
  "Effect": "Allow",
  "Action": ["cloudwatch:PutMetricData"],
  "Resource": "*",
  "Condition": {
    "StringEquals": {"cloudwatch:namespace": "CeleRS/SQS"}
  }
}
For CloudWatch Alarms:
{
  "Effect": "Allow",
  "Action": [
    "cloudwatch:PutMetricAlarm",
    "cloudwatch:DeleteAlarms",
    "cloudwatch:DescribeAlarms"
  ],
  "Resource": "arn:aws:cloudwatch:*:*:alarm:*"
}
The broker uses the AWS SDK's default credential chain:
1. Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
2. Shared credentials file (~/.aws/credentials)
3. IAM roles (e.g., EC2 instance profiles, ECS task roles)

Recommendation: Use IAM roles in production for enhanced security.
Protect against cascading AWS API failures:
use celers_broker_sqs::circuit_breaker::CircuitBreaker;
use std::time::Duration;
let circuit_breaker = CircuitBreaker::new(5, Duration::from_secs(30));
// Execute operations with circuit breaker protection
let result = circuit_breaker.call(|| async {
broker.publish("my-queue", message).await
}).await;
// Check circuit state
let stats = circuit_breaker.stats();
println!("Circuit state: {:?}", stats.state);
Monitor and control AWS SQS costs:
use celers_broker_sqs::cost_tracker::CostTracker;
let mut tracker = CostTracker::new();
// Track operations
tracker.track_publish(false, 1); // Standard queue, 1 message
tracker.track_consume(10); // 10 messages consumed
// Get cost summary
let summary = tracker.summary();
println!("Total cost: ${:.4}", summary.total_cost_usd);
println!("Monthly projection: ${:.2}", summary.monthly_projection_usd);
// Check batch savings
println!("Batch savings: ${:.4}", summary.batch_savings_usd);
Dynamically optimize batch sizes:
use celers_broker_sqs::batch_optimizer::{BatchOptimizer, OptimizerGoal};
let optimizer = BatchOptimizer::new(OptimizerGoal::MinimizeCost);
// Get recommendation based on queue metrics
let recommendation = optimizer.recommend(
1000, // Queue depth
50, // Avg message size (KB)
100, // Avg processing time (ms)
4 // Number of workers
);
println!("Recommended batch size: {}", recommendation.recommended_batch_size);
println!("Reason: {}", recommendation.reasoning);
Track messages across services:
use celers_broker_sqs::tracing_util::{generate_correlation_id, MessageFlowTracker, TraceContext};
// Create trace context
let trace_ctx = TraceContext::new()
.with_correlation_id(generate_correlation_id())
.with_xray_trace_id()
.with_parent_span_id("parent-123");
// Attach to message attributes
let attrs = trace_ctx.to_message_attributes();
// Track message flow
let mut flow_tracker = MessageFlowTracker::new();
flow_tracker.record_publish(&trace_ctx, "queue-1");
flow_tracker.record_consume(&trace_ctx, "queue-1");
Control API usage and budgets:
use celers_broker_sqs::quota_manager::{QuotaManager, QuotaConfig};
let config = QuotaConfig::new()
.with_daily_budget(10.0) // $10/day max
.with_monthly_budget(200.0) // $200/month max
.with_per_second_limit(100); // 100 requests/sec max
let mut quota = QuotaManager::new(config);
// Check before operations
if quota.check_request(0.0004).await? {
broker.publish("my-queue", message).await?;
} else {
println!("Quota exceeded!");
}
// Get quota status
let status = quota.status();
println!("Daily spend: ${:.2}", status.daily_spend_usd);
Route messages to multiple queues:
use celers_broker_sqs::router::{MessageRouter, RoutingRule, RoutingStrategy};
let mut router = MessageRouter::new();
// Priority-based routing
router.add_rule(RoutingRule::priority_based(
"high-priority-queue",
RoutingStrategy::PriorityThreshold(8)
));
// Pattern-based routing
router.add_rule(RoutingRule::task_pattern(
"analytics-queue",
"tasks.analytics.*"
));
// Route message
let queue = router.route(&message)?;
broker.publish(queue, message).await?;
Track and analyze operation latencies:
use celers_broker_sqs::profiler::PerformanceProfiler;
use std::time::Instant;
let mut profiler = PerformanceProfiler::new();
// Record operations
let start = Instant::now();
broker.publish("my-queue", message).await?;
profiler.record_publish(start.elapsed());
// Get statistics
let stats = profiler.summary();
for (op, stat) in stats {
println!("{}: P50={:.2}ms, P95={:.2}ms, P99={:.2}ms",
op, stat.p50_ms, stat.p95_ms, stat.p99_ms);
}
// Detect bottlenecks
let bottlenecks = profiler.detect_bottlenecks(100); // 100ms threshold
if !bottlenecks.is_empty() {
println!("Bottlenecks: {}", bottlenecks.join(", "));
}
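P50/P95/P99 are standard latency percentiles. For reference, a minimal nearest-rank computation (illustrative; not necessarily the profiler's exact method):

```rust
use std::time::Duration;

// Nearest-rank percentile over a latency sample (assumes non-empty input).
fn percentile(samples: &mut [Duration], p: f64) -> Duration {
    samples.sort_unstable();
    let rank = ((p / 100.0) * samples.len() as f64).ceil() as usize;
    samples[rank.clamp(1, samples.len()) - 1]
}

fn main() {
    let mut latencies: Vec<Duration> = (1..=100).map(Duration::from_millis).collect();
    println!("P50 = {:?}", percentile(&mut latencies, 50.0)); // 50ms
    println!("P95 = {:?}", percentile(&mut latencies, 95.0)); // 95ms
    println!("P99 = {:?}", percentile(&mut latencies, 99.0)); // 99ms
}
```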
Prevent system overload with automatic throttling:
use celers_broker_sqs::backpressure::{BackpressureManager, BackpressureConfig};
use std::time::Duration;
let config = BackpressureConfig::new()
.with_max_in_flight_messages(100)
.with_max_processing_time(Duration::from_secs(30))
.with_throttle_threshold(0.8) // Start throttling at 80%
.with_stop_threshold(0.95); // Stop at 95%
let mut manager = BackpressureManager::new(config);
// Check before consuming
if manager.should_consume() {
// Safe to process more messages
manager.track_message_start("msg-1");
// ... process message ...
manager.track_message_complete("msg-1");
}
// Get metrics
let metrics = manager.metrics();
println!("Utilization: {:.1}%", metrics.utilization * 100.0);
Automatically detect and isolate repeatedly failing messages:
use celers_broker_sqs::poison_detector::{PoisonDetector, PoisonConfig};
use std::time::Duration;
let config = PoisonConfig::new()
.with_max_failures(3)
.with_failure_window(Duration::from_secs(300)) // 5 minutes
.with_auto_isolate(true);
let mut detector = PoisonDetector::new(config);
// Track failures
if detector.track_failure("msg-123", "Database timeout") {
println!("Poison message detected!");
}
// Get statistics
let stats = detector.statistics();
println!("Poison messages: {}", stats.poison_message_count);
// Analyze error patterns
for pattern in detector.analyze_error_patterns() {
println!("{}: {} occurrences", pattern.pattern, pattern.count);
}
Monitor costs with automatic alerts:
use celers_broker_sqs::cost_alerts::{CostAlertSystem, CostAlertConfig};
use std::sync::Arc;
let config = CostAlertConfig::new()
.with_daily_warning_threshold(5.0)
.with_daily_critical_threshold(10.0)
.with_monthly_warning_threshold(100.0);
let mut alert_system = CostAlertSystem::new(config);
// Register callback
alert_system.register_callback(Arc::new(|alert| {
println!("COST ALERT: {} - ${:.2}", alert.level, alert.amount_usd);
}));
// Track costs
alert_system.track_cost(0.0004); // Per-request cost
// Check budget
if !alert_system.is_within_budget() {
println!("Budget exceeded!");
}
Replay messages from DLQ with selective filtering:
use celers_broker_sqs::replay::{ReplayManager, ReplayConfig, ReplayFilter};
use std::time::Duration;
// Configure replay
let config = ReplayConfig::new()
.with_batch_size(10)
.with_rate_limit(100) // 100 messages/sec max
.with_retry_failed(true);
let manager = ReplayManager::new(config);
// Create selective filter
let filter = ReplayFilter::new()
.with_time_range_hours(1)
.with_task_pattern("tasks.payment.*")
.with_min_failure_count(3);
// Messages matching the filter can be replayed
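For illustration, a manual replay loop can be assembled from the DLQ helpers shown earlier; ReplayManager presumably automates this and adds rate limiting, retries, and filter evaluation:

```rust
use std::time::Duration;
use celers_broker_sqs::SqsBroker;

// Illustrative manual replay built only from the DLQ calls shown above.
async fn drain_dlq(broker: &mut SqsBroker) -> Result<(), Box<dyn std::error::Error>> {
    loop {
        let batch = broker.get_dlq_messages(10).await?;
        if batch.is_empty() {
            break; // DLQ drained
        }
        for envelope in batch {
            // A ReplayFilter-style check (time range, task pattern,
            // failure count) would go here before redriving.
            broker.redrive_dlq_message(&envelope.delivery_tag).await?;
        }
        tokio::time::sleep(Duration::from_millis(100)).await; // crude rate limit
    }
    Ok(())
}
```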
Track SLA compliance in real-time:
use celers_broker_sqs::sla_monitor::{SlaMonitor, SlaTarget};
use std::time::Duration;
// Define SLA targets
let targets = SlaTarget::production(); // or SlaTarget::critical()
let mut monitor = SlaMonitor::new(targets);
// Record message processing (latency, success flag)
monitor.record_message(Duration::from_millis(150), true);
// Generate compliance report
let report = monitor.generate_report();
println!("SLA Compliance: {:.2}%", report.overall_compliance * 100.0);
println!("P99 Latency: {:?}", report.current_p99);
// Check for violations
for violation in report.violations {
println!("Violation: {:?}", violation.violation_type);
}
Process SQS events in AWS Lambda functions:
use celers_broker_sqs::lambda_helpers::LambdaEventProcessor;
let processor = LambdaEventProcessor::new()
.with_max_retries(2);
// Process a Lambda SQS event (event_json is the raw SQS event payload handed to the handler)
let results = processor.process_event(event_json, |record| {
// Your message processing logic
println!("Processing: {}", record.message_id);
Ok(())
})?;
// Return partial batch response for failed messages
if results.has_failures() {
let response = results.to_batch_item_failures();
return serde_json::to_string(&response);
}
See the examples/ directory for comprehensive examples:
- basic_usage.rs - Basic operations (publish, consume, batch, queue management)
- advanced_features.rs - FIFO, DLQ, SSE, CloudWatch, adaptive polling, parallel processing
- cost_optimization.rs - Cost reduction strategies and comparisons
- monitoring_alarms.rs - CloudWatch metrics and alarms setup
- monitoring_utilities.rs - Monitoring utilities (lag analysis, velocity, health assessment)
- production_optimization.rs - Auto-tuning and optimization for different workloads
- production_suite.rs - Complete production setup with all features
- quota_management.rs - API quota and budget management
- routing_patterns.rs - Multi-queue routing and message distribution
- distributed_tracing.rs - Distributed tracing with correlation IDs
- advanced_production_features.rs - Backpressure, poison detection, cost alerts integration ✨ NEW
- lambda_sqs_handler.rs - AWS Lambda SQS event processing example ✨ NEW
- replay_and_sla.rs - Message replay and SLA monitoring examples ✨ NEW

Run examples:
# Core features
cargo run --example basic_usage
cargo run --example advanced_features
cargo run --example cost_optimization
cargo run --example monitoring_alarms
# Production features
cargo run --example monitoring_utilities
cargo run --example production_optimization
cargo run --example production_suite
cargo run --example quota_management
cargo run --example routing_patterns
cargo run --example distributed_tracing
cargo run --example advanced_production_features # NEW
cargo run --example lambda_sqs_handler # NEW
cargo run --example replay_and_sla # NEW
See benches/sqs_benchmarks.rs for performance benchmarks:
cargo bench --bench sqs_benchmarks
# Run unit tests
cargo test
# Run with AWS credentials for integration tests
export AWS_ACCESS_KEY_ID=your_key
export AWS_SECRET_ACCESS_KEY=your_secret
export AWS_REGION=us-east-1
cargo test --features integration
# Check for warnings
cargo clippy --all-targets -- -D warnings
MIT OR Apache-2.0
Contributions are welcome! Please see the main CeleRS repository for contribution guidelines.