celers

Crates.io: celers
lib.rs: celers
version: 0.1.0
created_at: 2025-12-06 04:31:03.780451+00
updated_at: 2026-01-18 18:45:57.197794+00
description: Celery-compatible distributed task queue for Rust (Facade crate)
repository: https://github.com/cool-japan/celers
id: 1969612
size: 495,473
owner: KitaSan (cool-japan)

README

celers

Production-ready, Celery-compatible distributed task queue library for Rust. It offers binary-level protocol compatibility with Python Celery while delivering higher performance, type safety, and reliability.

Overview

CeleRS provides:

  • Celery Compatibility: Binary protocol compatible with Python Celery 4.x/5.x
  • Type Safety: Compile-time guarantees for task signatures
  • High Performance: 10-100x throughput vs Python Celery
  • Multiple Brokers: Redis, PostgreSQL, RabbitMQ, AWS SQS
  • Workflow Primitives: Chain, Group, Chord, Map, Starmap
  • Observability: Prometheus metrics, structured logging
  • Production Ready: Graceful shutdown, retries, dead letter queues
  • Memory Safe: No garbage collection, predictable performance

Quick Start

Installation

[dependencies]
celers = { version = "0.1", features = ["redis"] }
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }

Basic Example

use celers::prelude::*;

// Define task with automatic serialization
#[derive(Serialize, Deserialize, Debug)]
struct AddArgs {
    x: i32,
    y: i32,
}

// Define task function
#[celers::task]
async fn add(args: AddArgs) -> Result<i32, Box<dyn std::error::Error>> {
    Ok(args.x + args.y)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Create broker
    let broker = RedisBroker::new("redis://localhost:6379", "celery")?;

    // Create task registry
    let mut registry = celers_core::TaskRegistry::new();
    registry.register("tasks.add", |args: AddArgs| async move {
        Ok(args.x + args.y)
    });

    // Configure worker
    let config = WorkerConfig {
        concurrency: 4,
        graceful_shutdown: true,
        ..Default::default()
    };

    // Start worker
    let worker = Worker::new(broker, registry, config);
    worker.run().await?;

    Ok(())
}

Enqueue Tasks

use celers::prelude::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let broker = RedisBroker::new("redis://localhost:6379", "celery")?;

    // Create task
    let args = serde_json::json!({"x": 10, "y": 20});
    let task = SerializedTask::new("tasks.add", serde_json::to_vec(&args)?);

    // Enqueue
    let task_id = broker.enqueue(task).await?;
    println!("Enqueued task: {}", task_id);

    Ok(())
}

Features

Core Features (Always Available)

  • Task registry and execution
  • Message serialization (JSON)
  • Basic broker interface
  • Type-safe task definitions

Optional Features

[dependencies]
celers = { version = "0.1", features = [
    "redis",           # Redis broker support
    "postgres",        # PostgreSQL broker support
    "backend-redis",   # Redis result backend
    "metrics",         # Prometheus metrics
    "workflows",       # Canvas workflow primitives
    "beat",            # Periodic task scheduler
] }
Feature        | Description              | Enables
redis          | Redis broker             | celers-broker-redis
postgres       | PostgreSQL broker        | celers-broker-postgres
amqp           | RabbitMQ broker          | celers-broker-amqp
sqs            | AWS SQS broker           | celers-broker-sqs
backend-redis  | Redis result backend     | celers-backend-redis
backend-db     | Database result backend  | celers-backend-db
metrics        | Prometheus metrics       | celers-metrics
workflows      | Canvas workflows         | celers-canvas
beat           | Periodic tasks           | celers-beat

Architecture

┌─────────────────────────────────────────────────────┐
│              Application Layer                       │
│         (Your Tasks & Workflows)                     │
└─────────────────────────────────────────────────────┘
                        │
┌─────────────────────────────────────────────────────┐
│              Runtime Layer                           │
│    Worker │ Canvas │ Beat │ Metrics                 │
└─────────────────────────────────────────────────────┘
                        │
┌─────────────────────────────────────────────────────┐
│           Messaging Layer (Kombu)                    │
│   Producer │ Consumer │ Transport                    │
└─────────────────────────────────────────────────────┘
                        │
┌─────────────────────────────────────────────────────┐
│        Broker Implementations                        │
│   Redis │ PostgreSQL │ RabbitMQ │ SQS               │
└─────────────────────────────────────────────────────┘
                        │
┌─────────────────────────────────────────────────────┐
│              Protocol Layer                          │
│          (Celery v2/v5 Format)                       │
└─────────────────────────────────────────────────────┘

Workflow Primitives

Chain - Sequential Execution

use celers::prelude::*;

let workflow = Chain::new()
    .then("download_data", vec![json!("https://example.com")])
    .then("process_data", vec![])
    .then("save_result", vec![]);

let task_id = workflow.apply(&broker).await?;

Group - Parallel Execution

let workflow = Group::new()
    .add("process_chunk_1", vec![json!(data1)])
    .add("process_chunk_2", vec![json!(data2)])
    .add("process_chunk_3", vec![json!(data3)]);

let group_id = workflow.apply(&broker).await?;

Chord - Map-Reduce

let header = Group::new()
    .add("compute_partial", vec![json!(1)])
    .add("compute_partial", vec![json!(2)])
    .add("compute_partial", vec![json!(3)]);

let callback = Signature::new("aggregate_results".to_string());
let chord = Chord::new(header, callback);

// `backend` is a result backend (e.g. enabled via the `backend-redis` feature),
// used to collect the header results before invoking the callback.
let chord_id = chord.apply(&broker, &mut backend).await?;

Broker Support

Redis (Recommended)

use celers::RedisBroker;

let broker = RedisBroker::new("redis://localhost:6379", "celery")?;

// With priority queue
let broker = RedisBroker::with_mode(
    "redis://localhost:6379",
    "celery",
    celers::QueueMode::Priority
)?;

Pros:

  • Fast (50K+ tasks/sec)
  • Simple setup
  • Priority queues
  • Batch operations

Cons:

  • Primarily in-memory (possible data loss on crash unless Redis persistence is configured)
  • Limited durability compared to a database broker

PostgreSQL

use celers::PostgresBroker;

let broker = PostgresBroker::new("postgresql://localhost/celery", "celery").await?;

Pros:

  • Durable (ACID guarantees)
  • Transactional
  • No extra infrastructure if you already run PostgreSQL

Cons:

  • Slower than Redis
  • Higher latency

Performance

Throughput Comparison

Implementation  | Throughput      | Latency | Memory
Python Celery   | 1K tasks/sec    | 10ms    | 50MB
CeleRS          | 50K tasks/sec   | 0.2ms   | 10MB
CeleRS (batch)  | 100K tasks/sec  | 0.1ms   | 10MB
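
The table above is a headline comparison; throughput depends heavily on broker configuration and payload size. As a rough way to measure enqueue throughput on your own setup, the sketch below reuses only the RedisBroker, SerializedTask, and enqueue calls shown earlier; the task name tasks.noop and the iteration count are placeholders.

use celers::prelude::*;
use std::time::Instant;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let broker = RedisBroker::new("redis://localhost:6379", "celery")?;

    // Enqueue a fixed number of trivial tasks and time the loop.
    let n = 10_000;
    let start = Instant::now();
    for i in 0..n {
        let args = serde_json::json!({"i": i});
        let task = SerializedTask::new("tasks.noop", serde_json::to_vec(&args)?);
        broker.enqueue(task).await?;
    }
    let elapsed = start.elapsed();
    println!("{} tasks in {:?} ({:.0} tasks/sec)", n, elapsed, n as f64 / elapsed.as_secs_f64());

    Ok(())
}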

Performance Tips

  1. Enable Batch Operations

    let config = WorkerConfig {
        enable_batch_dequeue: true,
        batch_size: 50,
        ..Default::default()
    };
    
  2. Tune Concurrency

    // CPU-bound: concurrency = cores
    // I/O-bound: concurrency = cores * 4
    let config = WorkerConfig {
        concurrency: num_cpus::get() * 4,
        ..Default::default()
    };
    
  3. Use Redis for High Throughput

    [dependencies]
    celers = { version = "0.1", features = ["redis"] }
    

Monitoring

Prometheus Metrics

use celers::gather_metrics;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;

// Expose a minimal metrics endpoint
let listener = TcpListener::bind("0.0.0.0:9090").await?;
loop {
    let (mut socket, _) = listener.accept().await?;
    tokio::spawn(async move {
        let metrics = gather_metrics();
        let response = format!("HTTP/1.1 200 OK\r\n\r\n{}", metrics);
        // Ignore write errors from clients that disconnect early
        let _ = socket.write_all(response.as_bytes()).await;
    });
}

Available metrics:

  • celers_tasks_enqueued_total
  • celers_tasks_completed_total
  • celers_tasks_failed_total
  • celers_task_execution_seconds
  • celers_queue_size
  • 20+ more metrics (see the filtering sketch below)
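
For a quick look at these counters during development, you can filter the text returned by gather_metrics(). This is a minimal sketch assuming gather_metrics() yields the Prometheus text exposition format as a String, as the endpoint example above suggests.

use celers::gather_metrics;

fn print_task_counters() {
    let metrics = gather_metrics();
    // Keep only the task-related series (enqueued, completed, failed, ...).
    for line in metrics.lines().filter(|l| l.starts_with("celers_tasks")) {
        println!("{line}");
    }
}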

Structured Logging

use tracing_subscriber;

// Initialize logging
tracing_subscriber::fmt::init();

// Worker automatically logs:
// - Task start/completion
// - Errors and retries
// - Queue operations
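
For production log aggregation, machine-readable output and a runtime-controlled filter are usually preferable. The sketch below uses tracing-subscriber directly (it assumes the json and env-filter features of tracing-subscriber are enabled; nothing celers-specific is involved).

use tracing_subscriber::EnvFilter;

// Emit JSON logs and honor the RUST_LOG environment variable
// (e.g. RUST_LOG=info,celers=debug).
tracing_subscriber::fmt()
    .json()
    .with_env_filter(EnvFilter::from_default_env())
    .init();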

Python Celery Interoperability

Send from Rust, Execute in Python

// Rust: Enqueue task for Python worker
let task = SerializedTask::new("python_tasks.process", args);
broker.enqueue(task).await?;

# Python: Execute task
from celery import Celery

app = Celery('myapp', broker='redis://localhost:6379')

@app.task(name='python_tasks.process')
def process(data):
    return {"result": data}

Send from Python, Execute in Rust

# Python: Enqueue task for Rust worker
from celery import Celery

app = Celery('myapp', broker='redis://localhost:6379')
app.send_task('rust_tasks.process', args=[{"data": "value"}])

// Rust: Execute task
registry.register("rust_tasks.process", |args: serde_json::Value| async move {
    // Process args
    Ok(serde_json::json!({"result": "processed"}))
});

Error Handling

Automatic Retries

let config = WorkerConfig {
    max_retries: 3,
    retry_base_delay_ms: 1000,
    retry_max_delay_ms: 60000,
    ..Default::default()
};

Retry strategy (exponential backoff, sketched below):

  • Each retry doubles the previous delay, starting from retry_base_delay_ms (1 s, 2 s, 4 s, ...)
  • Delays are capped at retry_max_delay_ms
  • After max_retries is exhausted → Dead Letter Queue
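
A minimal sketch of the backoff arithmetic (the helper name and rounding are illustrative, not the crate's internal implementation):

/// Delay before the given retry attempt (1-based), doubling from the base
/// delay and capped at the configured maximum.
fn retry_delay_ms(attempt: u32, base_ms: u64, max_ms: u64) -> u64 {
    let doubled = base_ms.saturating_mul(1u64 << attempt.saturating_sub(1).min(63));
    doubled.min(max_ms)
}

// With retry_base_delay_ms = 1000 and retry_max_delay_ms = 60000:
// retry_delay_ms(1, 1000, 60000) == 1_000
// retry_delay_ms(2, 1000, 60000) == 2_000
// retry_delay_ms(3, 1000, 60000) == 4_000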

Dead Letter Queue

// Tasks exceeding max_retries are automatically moved to the DLQ
// Inspect DLQ
let dlq_size = broker.dlq_size().await?;

// Replay failed tasks
broker.replay_dlq(vec![task_id1, task_id2]).await?;

// Clear DLQ
broker.clear_dlq().await?;
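
In production it is worth watching DLQ growth. The sketch below polls broker.dlq_size() (shown above) on a timer; the interval and threshold are placeholder values, the log lines use the tracing crate, and it assumes the broker handle can be moved into (or cloned for) the monitoring task.

use std::time::Duration;

// Warn whenever the dead letter queue grows beyond a threshold.
tokio::spawn(async move {
    let mut ticker = tokio::time::interval(Duration::from_secs(60));
    loop {
        ticker.tick().await;
        match broker.dlq_size().await {
            Ok(size) if size > 100 => tracing::warn!(dlq_size = size, "DLQ is growing"),
            Ok(_) => {}
            Err(e) => tracing::error!(error = %e, "failed to read DLQ size"),
        }
    }
});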

Graceful Shutdown

use celers::worker::wait_for_signal;

let worker = Worker::new(broker, registry, config);
let handle = worker.run_with_shutdown().await?;

// Wait for SIGTERM/SIGINT
wait_for_signal().await;

// Gracefully shutdown
handle.shutdown().await?;
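
If you prefer plain Tokio signal handling over the wait_for_signal helper, here is an equivalent sketch using tokio::signal::ctrl_c() with the same Worker and handle calls shown above:

let worker = Worker::new(broker, registry, config);
let handle = worker.run_with_shutdown().await?;

// Wait for Ctrl-C (SIGINT), then drain in-flight tasks before exiting.
tokio::signal::ctrl_c().await?;
handle.shutdown().await?;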

Production Deployment

Recommended Configuration

let config = WorkerConfig {
    // Concurrency
    concurrency: 16,

    // Batch operations
    enable_batch_dequeue: true,
    batch_size: 20,

    // Memory limits
    max_result_size_bytes: 10_000_000,
    track_memory_usage: true,

    // Retries
    max_retries: 3,
    retry_base_delay_ms: 1000,

    // Timeouts
    default_timeout_secs: 300,

    // Shutdown
    graceful_shutdown: true,

    ..Default::default()
};

Docker Deployment

FROM rust:1.70 AS builder
WORKDIR /app
COPY . .
RUN cargo build --release --features redis,metrics

FROM debian:bullseye-slim
RUN apt-get update && apt-get install -y ca-certificates
COPY --from=builder /app/target/release/worker /usr/local/bin/
CMD ["worker"]

Kubernetes

apiVersion: apps/v1
kind: Deployment
metadata:
  name: celers-worker
spec:
  replicas: 4
  selector:
    matchLabels:
      app: celers-worker
  template:
    metadata:
      labels:
        app: celers-worker
    spec:
      containers:
      - name: worker
        image: myapp/celers-worker:latest
        env:
        - name: REDIS_URL
          value: redis://redis:6379
        resources:
          limits:
            memory: "512Mi"
            cpu: "1000m"

Examples

See examples/ directory:

  • phase1_complete.rs - Complete worker setup
  • graceful_shutdown.rs - Graceful shutdown
  • priority_queue.rs - Priority queues
  • dead_letter_queue.rs - DLQ management
  • task_cancellation.rs - Task cancellation
  • prometheus_metrics.rs - Metrics export
  • canvas_workflows.rs - Workflow primitives

Modules

Module   | Description
prelude  | Common imports (use celers::prelude::*)
error    | Error types
protocol | Protocol types (advanced)
canvas   | Workflow primitives
worker   | Worker runtime

Comparison with Celery

Feature      | Python Celery              | CeleRS
Language     | Python                     | Rust
Performance  | 1K tasks/sec               | 50K+ tasks/sec
Memory       | 50MB+                      | 10MB
Type Safety  | Runtime                    | Compile-time
Concurrency  | Threading/multiprocessing  | Async/await (Tokio)
Protocol     | Celery v2/v5               | ✅ Compatible
Workflows    | Chain/Group/Chord          | ✅ Compatible
Brokers      | Redis/RabbitMQ/SQS         | ✅ Compatible

Roadmap

  • Core task execution
  • Redis broker
  • PostgreSQL broker
  • Canvas workflows (Chain, Group, Chord)
  • Prometheus metrics
  • Batch operations
  • Memory optimization
  • RabbitMQ broker
  • AWS SQS broker
  • OpenTelemetry tracing
  • Web UI dashboard
  • Distributed tracing

Contributing

See CONTRIBUTING.md for contribution guidelines.

License

MIT OR Apache-2.0
