| Crates.io | celers-backend-redis |
| lib.rs | celers-backend-redis |
| version | 0.1.0 |
| created_at | 2026-01-18 15:54:19.446414+00 |
| updated_at | 2026-01-18 15:54:19.446414+00 |
| description | Redis result backend for CeleRS |
| homepage | |
| repository | https://github.com/cool-japan/celers |
| max_upload_size | |
| id | 2052540 |
| size | 529,157 |
Redis-based result backend for CeleRS task result storage and workflow state management. Provides atomic operations for Chord barrier synchronization.
A production-ready result backend. Quick start:
```rust
use celers_backend_redis::{RedisResultBackend, ResultBackend, TaskMeta, TaskResult};
use uuid::Uuid;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create backend
    let mut backend = RedisResultBackend::new("redis://localhost:6379")?;

    // Store task result
    let task_id = Uuid::new_v4();
    let mut meta = TaskMeta::new(task_id, "my_task".to_string());
    meta.result = TaskResult::Success(serde_json::json!({"value": 42}));
    backend.store_result(task_id, &meta).await?;

    // Retrieve result
    if let Some(stored_meta) = backend.get_result(task_id).await? {
        println!("Task result: {:?}", stored_meta.result);
    }

    // Set expiration (results auto-delete after 1 hour)
    backend.set_expiration(task_id, Duration::from_secs(3600)).await?;

    Ok(())
}
```
```rust
use celers_backend_redis::TaskResult;

// Task is pending execution
let pending = TaskResult::Pending;

// Task is currently running
let started = TaskResult::Started;

// Task completed successfully
let success = TaskResult::Success(serde_json::json!({
    "result": "data",
    "count": 100
}));

// Task failed with error
let failure = TaskResult::Failure("Division by zero".to_string());

// Task was cancelled/revoked
let revoked = TaskResult::Revoked;

// Task retry scheduled (retry count = 3)
let retry = TaskResult::Retry(3);
```
```text
Pending ──> Started ──> Success ✓
               ├──> Failure ✗
               ├──> Retry ↻ ──> Started (again)
               └──> Revoked ✗

✓ = Final state (success)
✗ = Final state (error)
↻ = Retry loop
```
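The diagram above can be modeled as a small state machine. The `State` enum below is a local, illustrative stand-in (it is not the crate's `TaskResult` type, which also carries payloads):

```rust
// Simplified model of the lifecycle diagram; for illustration only.
#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Pending,
    Started,
    Success,
    Failure,
    Retry,
    Revoked,
}

impl State {
    /// Final states never transition again.
    fn is_final(self) -> bool {
        matches!(self, State::Success | State::Failure | State::Revoked)
    }

    /// Legal transitions from the diagram; everything else is rejected.
    fn can_transition_to(self, next: State) -> bool {
        use State::*;
        match (self, next) {
            (Pending, Started) => true,
            (Started, Success) | (Started, Failure) => true,
            (Started, Retry) | (Started, Revoked) => true,
            (Retry, Started) => true, // retry loops back to Started
            _ => false,
        }
    }
}

fn main() {
    assert!(State::Pending.can_transition_to(State::Started));
    assert!(State::Retry.can_transition_to(State::Started));
    assert!(!State::Success.can_transition_to(State::Started));
    assert!(State::Success.is_final());
    println!("all transitions check out");
}
```

Workers would consult such a check before overwriting stored metadata, so a late `Started` update can never clobber a final result.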
```rust
use celers_backend_redis::{TaskMeta, TaskResult};
use chrono::Utc;
use uuid::Uuid;

let task_id = Uuid::new_v4();
let mut meta = TaskMeta::new(task_id, "process_data".to_string());

// Update metadata throughout lifecycle
meta.started_at = Some(Utc::now());
meta.worker = Some("worker-1".to_string());
meta.result = TaskResult::Started;

// On completion
meta.completed_at = Some(Utc::now());
meta.result = TaskResult::Success(serde_json::json!({"processed": 1000}));
```
| Field | Type | Description |
|---|---|---|
| `task_id` | `Uuid` | Unique task identifier |
| `task_name` | `String` | Task name (e.g., `"process_image"`) |
| `result` | `TaskResult` | Current task state/result |
| `created_at` | `DateTime<Utc>` | When the task was created |
| `started_at` | `Option<DateTime<Utc>>` | When the task started executing |
| `completed_at` | `Option<DateTime<Utc>>` | When the task completed |
| `worker` | `Option<String>` | Worker that executed the task |
```rust
use celers_backend_redis::{RedisResultBackend, TaskMeta, TaskResult};
use uuid::Uuid;

let mut backend = RedisResultBackend::new("redis://localhost:6379")?;
let task_id = Uuid::new_v4();

let mut meta = TaskMeta::new(task_id, "my_task".to_string());
meta.result = TaskResult::Success(serde_json::json!("result data"));

backend.store_result(task_id, &meta).await?;
```
Redis key: `celery-task-meta-{task_id}`
```rust
match backend.get_result(task_id).await? {
    Some(meta) => {
        match meta.result {
            TaskResult::Success(value) => println!("Success: {:?}", value),
            TaskResult::Failure(err) => eprintln!("Failed: {}", err),
            TaskResult::Pending => println!("Still pending..."),
            _ => println!("Other state: {:?}", meta.result),
        }
    }
    None => println!("Result not found"),
}
```
```rust
backend.delete_result(task_id).await?;
```
```rust
use std::time::Duration;

// Result expires after 1 hour
backend.set_expiration(task_id, Duration::from_secs(3600)).await?;

// Result expires after 24 hours
backend.set_expiration(task_id, Duration::from_secs(86400)).await?;
```
Redis command: `EXPIRE celery-task-meta-{task_id} {seconds}`
The backend provides atomic operations for Chord (map-reduce) patterns.
Header Tasks (parallel):
```text
┌─────────┐   ┌─────────┐   ┌─────────┐
│ Task 1  │   │ Task 2  │   │ Task 3  │
└────┬────┘   └────┬────┘   └────┬────┘
     │             │             │
     │   ┌─────────▼─────────┐   │
     │   │   Redis Counter   │   │  (Atomic INCR)
     │   │   0 → 1 → 2 → 3   │   │
     │   └─────────┬─────────┘   │
     │             │             │
     └─────────────┼─────────────┘
                   │
                   ▼  (When count == 3)
           ┌───────────────┐
           │ Callback Task │  (Aggregate results)
           └───────────────┘
```
```rust
use celers_backend_redis::{ChordState, RedisResultBackend};
use uuid::Uuid;

let mut backend = RedisResultBackend::new("redis://localhost:6379")?;
let chord_id = Uuid::new_v4();

let state = ChordState {
    chord_id,
    total: 3,                                 // 3 tasks in header
    completed: 0,                             // None completed yet
    callback: Some("aggregate".to_string()),  // Callback task
    task_ids: vec![],                         // Optional task ID tracking
};

backend.chord_init(state).await?;
```
Redis keys created:
- `celery-chord-{chord_id}`: Chord state (JSON)
- `celery-chord-counter-{chord_id}`: Atomic counter (integer, initialized to 0)

```rust
// Called by a worker when its task completes
let count = backend.chord_complete_task(chord_id).await?;
println!("Tasks completed: {}", count);

// When count == state.total, enqueue the callback
if count >= state.total {
    // Trigger callback task
    println!("Chord complete! Enqueuing callback...");
}
```
Redis command: `INCR celery-chord-counter-{chord_id}` (atomic)
Thread-safety: Multiple workers can complete tasks simultaneously without race conditions.
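Why this is race-free can be illustrated locally: `AtomicUsize::fetch_add` has the same read-modify-write atomicity as Redis `INCR`, so exactly one completer ever observes `count == total`. The sketch below is std-only (no Redis) and stands in for `chord_complete_task()`:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

// Local stand-in for the Redis counter. Like INCR, fetch_add is an
// atomic read-modify-write, so concurrent callers get distinct counts.
fn complete_task(counter: &AtomicUsize) -> usize {
    counter.fetch_add(1, Ordering::SeqCst) + 1 // INCR returns the new value
}

fn main() {
    let total = 3;
    let counter = Arc::new(AtomicUsize::new(0));
    let callbacks = Arc::new(AtomicUsize::new(0));

    // Three "workers" complete their tasks concurrently.
    let handles: Vec<_> = (0..total)
        .map(|_| {
            let counter = Arc::clone(&counter);
            let callbacks = Arc::clone(&callbacks);
            thread::spawn(move || {
                let count = complete_task(&counter);
                if count == total {
                    // Only the last completer enqueues the callback.
                    callbacks.fetch_add(1, Ordering::SeqCst);
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }

    assert_eq!(counter.load(Ordering::SeqCst), total);
    assert_eq!(callbacks.load(Ordering::SeqCst), 1); // callback fires exactly once
    println!("barrier fired exactly once");
}
```

A read-then-write scheme (GET, add one, SET) would lose updates under the same interleaving, which is why the backend uses `INCR` rather than reading and rewriting the chord state.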
```rust
if let Some(state) = backend.chord_get_state(chord_id).await? {
    println!("Total tasks: {}", state.total);
    println!("Callback: {:?}", state.callback);
}
```
```rust
pub struct ChordState {
    /// Chord ID (group ID)
    pub chord_id: Uuid,
    /// Total number of tasks in chord
    pub total: usize,
    /// Number of completed tasks
    pub completed: usize,
    /// Callback task to execute when chord completes
    pub callback: Option<String>,
    /// Task IDs in the chord
    pub task_ids: Vec<Uuid>,
}
```
```rust
let backend = RedisResultBackend::new("redis://localhost:6379")?
    .with_prefix("myapp-task-".to_string());

// Results stored at: myapp-task-{task_id}
```
```rust
// Basic
let backend = RedisResultBackend::new("redis://localhost:6379")?;

// With password
let backend = RedisResultBackend::new("redis://:password@localhost:6379")?;

// TLS
let backend = RedisResultBackend::new("rediss://localhost:6379")?;

// Specific database
let backend = RedisResultBackend::new("redis://localhost:6379/2")?;

// Unix socket
let backend = RedisResultBackend::new("redis+unix:///tmp/redis.sock")?;
```
```rust
use celers_backend_redis::{RedisResultBackend, TaskResult};
use tokio::time::{sleep, Duration};
use uuid::Uuid;

async fn wait_for_result(
    backend: &mut RedisResultBackend,
    task_id: Uuid,
) -> Result<serde_json::Value, String> {
    loop {
        // Map backend errors into this function's String error type
        if let Some(meta) = backend.get_result(task_id).await.map_err(|e| e.to_string())? {
            match meta.result {
                TaskResult::Success(value) => return Ok(value),
                TaskResult::Failure(err) => return Err(err),
                TaskResult::Pending | TaskResult::Started => {
                    // Keep polling
                }
                TaskResult::Retry(count) => {
                    println!("Task retrying (attempt {})", count);
                }
                TaskResult::Revoked => return Err("Task was cancelled".to_string()),
            }
        }
        sleep(Duration::from_millis(500)).await;
    }
}
```
```rust
use celers_backend_redis::{RedisResultBackend, TaskMeta, TaskResult};
use chrono::Utc;
use std::time::Duration;
use uuid::Uuid;

async fn track_task_lifecycle(task_id: Uuid) {
    let mut backend = RedisResultBackend::new("redis://localhost:6379").unwrap();

    // Create task
    let mut meta = TaskMeta::new(task_id, "long_running_task".to_string());
    backend.store_result(task_id, &meta).await.unwrap();

    // Mark started
    meta.started_at = Some(Utc::now());
    meta.worker = Some("worker-01".to_string());
    meta.result = TaskResult::Started;
    backend.store_result(task_id, &meta).await.unwrap();

    // Mark completed
    meta.completed_at = Some(Utc::now());
    meta.result = TaskResult::Success(serde_json::json!({"count": 1000}));
    backend.store_result(task_id, &meta).await.unwrap();

    // Set expiration
    backend.set_expiration(task_id, Duration::from_secs(3600)).await.unwrap();
}
```
```rust
use celers_backend_redis::{ChordState, RedisResultBackend};
use celers_core::Broker;
use uuid::Uuid;

async fn map_reduce_workflow<B: Broker>(
    broker: &B,
    backend: &mut RedisResultBackend,
) -> Result<(), Box<dyn std::error::Error>> {
    // 1. Initialize chord
    let chord_id = Uuid::new_v4();
    let state = ChordState {
        chord_id,
        total: 3,
        completed: 0,
        callback: Some("aggregate_results".to_string()),
        task_ids: vec![],
    };
    backend.chord_init(state).await?;

    // 2. Enqueue header tasks (parallel)
    for i in 0..3 {
        let mut task = celers_core::SerializedTask::new(
            "compute_partial".to_string(),
            serde_json::to_vec(&serde_json::json!({"chunk": i}))?,
        );
        task.metadata.chord_id = Some(chord_id);
        broker.enqueue(task).await?;
    }

    // 3. Workers complete tasks and call chord_complete_task()
    // 4. When count == 3, the callback is enqueued
    Ok(())
}
```
```rust
use celers_backend_redis::{RedisResultBackend, TaskMeta, TaskResult};
use std::time::Duration;
use uuid::Uuid;

async fn smart_expiration(
    backend: &mut RedisResultBackend,
    task_id: Uuid,
    meta: &TaskMeta,
) {
    match meta.result {
        // Keep successful results for 24 hours
        TaskResult::Success(_) => {
            backend.set_expiration(task_id, Duration::from_secs(86400)).await.unwrap();
        }
        // Keep failures for 7 days (for debugging)
        TaskResult::Failure(_) => {
            backend.set_expiration(task_id, Duration::from_secs(604800)).await.unwrap();
        }
        // Temporary states expire quickly
        TaskResult::Pending | TaskResult::Started => {
            backend.set_expiration(task_id, Duration::from_secs(3600)).await.unwrap();
        }
        _ => {}
    }
}
```
```rust
use celers_backend_redis::{BackendError, RedisResultBackend};

match backend.store_result(task_id, &meta).await {
    Ok(()) => println!("Stored successfully"),
    Err(BackendError::Redis(e)) => eprintln!("Redis error: {}", e),
    Err(BackendError::Serialization(e)) => eprintln!("Serialization error: {}", e),
    Err(BackendError::NotFound(id)) => eprintln!("Task {} not found", id),
    Err(BackendError::Connection(e)) => eprintln!("Connection error: {}", e),
}
```
Error Types:
- `Redis`: Underlying Redis client errors
- `Serialization`: JSON encoding/decoding errors
- `NotFound`: Task result doesn't exist
- `Connection`: Failed to connect to Redis

Redis operations used:
- `redis::Client::get_multiplexed_async_connection()` for the async connection
- `INCR` command (O(1), atomic)
- `SET`/`GET` commands (O(1))
- `DEL` command (O(1))
- `EXPIRE` command (O(1))

| Operation | Latency | Notes |
|---|---|---|
| Store result | <1ms | Depends on network RTT |
| Get result | <1ms | Depends on network RTT |
| Chord increment | <1ms | Atomic operation |
| Set expiration | <1ms | Same as SET |
Optimization tips:
| Component | Size | Notes |
|---|---|---|
| Task metadata | ~500B - 2KB | Depends on result size |
| Chord state | ~200B | Per chord, not per task |
| Chord counter | ~16B | Integer value |
1 million tasks: roughly 0.5–2 GB of task metadata (using the ~500 B – 2 KB per-entry sizes above).
1000 active chords: roughly 216 KB (~200 B state + ~16 B counter per chord).
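These figures are back-of-the-envelope arithmetic over the sizing table above (the per-entry sizes are the table's estimates, not measurements):

```rust
// Estimate Redis memory from the per-entry sizes in the table above.
fn task_meta_bytes(tasks: u64, per_entry: u64) -> u64 {
    tasks * per_entry
}

fn chord_bytes(chords: u64) -> u64 {
    chords * (200 + 16) // ~200 B state + ~16 B counter per chord
}

fn main() {
    // 1 million stored task results at ~500 B – 2 KB each
    println!(
        "1M tasks: {} MB - {} MB",
        task_meta_bytes(1_000_000, 500) / 1_000_000,
        task_meta_bytes(1_000_000, 2_048) / 1_000_000
    );
    // 1000 active chords
    println!("1000 chords: {} KB", chord_bytes(1_000) / 1_000);
}
```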
```rust
// Store result
backend.store_result(task_id, &meta).await?;

// Set expiration (required!)
backend.set_expiration(task_id, Duration::from_secs(86400)).await?;
```
Why: Prevents unbounded memory growth in Redis.
```rust
match backend.get_result(task_id).await? {
    Some(meta) => { /* process result */ }
    None => {
        // Result expired or was never stored
        eprintln!("Result not available for task {}", task_id);
    }
}
```
```rust
// After the chord completes, clean up state
backend.delete_result(chord_id).await?;

// Or set a TTL on the chord state
backend.set_expiration(chord_id, Duration::from_secs(3600)).await?;
```
```rust
// Retry on transient errors
let mut retries = 3;
loop {
    match backend.store_result(task_id, &meta).await {
        Ok(()) => break,
        Err(BackendError::Redis(e)) if retries > 0 => {
            eprintln!("Redis error, retrying: {}", e);
            retries -= 1;
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
        Err(e) => return Err(e),
    }
}
```
Compatible with Celery's default key naming:
- `celery-task-meta-{task_id}`
- `celery-chord-{chord_id}`
- `celery-chord-counter-{chord_id}`

Task metadata format matches Celery's backend structure:
```json
{
  "task_id": "550e8400-e29b-41d4-a716-446655440000",
  "task_name": "myapp.tasks.process_data",
  "result": {
    "Success": {"value": 42}
  },
  "created_at": "2023-01-01T12:00:00Z",
  "started_at": "2023-01-01T12:00:01Z",
  "completed_at": "2023-01-01T12:00:05Z",
  "worker": "worker-01"
}
```
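To make the key scheme concrete, here are illustrative key builders. The helper names are hypothetical (the crate constructs its keys internally); only the key patterns themselves come from the list above:

```rust
// Hypothetical helpers mirroring the documented key patterns.
fn task_meta_key(prefix: &str, task_id: &str) -> String {
    // Default prefix is "celery-task-meta-"; with_prefix() can override it
    format!("{prefix}{task_id}")
}

fn chord_state_key(chord_id: &str) -> String {
    format!("celery-chord-{chord_id}")
}

fn chord_counter_key(chord_id: &str) -> String {
    format!("celery-chord-counter-{chord_id}")
}

fn main() {
    let id = "550e8400-e29b-41d4-a716-446655440000";
    assert_eq!(
        task_meta_key("celery-task-meta-", id),
        format!("celery-task-meta-{id}")
    );
    println!("{}", chord_state_key(id));
    println!("{}", chord_counter_key(id));
}
```

Because the default prefix matches Celery's, a Python Celery client can read results written by this backend and vice versa.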
```rust
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_store_and_get_result() {
        let mut backend = RedisResultBackend::new("redis://localhost:6379").unwrap();
        let task_id = Uuid::new_v4();

        let mut meta = TaskMeta::new(task_id, "test".to_string());
        meta.result = TaskResult::Success(serde_json::json!(42));
        backend.store_result(task_id, &meta).await.unwrap();

        let retrieved = backend.get_result(task_id).await.unwrap().unwrap();
        assert_eq!(retrieved.task_id, task_id);
        assert!(matches!(retrieved.result, TaskResult::Success(_)));
    }

    #[tokio::test]
    async fn test_chord_barrier() {
        let mut backend = RedisResultBackend::new("redis://localhost:6379").unwrap();
        let chord_id = Uuid::new_v4();

        let state = ChordState {
            chord_id,
            total: 3,
            completed: 0,
            callback: Some("callback".to_string()),
            task_ids: vec![],
        };
        backend.chord_init(state).await.unwrap();

        assert_eq!(backend.chord_complete_task(chord_id).await.unwrap(), 1);
        assert_eq!(backend.chord_complete_task(chord_id).await.unwrap(), 2);
        assert_eq!(backend.chord_complete_task(chord_id).await.unwrap(), 3);
    }
}
```
- Cause: TTL expired. Solution: Increase the TTL or retrieve results sooner.
- Cause: Counter state lost (Redis restart). Solution: Enable Redis persistence (AOF or RDB).
- Cause: Network latency. Solution: Co-locate Redis with workers; use connection pooling.
- Cause: Missing TTL on results. Solution: Always call `set_expiration()` after storing results.
Related crates:
- `celers-canvas` - Chord workflow primitives
- `celers-worker` - Worker integration with result backend
- `celers-core` - Task types and traits

License: MIT OR Apache-2.0