| Crates.io | celers |
| lib.rs | celers |
| version | 0.1.0 |
| created_at | 2025-12-06 04:31:03.780451+00 |
| updated_at | 2026-01-18 18:45:57.197794+00 |
| description | Celery-compatible distributed task queue for Rust (Facade crate) |
| homepage | |
| repository | https://github.com/cool-japan/celers |
| max_upload_size | |
| id | 1969612 |
| size | 495,473 |
Production-ready, Celery-compatible distributed task queue library for Rust. It offers binary-level protocol compatibility with Python Celery while delivering better performance, type safety, and reliability.
To get started, add CeleRS to your `Cargo.toml`:

```toml
[dependencies]
celers = { version = "0.1", features = ["redis"] }
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
```
```rust
use celers::prelude::*;

// Task arguments with automatic serialization
#[derive(Serialize, Deserialize, Debug)]
struct AddArgs {
    x: i32,
    y: i32,
}

// Define the task function
#[celers::task]
async fn add(args: AddArgs) -> Result<i32, Box<dyn std::error::Error>> {
    Ok(args.x + args.y)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Create the broker
    let broker = RedisBroker::new("redis://localhost:6379", "celery")?;

    // Create the task registry and register under the Celery task name
    let mut registry = celers_core::TaskRegistry::new();
    registry.register("tasks.add", |args: AddArgs| async move {
        Ok(args.x + args.y)
    });

    // Configure the worker
    let config = WorkerConfig {
        concurrency: 4,
        graceful_shutdown: true,
        ..Default::default()
    };

    // Start the worker
    let worker = Worker::new(broker, registry, config);
    worker.run().await?;
    Ok(())
}
```
```rust
use celers::prelude::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let broker = RedisBroker::new("redis://localhost:6379", "celery")?;

    // Build the task payload
    let args = serde_json::json!({"x": 10, "y": 20});
    let task = SerializedTask::new("tasks.add", serde_json::to_vec(&args)?);

    // Enqueue it
    let task_id = broker.enqueue(task).await?;
    println!("Enqueued task: {}", task_id);
    Ok(())
}
```
```toml
[dependencies]
celers = { version = "0.1", features = [
    "redis",         # Redis broker support
    "postgres",      # PostgreSQL broker support
    "backend-redis", # Redis result backend
    "metrics",       # Prometheus metrics
    "workflows",     # Canvas workflow primitives
    "beat",          # Periodic task scheduler
] }
```
| Feature | Description | Enables |
|---|---|---|
| `redis` | Redis broker | `celers-broker-redis` |
| `postgres` | PostgreSQL broker | `celers-broker-postgres` |
| `amqp` | RabbitMQ broker | `celers-broker-amqp` |
| `sqs` | AWS SQS broker | `celers-broker-sqs` |
| `backend-redis` | Redis result backend | `celers-backend-redis` |
| `backend-db` | Database result backend | `celers-backend-db` |
| `metrics` | Prometheus metrics | `celers-metrics` |
| `workflows` | Canvas workflows | `celers-canvas` |
| `beat` | Periodic tasks | `celers-beat` |
```text
┌─────────────────────────────────────────────────────┐
│                  Application Layer                  │
│              (Your Tasks & Workflows)               │
└─────────────────────────────────────────────────────┘
                          │
┌─────────────────────────────────────────────────────┐
│                    Runtime Layer                    │
│         Worker │ Canvas │ Beat │ Metrics            │
└─────────────────────────────────────────────────────┘
                          │
┌─────────────────────────────────────────────────────┐
│               Messaging Layer (Kombu)               │
│          Producer │ Consumer │ Transport            │
└─────────────────────────────────────────────────────┘
                          │
┌─────────────────────────────────────────────────────┐
│               Broker Implementations                │
│        Redis │ PostgreSQL │ RabbitMQ │ SQS          │
└─────────────────────────────────────────────────────┘
                          │
┌─────────────────────────────────────────────────────┐
│                   Protocol Layer                    │
│                (Celery v2/v5 Format)                │
└─────────────────────────────────────────────────────┘
```
```rust
use celers::prelude::*;

// Chain: run tasks sequentially
let workflow = Chain::new()
    .then("download_data", vec![json!("https://example.com")])
    .then("process_data", vec![])
    .then("save_result", vec![]);
let task_id = workflow.apply(&broker).await?;

// Group: run tasks in parallel
let workflow = Group::new()
    .add("process_chunk_1", vec![json!(data1)])
    .add("process_chunk_2", vec![json!(data2)])
    .add("process_chunk_3", vec![json!(data3)]);
let group_id = workflow.apply(&broker).await?;

// Chord: run a group, then a callback over the collected results
let header = Group::new()
    .add("compute_partial", vec![json!(1)])
    .add("compute_partial", vec![json!(2)])
    .add("compute_partial", vec![json!(3)]);
let callback = Signature::new("aggregate_results".to_string());
let chord = Chord::new(header, callback);
let chord_id = chord.apply(&broker, &mut backend).await?;
```
```rust
use celers::RedisBroker;

let broker = RedisBroker::new("redis://localhost:6379", "celery")?;

// With a priority queue
let broker = RedisBroker::with_mode(
    "redis://localhost:6379",
    "celery",
    celers::QueueMode::Priority,
)?;
```
Pros: high throughput and low latency (the recommended broker for performance-sensitive workloads); supports a priority queue mode.

Cons: requires a running Redis instance; durability depends on how Redis persistence is configured.
```rust
use celers::PostgresBroker;

let broker = PostgresBroker::new("postgresql://localhost/celery", "celery").await?;
```
Pros: durable, transactional storage; no extra infrastructure if you already run PostgreSQL.

Cons: lower throughput than the Redis broker.
| Implementation | Throughput | Latency | Memory |
|---|---|---|---|
| Python Celery | 1K tasks/sec | 10ms | 50MB |
| CeleRS | 50K tasks/sec | 0.2ms | 10MB |
| CeleRS (batch) | 100K tasks/sec | 0.1ms | 10MB |
Enable Batch Operations

```rust
let config = WorkerConfig {
    enable_batch_dequeue: true,
    batch_size: 50,
    ..Default::default()
};
```
Tune Concurrency

```rust
// CPU-bound: concurrency = cores
// I/O-bound: concurrency = cores * 4
let config = WorkerConfig {
    concurrency: num_cpus::get() * 4,
    ..Default::default()
};
```
Use Redis for High Throughput

```toml
[dependencies]
celers = { version = "0.1", features = ["redis"] }
```
```rust
use celers::gather_metrics;
use tokio::io::AsyncWriteExt; // required for write_all on TcpStream
use tokio::net::TcpListener;

// Expose a metrics endpoint
let listener = TcpListener::bind("0.0.0.0:9090").await?;
loop {
    let (mut socket, _) = listener.accept().await?;
    tokio::spawn(async move {
        let metrics = gather_metrics();
        let response = format!("HTTP/1.1 200 OK\r\n\r\n{}", metrics);
        socket.write_all(response.as_bytes()).await.unwrap();
    });
}
```
Available metrics:

- `celers_tasks_enqueued_total`
- `celers_tasks_completed_total`
- `celers_tasks_failed_total`
- `celers_task_execution_seconds`
- `celers_queue_size`

```rust
// Initialize logging
tracing_subscriber::fmt::init();

// The worker automatically logs:
// - Task start/completion
// - Errors and retries
// - Queue operations
```
```rust
// Rust: enqueue a task for a Python worker
let task = SerializedTask::new("python_tasks.process", args);
broker.enqueue(task).await?;
```

```python
# Python: execute the task
from celery import Celery

app = Celery('myapp', broker='redis://localhost:6379')

@app.task(name='python_tasks.process')
def process(data):
    return {"result": data}
```
```python
# Python: enqueue a task for a Rust worker
from celery import Celery

app = Celery('myapp', broker='redis://localhost:6379')
app.send_task('rust_tasks.process', args=[{"data": "value"}])
```

```rust
// Rust: execute the task
registry.register("rust_tasks.process", |args: serde_json::Value| async move {
    // Process args
    Ok(serde_json::json!({"result": "processed"}))
});
```
```rust
let config = WorkerConfig {
    max_retries: 3,
    retry_base_delay_ms: 1000,
    retry_max_delay_ms: 60000,
    ..Default::default()
};
```
Retry strategy: failed tasks are retried up to `max_retries` times, with delays that grow from `retry_base_delay_ms` and are capped at `retry_max_delay_ms`; tasks that exhaust their retries are moved to the dead letter queue.
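The settings above imply a bounded backoff schedule. The exact formula CeleRS uses is not shown in this README, so the sketch below assumes a common choice: doubling from `retry_base_delay_ms` on each attempt, capped at `retry_max_delay_ms` (`retry_delay_ms` is a hypothetical helper, not a CeleRS API):

```rust
// Hypothetical helper, not part of the crate: the delay before retry
// number `attempt` (0-based), doubling from the base and capped.
fn retry_delay_ms(attempt: u32, base_ms: u64, max_ms: u64) -> u64 {
    let factor = 1u64.checked_shl(attempt).unwrap_or(u64::MAX);
    base_ms.saturating_mul(factor).min(max_ms)
}

fn main() {
    // With the config above: base 1000 ms, cap 60000 ms, 3 retries.
    for attempt in 0..3 {
        println!("retry {}: wait {} ms", attempt + 1, retry_delay_ms(attempt, 1000, 60_000));
    }
    // → 1000 ms, 2000 ms, 4000 ms; a longer schedule would cap at 60000 ms.
}
```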
```rust
// Tasks exceeding max_retries are automatically moved to the DLQ

// Inspect the DLQ
let dlq_size = broker.dlq_size().await?;

// Replay failed tasks
broker.replay_dlq(vec![task_id1, task_id2]).await?;

// Clear the DLQ
broker.clear_dlq().await?;
```
```rust
use celers::worker::wait_for_signal;

let worker = Worker::new(broker, registry, config);
let handle = worker.run_with_shutdown().await?;

// Wait for SIGTERM/SIGINT
wait_for_signal().await;

// Shut down gracefully
handle.shutdown().await?;
```
```rust
let config = WorkerConfig {
    // Concurrency
    concurrency: 16,

    // Batch operations
    enable_batch_dequeue: true,
    batch_size: 20,

    // Memory limits
    max_result_size_bytes: 10_000_000,
    track_memory_usage: true,

    // Retries
    max_retries: 3,
    retry_base_delay_ms: 1000,

    // Timeouts
    default_timeout_secs: 300,

    // Shutdown
    graceful_shutdown: true,
    ..Default::default()
};
```
```dockerfile
FROM rust:1.70 AS builder
WORKDIR /app
COPY . .
RUN cargo build --release --features redis,metrics

FROM debian:bullseye-slim
RUN apt-get update && apt-get install -y ca-certificates
COPY --from=builder /app/target/release/worker /usr/local/bin/
CMD ["worker"]
```
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: celers-worker
spec:
  replicas: 4
  # selector and matching template labels are required fields
  selector:
    matchLabels:
      app: celers-worker
  template:
    metadata:
      labels:
        app: celers-worker
    spec:
      containers:
        - name: worker
          image: myapp/celers-worker:latest
          env:
            - name: REDIS_URL
              value: redis://redis:6379
          resources:
            limits:
              memory: "512Mi"
              cpu: "1000m"
```
See the examples/ directory:

- `phase1_complete.rs` - Complete worker setup
- `graceful_shutdown.rs` - Graceful shutdown
- `priority_queue.rs` - Priority queues
- `dead_letter_queue.rs` - DLQ management
- `task_cancellation.rs` - Task cancellation
- `prometheus_metrics.rs` - Metrics export
- `canvas_workflows.rs` - Workflow primitives

| Module | Description |
|---|---|
| `prelude` | Common imports (`use celers::prelude::*`) |
| `error` | Error types |
| `protocol` | Protocol types (advanced) |
| `canvas` | Workflow primitives |
| `worker` | Worker runtime |
| Feature | Python Celery | CeleRS |
|---|---|---|
| Language | Python | Rust |
| Performance | 1K tasks/sec | 50K+ tasks/sec |
| Memory | 50MB+ | 10MB |
| Type Safety | Runtime | Compile-time |
| Concurrency | Threading/multiprocessing | Async/await (Tokio) |
| Protocol | Celery v2/v5 | ✅ Compatible |
| Workflows | Chain/Group/Chord | ✅ Compatible |
| Brokers | Redis/RabbitMQ/SQS | ✅ Compatible |
See CONTRIBUTING.md for contribution guidelines.
MIT OR Apache-2.0