| Crates.io | celers-broker-sql |
| lib.rs | celers-broker-sql |
| version | 0.1.0 |
| created_at | 2026-01-18 15:43:27.851447+00 |
| updated_at | 2026-01-18 15:43:27.851447+00 |
| description | SQL database broker implementation for CeleRS (MySQL) |
| homepage | |
| repository | https://github.com/cool-japan/celers |
| max_upload_size | |
| id | 2052521 |
| size | 883,343 |
MySQL database broker implementation for CeleRS - a high-performance Celery-compatible task queue framework for Rust.
Features:
- `FOR UPDATE SKIP LOCKED` for distributed workers
- Scheduled tasks via `enqueue_at` and `enqueue_after`
- Optional Prometheus metrics (`metrics` feature)
- MySQL 8.0+ required (for `FOR UPDATE SKIP LOCKED` support)
Add to your `Cargo.toml`:
[dependencies]
celers-broker-sql = "0.1"
celers-core = "0.1"
# Optional: Enable Prometheus metrics
# celers-broker-sql = { version = "0.1", features = ["metrics"] }
Create the database:
CREATE DATABASE celers;
Then connect and run migrations:
use celers_broker_sql::MysqlBroker;
use celers_core::Broker;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create broker
    let broker = MysqlBroker::new("mysql://user:pass@localhost/celers").await?;
    // Run migrations (creates tables and indexes)
    broker.migrate().await?;
    Ok(())
}
use celers_core::SerializedTask;
// Create a task
let task = SerializedTask::new("my_task".to_string(), vec![1, 2, 3, 4]);
// Enqueue it
let task_id = broker.enqueue(task).await?;
println!("Enqueued task: {}", task_id);
// Dequeue a task
if let Some(message) = broker.dequeue().await? {
    let task = message.task;
    println!("Processing task: {}", task.metadata.name);
    // Process the task...
    // Acknowledge completion
    broker.ack(&task.metadata.id, message.receipt_handle.as_deref()).await?;
}
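A worker typically wraps dequeue/ack in a polling loop. A minimal sketch (the 500 ms idle backoff is an arbitrary choice, not part of the API):
async fn worker_loop(broker: &MysqlBroker) -> Result<(), Box<dyn std::error::Error>> {
    loop {
        match broker.dequeue().await? {
            Some(message) => {
                // Process the task here, then acknowledge so it is not redelivered
                broker.ack(&message.task.metadata.id, message.receipt_handle.as_deref()).await?;
            }
            // Queue empty (or paused): back off briefly before polling again
            None => tokio::time::sleep(std::time::Duration::from_millis(500)).await,
        }
    }
}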
use std::time::SystemTime;
// Schedule for specific timestamp (Unix seconds)
let execute_at = SystemTime::now()
    .duration_since(SystemTime::UNIX_EPOCH)?
    .as_secs() as i64 + 3600; // 1 hour from now
broker.enqueue_at(task, execute_at).await?;
// Or schedule after a delay (seconds)
broker.enqueue_after(task, 300).await?; // 5 minutes
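Relative delays also make simple retry backoff easy to express. A hypothetical helper (the 60-second base and doubling policy are illustrative, not part of the crate):
// Hypothetical: re-enqueue a failed task with exponential backoff
async fn retry_later(
    broker: &MysqlBroker,
    task: SerializedTask,
    attempt: u32,
) -> Result<(), Box<dyn std::error::Error>> {
    // 60s, 120s, 240s, ...; saturating arithmetic avoids overflow on large attempts
    let delay_secs = 60i64.saturating_mul(2i64.saturating_pow(attempt));
    broker.enqueue_after(task, delay_secs).await?;
    Ok(())
}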
// Batch enqueue (high throughput)
let tasks = vec![
    SerializedTask::new("task1".to_string(), vec![1]),
    SerializedTask::new("task2".to_string(), vec![2]),
    SerializedTask::new("task3".to_string(), vec![3]),
];
let task_ids = broker.enqueue_batch(tasks).await?;
// Batch dequeue
let messages = broker.dequeue_batch(10).await?;
// Batch ack
let tasks_to_ack: Vec<_> = messages.iter()
    .map(|m| (m.task.metadata.id, m.receipt_handle.clone()))
    .collect();
broker.ack_batch(&tasks_to_ack).await?;
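For very large batches, chunking keeps each transaction bounded. A sketch, assuming roughly 500 tasks per call is a sensible chunk size (`build_many_tasks` is a hypothetical source):
let mut tasks: Vec<SerializedTask> = build_many_tasks(); // hypothetical
while !tasks.is_empty() {
    // Drain up to 500 tasks so no single transaction grows too large
    let n = tasks.len().min(500);
    let chunk: Vec<_> = tasks.drain(..n).collect();
    broker.enqueue_batch(chunk).await?;
}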
// Pause queue (dequeue returns None)
broker.pause();
assert!(broker.is_paused());
// Resume queue
broker.resume();
assert!(!broker.is_paused());
use celers_broker_sql::DbTaskState;
// Get task details
if let Some(task_info) = broker.get_task(&task_id).await? {
    println!("Task: {} - State: {:?}", task_info.task_name, task_info.state);
}
// List tasks by state
let pending_tasks = broker.list_tasks(Some(DbTaskState::Pending), 100, 0).await?;
// Get queue statistics
let stats = broker.get_statistics().await?;
println!("Pending: {}, Processing: {}, Completed: {}",
stats.pending, stats.processing, stats.completed);
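These statistics are convenient for simple alerting. A sketch that flags a growing backlog (the 10,000 threshold is an arbitrary example):
let stats = broker.get_statistics().await?;
if stats.pending > 10_000 {
    // Replace with your alerting mechanism of choice
    eprintln!("backlog alert: {} pending tasks", stats.pending);
}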
// Count by task name
let counts = broker.count_by_task_name().await?;
for count in counts {
    println!("{}: {} pending, {} completed",
        count.task_name, count.pending, count.completed);
}
// List scheduled tasks
let scheduled = broker.list_scheduled_tasks(100, 0).await?;
for task in scheduled {
    println!("Task {} scheduled in {} seconds",
        task.task_name, task.delay_remaining_secs);
}
// Dequeue with worker ID
let worker_id = "worker-001";
if let Some(message) = broker.dequeue_with_worker_id(worker_id).await? {
    // Process task...
    broker.ack(&message.task.metadata.id, message.receipt_handle.as_deref()).await?;
}
// Get tasks by worker
let worker_tasks = broker.get_tasks_by_worker(worker_id).await?;
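Worker IDs only need to be unique and stable per process. One sketch derives an ID from the HOSTNAME environment variable (not always set) and the process ID:
// One ID per process: hostname if available, plus PID
let worker_id = format!(
    "worker-{}-{}",
    std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown".into()),
    std::process::id()
);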
use celers_broker_sql::TaskResultStatus;
use serde_json::json;
// Store result
broker.store_result(
    &task_id,
    "my_task",
    TaskResultStatus::Success,
    Some(json!({"output": "done"})),
    None,       // error
    None,       // traceback
    Some(1500), // runtime in ms
).await?;
// Retrieve result
if let Some(result) = broker.get_result(&task_id).await? {
    println!("Result: {:?}", result.result);
    println!("Runtime: {}ms", result.runtime_ms.unwrap_or(0));
}
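The same call can record failures. A sketch of the failure path, assuming the status enum has a `Failure` variant and that the error and traceback parameters take `Option<String>` (only `Success` and `None` appear above, so both are assumptions):
broker.store_result(
    &task_id,
    "my_task",
    TaskResultStatus::Failure,           // assumed variant
    None,                                // no result payload
    Some("worker panicked".to_string()), // error message (assumed String)
    Some("stack trace ...".to_string()), // traceback (assumed String)
    Some(42),                            // runtime in ms
).await?;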
// List DLQ tasks
let dlq_tasks = broker.list_dlq(100, 0).await?;
// Requeue from DLQ
if let Some(dlq_task) = dlq_tasks.first() {
    let new_task_id = broker.requeue_from_dlq(&dlq_task.id).await?;
    println!("Requeued as: {}", new_task_id);
}
// Purge DLQ
let purged = broker.purge_all_dlq().await?;
println!("Purged {} tasks from DLQ", purged);
use std::time::Duration;
// Health check
let health = broker.check_health().await?;
println!("MySQL version: {}", health.database_version);
println!("Pending tasks: {}", health.pending_tasks);
// Archive old completed tasks (older than 7 days)
let archived = broker.archive_completed_tasks(Duration::from_secs(7 * 24 * 3600)).await?;
println!("Archived {} old tasks", archived);
// Recover stuck tasks (processing > 1 hour)
let recovered = broker.recover_stuck_tasks(Duration::from_secs(3600)).await?;
println!("Recovered {} stuck tasks", recovered);
// Archive old results (older than 30 days)
let archived_results = broker.archive_results(Duration::from_secs(30 * 24 * 3600)).await?;
println!("Archived {} old results", archived_results);
// Get table sizes
let table_sizes = broker.get_table_sizes().await?;
for table in table_sizes {
    println!("{}: {} rows, {} bytes data, {} bytes indexes",
        table.table_name, table.row_count, table.data_size_bytes, table.index_size_bytes);
}
// Optimize tables (run periodically)
broker.optimize_tables().await?;
// Analyze tables (update index statistics)
broker.analyze_tables().await?;
With the `metrics` feature enabled:
// Update metrics (call periodically, e.g., every 10 seconds)
#[cfg(feature = "metrics")]
broker.update_metrics().await?;
// Metrics exposed:
// - celers_tasks_enqueued_total
// - celers_tasks_enqueued_by_type
// - celers_queue_size (pending tasks)
// - celers_processing_queue_size
// - celers_dlq_size
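A worker can refresh these gauges on a fixed interval. A minimal sketch using `tokio::time::interval` (the 10-second period follows the comment above):
#[cfg(feature = "metrics")]
async fn metrics_loop(broker: &MysqlBroker) -> Result<(), Box<dyn std::error::Error>> {
    let mut tick = tokio::time::interval(std::time::Duration::from_secs(10));
    loop {
        tick.tick().await;
        // Re-read queue counts and publish them to the gauges listed above
        broker.update_metrics().await?;
    }
}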
// Create broker with specific queue name
let queue_a = MysqlBroker::with_queue("mysql://...", "queue_a").await?;
let queue_b = MysqlBroker::with_queue("mysql://...", "queue_b").await?;
// Each queue is logically separated (stored in metadata)
queue_a.enqueue(task_a).await?;
queue_b.enqueue(task_b).await?;
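Consumers follow the same pattern. A sketch, assuming dequeue is scoped to the handle's queue (the logical separation noted below):
// A handle bound to "queue_a" should only see tasks enqueued on "queue_a"
let worker = MysqlBroker::with_queue("mysql://user:pass@localhost/celers", "queue_a").await?;
while let Some(message) = worker.dequeue().await? {
    // Process, then acknowledge on the same handle; the loop exits when the queue is empty
    worker.ack(&message.task.metadata.id, message.receipt_handle.as_deref()).await?;
}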
Tables:
- `celers_tasks` - Main task queue
- `celers_dead_letter_queue` - Failed tasks that exceeded max retries
- `celers_task_results` - Task execution results
- `celers_task_history` - Task audit trail (future)
Indexes:
- `idx_tasks_state_priority` - Efficient dequeue by state and priority
- `idx_tasks_scheduled` - Scheduled task processing
- `idx_tasks_worker` - Worker tracking
- `idx_tasks_task_name` - Task name lookups
- `idx_results_task_name` - Result queries by task type
Connection pool:
// Default: 20 connections, 5s timeout
// For high throughput, increase max_connections:
// Edit MysqlBroker::new() or MysqlBroker::with_queue()
Use batch operations for high throughput:
- `enqueue_batch()` - Up to 10x faster than individual enqueues
- `dequeue_batch()` - Fetch multiple tasks in one transaction
- `ack_batch()` - Acknowledge multiple tasks at once
Recommended `my.cnf` settings:
[mysqld]
# Connection settings
max_connections = 500
connect_timeout = 10
wait_timeout = 28800
# Performance
innodb_buffer_pool_size = 2G # 70-80% of RAM
innodb_log_file_size = 512M
innodb_flush_log_at_trx_commit = 2
innodb_flush_method = O_DIRECT
# Query cache (MySQL 5.7)
query_cache_type = 0
query_cache_size = 0
Run these operations periodically:
// Daily: Archive old tasks
broker.archive_completed_tasks(Duration::from_secs(7 * 24 * 3600)).await?;
// Daily: Recover stuck tasks
broker.recover_stuck_tasks(Duration::from_secs(3600)).await?;
// Weekly: Optimize tables
broker.optimize_tables().await?;
// Weekly: Analyze tables
broker.analyze_tables().await?;
// Monthly: Archive old results
broker.archive_results(Duration::from_secs(30 * 24 * 3600)).await?;
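One way to run this schedule is a background task. A minimal sketch covering the daily pass only (the weekly and monthly passes would follow the same shape):
// Daily maintenance pass; errors are logged rather than aborting the loop
async fn daily_maintenance(broker: MysqlBroker) {
    use std::time::Duration;
    loop {
        if let Err(e) = broker
            .archive_completed_tasks(Duration::from_secs(7 * 24 * 3600))
            .await
        {
            eprintln!("archive failed: {}", e);
        }
        if let Err(e) = broker.recover_stuck_tasks(Duration::from_secs(3600)).await {
            eprintln!("recovery failed: {}", e);
        }
        // Sleep until the next daily pass
        tokio::time::sleep(Duration::from_secs(24 * 3600)).await;
    }
}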
Comparison with the PostgreSQL broker:
- Same `FOR UPDATE SKIP LOCKED` pattern (same `Broker` trait)
- `?` placeholders vs PostgreSQL `$1, $2`
- `CHAR(36)` vs native UUID type
- `DATE_ADD()` vs PostgreSQL `INTERVAL` syntax
- `ON DUPLICATE KEY UPDATE` vs PostgreSQL `ON CONFLICT`
All operations return `Result<T, CelersError>`:
match broker.enqueue(task).await {
    Ok(task_id) => println!("Enqueued: {}", task_id),
    Err(e) => eprintln!("Failed to enqueue: {}", e),
}
Migrations are embedded in the binary and run via broker.migrate():
- `001_init.sql` - Initial schema (tasks, DLQ, history tables)
- `002_results.sql` - Results table
- `003_performance_indexes.sql` - Additional performance indexes
To run migrations:
broker.migrate().await?;
Migrations are idempotent and can be run multiple times safely.
Use mysqldump for backing up CeleRS tables:
# Backup all CeleRS tables
mysqldump -u user -p database_name \
    celers_tasks \
    celers_dead_letter_queue \
    celers_task_results \
    celers_task_history \
    celers_migrations \
    > celers_backup_$(date +%Y%m%d_%H%M%S).sql
# Backup with compression
mysqldump -u user -p database_name \
    celers_tasks \
    celers_dead_letter_queue \
    celers_task_results \
    celers_task_history \
    celers_migrations \
    | gzip > celers_backup_$(date +%Y%m%d_%H%M%S).sql.gz
# Include routines (stored procedures)
mysqldump -u user -p database_name \
    --routines \
    --triggers \
    celers_tasks \
    celers_dead_letter_queue \
    celers_task_results \
    celers_task_history \
    celers_migrations \
    > celers_full_backup_$(date +%Y%m%d_%H%M%S).sql
# Backup only pending and processing tasks (for migration)
mysqldump -u user -p database_name celers_tasks \
    --where="state IN ('pending', 'processing')" \
    > celers_active_tasks_$(date +%Y%m%d).sql
# Backup DLQ for analysis
mysqldump -u user -p database_name celers_dead_letter_queue \
    > celers_dlq_$(date +%Y%m%d).sql
# Backup results for auditing
mysqldump -u user -p database_name celers_task_results \
    > celers_results_$(date +%Y%m%d).sql
# Restore from backup
mysql -u user -p database_name < celers_backup_20260118_120000.sql
# Restore from compressed backup
gunzip < celers_backup_20260118_120000.sql.gz | mysql -u user -p database_name
# Restore specific table
mysql -u user -p database_name < celers_tasks_backup.sql
Enable binary logging in MySQL for PITR:
[mysqld]
log_bin = /var/log/mysql/mysql-bin.log
binlog_format = ROW
expire_logs_days = 7
Recovery procedure:
# 1. Restore from last full backup
mysql -u user -p database_name < last_full_backup.sql
# 2. Apply binary logs up to specific point
mysqlbinlog --start-datetime="2026-01-18 10:00:00" \
    --stop-datetime="2026-01-18 11:30:00" \
    /var/log/mysql/mysql-bin.000001 | \
    mysql -u user -p database_name
#!/bin/bash
# celers_backup.sh - Automated CeleRS backup script
BACKUP_DIR="/backups/celers"
DB_NAME="your_database"
DB_USER="backup_user"
RETENTION_DAYS=30
TIMESTAMP=$(date +%Y%m%d_%H%M%S)
# Create backup directory
mkdir -p "$BACKUP_DIR"
# Perform backup
mysqldump -u "$DB_USER" -p"$DB_PASS" "$DB_NAME" \
--routines \
--triggers \
celers_tasks \
celers_dead_letter_queue \
celers_task_results \
celers_task_history \
celers_migrations \
| gzip > "$BACKUP_DIR/celers_$TIMESTAMP.sql.gz"
# Remove old backups
find "$BACKUP_DIR" -name "celers_*.sql.gz" -mtime +$RETENTION_DAYS -delete
# Verify backup
if [ -f "$BACKUP_DIR/celers_$TIMESTAMP.sql.gz" ]; then
    echo "Backup completed: celers_$TIMESTAMP.sql.gz"
    # Optional: Upload to S3 or other storage
    # aws s3 cp "$BACKUP_DIR/celers_$TIMESTAMP.sql.gz" s3://my-bucket/celers-backups/
else
    echo "Backup failed!" >&2
    exit 1
fi
Before Disaster:
During Recovery:
After Recovery:
# Export from production
mysqldump -u user -p prod_db \
    --where="state IN ('pending', 'processing')" \
    celers_tasks > prod_tasks.sql
# Import to staging
mysql -u user -p staging_db < prod_tasks.sql
# Or use a programmatic approach
// Programmatic migration example
async fn migrate_pending_tasks(
    source_broker: &MysqlBroker,
    target_broker: &MysqlBroker,
) -> Result<u64, Box<dyn std::error::Error>> {
    let pending_tasks = source_broker
        .list_tasks(Some(DbTaskState::Pending), 10000, 0)
        .await?;
    let mut migrated = 0u64;
    for _task_info in pending_tasks {
        // Fetch the task payload and metadata from the source,
        // enqueue it on `target_broker`, then mark it migrated in the source.
        migrated += 1;
    }
    Ok(migrated)
}
See the examples directory for complete working examples.
Each example includes detailed documentation and can be run with:
cargo run --example <example_name>
For detailed usage instructions, see examples/README.md.
MIT OR Apache-2.0
Contributions welcome! Please ensure:
- `cargo test` passes
- `cargo clippy` reports no warnings
- `cargo fmt` has been run