| Crates.io | celers-canvas |
| lib.rs | celers-canvas |
| version | 0.1.0 |
| created_at | 2026-01-18 17:59:50.410462+00 |
| updated_at | 2026-01-18 17:59:50.410462+00 |
| description | Workflow primitives for CeleRS (Chain, Chord, Group, Map) |
| homepage | |
| repository | https://github.com/cool-japan/celers |
| max_upload_size | |
| id | 2052748 |
| size | 613,173 |
Distributed workflow primitives for CeleRS task orchestration. Build complex task dependencies with Chain, Group, Chord, Map, and Starmap patterns.
Production-ready workflow patterns inspired by Celery's Canvas:
use celers_canvas::{Chain, Group, Chord, Map, Signature};
use celers_broker_redis::RedisBroker;
// Create broker
let broker = RedisBroker::new("redis://localhost:6379", "celery")?;
// Chain: Sequential execution
let workflow = Chain::new()
.then("download_data", vec![serde_json::json!("https://example.com")])
.then("process_data", vec![])
.then("upload_result", vec![]);
let task_id = workflow.apply(&broker).await?;
println!("Chain started: {}", task_id);
Execute tasks one after another, passing results as arguments:
use celers_canvas::Chain;
// task1(args1) -> task2(result1) -> task3(result2)
let chain = Chain::new()
.then("fetch_user", vec![serde_json::json!(user_id)])
.then("validate_user", vec![])
.then("save_user", vec![]);
// Start the chain
let first_task_id = chain.apply(&broker).await?;
How it works:
Use cases:
Execute multiple tasks in parallel:
use celers_canvas::Group;
// (task1 | task2 | task3)
let group = Group::new()
.add("process_chunk_1", vec![serde_json::json!(data1)])
.add("process_chunk_2", vec![serde_json::json!(data2)])
.add("process_chunk_3", vec![serde_json::json!(data3)]);
// Start all tasks in parallel
let group_id = group.apply(&broker).await?;
Features:
Use cases:
Execute tasks in parallel, then run callback with aggregated results:
use celers_canvas::{Chord, Group, Signature};
use celers_backend_redis::RedisResultBackend;
// (task1 | task2 | task3) -> callback([result1, result2, result3])
let header = Group::new()
.add("compute_partial", vec![serde_json::json!(1)])
.add("compute_partial", vec![serde_json::json!(2)])
.add("compute_partial", vec![serde_json::json!(3)]);
let callback = Signature::new("aggregate_results".to_string());
let chord = Chord::new(header, callback);
// Requires result backend for coordination
let mut backend = RedisResultBackend::new("redis://localhost:6379")?;
let chord_id = chord.apply(&broker, &mut backend).await?;
How it works:
Requirements:
backend-redis feature enabledUse cases:
Apply the same task to multiple argument sets:
use celers_canvas::{Map, Signature};
// map(task, [args1, args2, args3])
let task = Signature::new("process_image".to_string());
let argsets = vec![
vec![serde_json::json!("image1.jpg")],
vec![serde_json::json!("image2.jpg")],
vec![serde_json::json!("image3.jpg")],
];
let map = Map::new(task, argsets);
let group_id = map.apply(&broker).await?;
Equivalent to:
Group::new()
.add("process_image", vec![serde_json::json!("image1.jpg")])
.add("process_image", vec![serde_json::json!("image2.jpg")])
.add("process_image", vec![serde_json::json!("image3.jpg")])
Use cases:
Like Map but unpacks argument tuples:
use celers_canvas::{Starmap, Signature};
// starmap(task, [(a1, b1), (a2, b2)])
let task = Signature::new("add".to_string());
let argsets = vec![
vec![serde_json::json!(10), serde_json::json!(20)], // add(10, 20)
vec![serde_json::json!(30), serde_json::json!(40)], // add(30, 40)
];
let starmap = Starmap::new(task, argsets);
let group_id = starmap.apply(&broker).await?;
Difference from Map:
Use cases:
use celers_canvas::Signature;
use std::collections::HashMap;
// Basic signature
let sig = Signature::new("my_task".to_string());
// With positional arguments
let sig = Signature::new("process".to_string())
.with_args(vec![
serde_json::json!("input.txt"),
serde_json::json!(100),
]);
// With keyword arguments
let mut kwargs = HashMap::new();
kwargs.insert("timeout".to_string(), serde_json::json!(300));
kwargs.insert("retries".to_string(), serde_json::json!(3));
let sig = Signature::new("process".to_string())
.with_kwargs(kwargs);
// With priority
let sig = Signature::new("urgent_task".to_string())
.with_priority(9); // Higher = more urgent
// Immutable (args cannot be replaced in chain)
let sig = Signature::new("critical_task".to_string())
.with_args(vec![serde_json::json!("data")])
.immutable();
Combine workflows for complex patterns:
// Process groups in sequence
let group1 = Group::new()
.add("task_a", vec![])
.add("task_b", vec![]);
let group2 = Group::new()
.add("task_c", vec![])
.add("task_d", vec![]);
// Execute groups sequentially (not directly supported, use manual coordination)
Assign priorities to workflow tasks:
let high_priority_chain = Chain::new()
.then_signature(
Signature::new("urgent_task".to_string())
.with_priority(9) // Highest priority
)
.then_signature(
Signature::new("followup_task".to_string())
.with_priority(8)
);
Handle partial results with error handling:
// In callback task, check which tasks completed successfully
// Results array may contain errors for failed tasks
async fn aggregate_results(results: Vec<Option<String>>) -> Result<String, String> {
let successful: Vec<_> = results.into_iter()
.filter_map(|r| r)
.collect();
if successful.len() >= 2 {
Ok(format!("Processed {} items", successful.len()))
} else {
Err("Too many failures".to_string())
}
}
Optimize workflows before execution using the WorkflowCompiler:
use celers_canvas::{Chain, Group, WorkflowCompiler, OptimizationPass};
// Create a workflow with redundant tasks
let chain = Chain::new()
.then_signature(Signature::new("process".to_string()).with_args(vec![json!(1)]))
.then_signature(Signature::new("validate".to_string()))
.then_signature(Signature::new("process".to_string()).with_args(vec![json!(1)])); // Duplicate
// Optimize the workflow
let compiler = WorkflowCompiler::new().aggressive();
let optimized = compiler.optimize_chain(&chain);
// The optimized chain has duplicates removed
assert_eq!(optimized.tasks.len(), 2); // Was 3, now 2
Available Optimization Passes:
Common Subexpression Elimination (CSE): Removes duplicate task signatures
let compiler = WorkflowCompiler::new().aggressive();
// Deduplicates identical tasks (same name, args, kwargs)
Dead Code Elimination (DCE): Removes unreachable or invalid tasks
let compiler = WorkflowCompiler::new();
// Removes tasks with empty names or no effect
Task Fusion: Combines sequential tasks with the same name
let compiler = WorkflowCompiler::new().aggressive();
// Combines immutable tasks with same name and priority
Parallel Scheduling: Optimizes task execution order
let compiler = WorkflowCompiler::new()
.add_pass(OptimizationPass::ParallelScheduling);
// Sorts group tasks by priority (highest first)
Resource Optimization: Improves resource utilization
let compiler = WorkflowCompiler::new()
.add_pass(OptimizationPass::ResourceOptimization);
// Groups tasks by queue for better locality
Example: Combined Optimizations
use celers_canvas::{Group, WorkflowCompiler, OptimizationPass};
let group = Group::new()
.add_signature(Signature::new("task1".to_string()).with_priority(1).with_args(vec![json!(1)]))
.add_signature(Signature::new("".to_string())) // Dead code
.add_signature(Signature::new("task2".to_string()).with_priority(9).with_args(vec![json!(2)]))
.add_signature(Signature::new("task1".to_string()).with_priority(1).with_args(vec![json!(1)])); // Duplicate
let compiler = WorkflowCompiler::new()
.aggressive()
.add_pass(OptimizationPass::ParallelScheduling);
let optimized = compiler.optimize_group(&group);
// Result: 2 tasks (dead code removed, duplicate removed, sorted by priority)
// Task order: task2 (priority 9), task1 (priority 1)
Performance Benefits:
examples/workflow_optimization.rs for detailed examplesThe Chord primitive uses Redis atomic operations for barrier synchronization:
// 1. Initialize chord state
chord_init(ChordState {
chord_id,
total: 3, // Number of header tasks
completed: 0, // Counter (Redis INCR)
callback: Some("aggregate_results"),
task_ids: vec![],
});
// 2. Each worker completion increments counter
let count = chord_complete_task(chord_id).await?; // Atomic INCR
// 3. When count == total, enqueue callback
if count >= state.total {
broker.enqueue(callback_task).await?;
}
[dependencies]
celers-canvas = { version = "0.1", features = ["backend-redis"] }
Available features:
backend-redis: Enable Chord support with Redis backend (required for barrier synchronization)Without backend-redis:
use celers_canvas::{Chain, CanvasError};
match Chain::new().then("task", vec![]).apply(&broker).await {
Ok(task_id) => println!("Started: {}", task_id),
Err(CanvasError::Invalid(msg)) => eprintln!("Invalid workflow: {}", msg),
Err(CanvasError::Broker(msg)) => eprintln!("Broker error: {}", msg),
Err(CanvasError::Serialization(msg)) => eprintln!("Serialization error: {}", msg),
}
Error Types:
Invalid: Empty workflow, missing callback, etc.Broker: Enqueue failures, connection errorsSerialization: JSON encoding errors| Feature | CeleRS Canvas | Celery Canvas |
|---|---|---|
| Chain | ✅ | ✅ |
| Group | ✅ | ✅ |
| Chord | ✅ | ✅ |
| Map | ✅ | ✅ |
| Starmap | ✅ | ✅ |
| Immutability | ✅ | ✅ |
| Priority | ✅ | ✅ |
| Nested Workflows | ⚠️ Manual | ✅ Automatic |
| Result Backend Required | Chord only | All (optional) |
Compatibility:
| Workflow | Time Complexity | Space Complexity | Notes |
|---|---|---|---|
| Chain | O(n) sequential | O(1) per task | Executes serially |
| Group | O(1) enqueue | O(n) tasks | Parallel execution |
| Chord | O(1) + callback | O(n) + state | Atomic counter overhead |
| Map | O(1) enqueue | O(n) tasks | Same as Group |
| Starmap | O(1) enqueue | O(n) tasks | Same as Group |
Throughput:
// ETL workflow
let pipeline = Chain::new()
.then("extract_data", vec![serde_json::json!("source.db")])
.then("transform_data", vec![])
.then("load_data", vec![serde_json::json!("dest.db")]);
pipeline.apply(&broker).await?;
// Batch resize images
let task = Signature::new("resize_image".to_string());
let images = vec![
vec![serde_json::json!("img1.jpg"), serde_json::json!("800x600")],
vec![serde_json::json!("img2.jpg"), serde_json::json!("800x600")],
vec![serde_json::json!("img3.jpg"), serde_json::json!("800x600")],
];
let workflow = Starmap::new(task, images);
workflow.apply(&broker).await?;
// Process log files in parallel, then aggregate
let header = Group::new()
.add("analyze_log", vec![serde_json::json!("log1.txt")])
.add("analyze_log", vec![serde_json::json!("log2.txt")])
.add("analyze_log", vec![serde_json::json!("log3.txt")]);
let callback = Signature::new("summarize_stats".to_string());
let workflow = Chord::new(header, callback);
let mut backend = RedisResultBackend::new("redis://localhost:6379")?;
workflow.apply(&broker, &mut backend).await?;
// High-priority urgent tasks
let urgent = Group::new()
.add_signature(
Signature::new("process_alert".to_string())
.with_args(vec![serde_json::json!("alert1")])
.with_priority(9)
)
.add_signature(
Signature::new("process_alert".to_string())
.with_args(vec![serde_json::json!("alert2")])
.with_priority(9)
);
urgent.apply(&broker).await?;
backend-redis)┌─────────────────────────────────────────────────────────────┐
│ Canvas Workflow │
└─────────────────────────────────────────────────────────────┘
│
├─ Chain ──> Sequential execution
├─ Group ──> Parallel execution
├─ Chord ──> Map-reduce (Group + callback)
├─ Map ───> Batch same task
└─ Starmap > Batch with unpacking
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Broker (Queue) │
│ (Redis, PostgreSQL) │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Workers (Executors) │
│ (Process tasks + update state) │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Result Backend (Chord coordination) │
│ (Redis atomic INCR) │
└─────────────────────────────────────────────────────────────┘
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_chain_workflow() {
let broker = MockBroker::new();
let chain = Chain::new()
.then("task1", vec![])
.then("task2", vec![]);
let task_id = chain.apply(&broker).await.unwrap();
assert!(broker.has_task(task_id));
}
}
examples/canvas_workflows.rs - Comprehensive workflow examplescelers-core - Task registry and executioncelers-worker - Worker runtime with workflow supportcelers-backend-redis - Result backend for ChordMIT OR Apache-2.0