| Crates.io | celers-protocol |
| lib.rs | celers-protocol |
| version | 0.1.0 |
| created_at | 2026-01-18 14:57:44.854618+00 |
| updated_at | 2026-01-18 14:57:44.854618+00 |
| description | Celery Protocol v2/v5 implementation for CeleRS |
| homepage | |
| repository | https://github.com/cool-japan/celers |
| max_upload_size | |
| id | 2052447 |
| size | 598,579 |
Celery protocol v2/v5 implementation for CeleRS. Ensures wire-level compatibility with Python Celery workers and brokers.
Production-ready protocol implementation with:
use celers_protocol::{Message, MessageHeaders, ContentType};
use uuid::Uuid;
// Create a simple task message
let task_id = Uuid::new_v4();
let body = serde_json::to_vec(&serde_json::json!({
"args": [1, 2],
"kwargs": {}
})).unwrap();
let message = Message::new("tasks.add".to_string(), task_id, body);
// Serialize to JSON for transport
let serialized = serde_json::to_string(&message).unwrap();
pub struct Message {
/// Message headers (task metadata)
pub headers: MessageHeaders,
/// Message properties (AMQP-like)
pub properties: MessageProperties,
/// Serialized body (task arguments)
pub body: Vec<u8>,
/// Content type ("application/json", "application/x-msgpack")
pub content_type: String,
/// Content encoding ("utf-8", "binary")
pub content_encoding: String,
}
JSON representation:
{
"headers": {
"task": "tasks.add",
"id": "550e8400-e29b-41d4-a716-446655440000",
"lang": "rust",
"retries": 3
},
"properties": {
"delivery_mode": 2,
"priority": 5
},
"body": "eyJhcmdzIjogWzEsIDJdLCAia3dhcmdzIjoge319",
"content-type": "application/json",
"content-encoding": "utf-8"
}
pub struct MessageHeaders {
/// Task name (e.g., "tasks.add")
pub task: String,
/// Task ID (UUID)
pub id: Uuid,
/// Programming language ("rust", "py")
pub lang: String,
/// Root task ID (for workflow tracking)
pub root_id: Option<Uuid>,
/// Parent task ID (for nested tasks)
pub parent_id: Option<Uuid>,
/// Group ID (for grouped tasks)
pub group: Option<Uuid>,
/// Maximum retries
pub retries: Option<u32>,
/// ETA (Estimated Time of Arrival) for delayed tasks
pub eta: Option<DateTime<Utc>>,
/// Task expiration timestamp
pub expires: Option<DateTime<Utc>>,
/// Additional custom headers
pub extra: HashMap<String, serde_json::Value>,
}
pub struct MessageProperties {
/// Correlation ID for RPC-style calls
pub correlation_id: Option<String>,
/// Reply-to queue for results
pub reply_to: Option<String>,
/// Delivery mode (1 = non-persistent, 2 = persistent)
pub delivery_mode: u8,
/// Priority (0-9, higher = more priority)
pub priority: Option<u8>,
}
use celers_protocol::Message;
use uuid::Uuid;
let task_id = Uuid::new_v4();
let body = serde_json::to_vec(&serde_json::json!({
"args": ["hello", "world"],
"kwargs": {}
})).unwrap();
let message = Message::new("tasks.greet".to_string(), task_id, body);
let message = Message::new("urgent_task".to_string(), task_id, body)
.with_priority(9); // Highest priority
Priority levels:
let parent_id = Uuid::new_v4();
let root_id = Uuid::new_v4();
let message = Message::new("child_task".to_string(), task_id, body)
.with_parent(parent_id)
.with_root(root_id);
Use cases:
let group_id = Uuid::new_v4();
let message = Message::new("parallel_task".to_string(), task_id, body)
.with_group(group_id);
Use cases:
use chrono::{Duration, Utc};
// Execute in 1 hour
let eta = Utc::now() + Duration::hours(1);
let message = Message::new("delayed_task".to_string(), task_id, body)
.with_eta(eta);
use chrono::{Duration, Utc};
// Task expires in 5 minutes
let expires = Utc::now() + Duration::minutes(5);
let message = Message::new("time_sensitive".to_string(), task_id, body)
.with_expires(expires);
use celers_protocol::TaskArgs;
let args = TaskArgs {
args: vec![
serde_json::json!(10),
serde_json::json!(20),
],
kwargs: HashMap::from([
("timeout".to_string(), serde_json::json!(300)),
("retries".to_string(), serde_json::json!(3)),
]),
};
let body = serde_json::to_vec(&args).unwrap();
let message = Message::new("tasks.add".to_string(), task_id, body);
JSON representation:
{
"args": [10, 20],
"kwargs": {
"timeout": 300,
"retries": 3
}
}
let args = TaskArgs {
args: vec![
serde_json::json!({
"user_id": 123,
"data": [1, 2, 3]
}),
],
kwargs: HashMap::from([
("options".to_string(), serde_json::json!({
"format": "json",
"compress": true
})),
]),
};
use celers_protocol::ProtocolVersion;
let version = ProtocolVersion::V2; // Celery 4.x+
Features:
Compatible with:
let version = ProtocolVersion::V5; // Celery 5.x+
Additional features:
use celers_protocol::ContentType;
let content_type = ContentType::Json;
assert_eq!(content_type.as_str(), "application/json");
Pros:
Cons:
[dependencies]
celers-protocol = { version = "0.1", features = ["msgpack"] }
use celers_protocol::ContentType;
let content_type = ContentType::MessagePack;
assert_eq!(content_type.as_str(), "application/x-msgpack");
Pros:
Cons:
[dependencies]
celers-protocol = { version = "0.1", features = ["binary"] }
let content_type = ContentType::Binary;
assert_eq!(content_type.as_str(), "application/octet-stream");
let content_type = ContentType::Custom("application/protobuf".to_string());
use celers_protocol::Message;
let message = Message::new("task".to_string(), task_id, body);
// To JSON
let json = serde_json::to_string(&message)?;
// To bytes (for broker)
let bytes = serde_json::to_vec(&message)?;
// From JSON string
let message: Message = serde_json::from_str(&json)?;
// From bytes
let message: Message = serde_json::from_slice(&bytes)?;
Message bodies are automatically base64-encoded when serializing to JSON:
let body = vec![0xFF, 0xFE, 0xFD]; // Binary data
let message = Message::new("task".to_string(), task_id, body);
let json = serde_json::to_string(&message)?;
// body field in JSON: "//79" (base64)
Send from Rust, receive in Python:
// Rust: Send task
let message = Message::new("python_task".to_string(), task_id, body);
broker.enqueue(message).await?;
# Python: Receive and execute
from celery import Celery
app = Celery('myapp', broker='redis://localhost:6379')
@app.task(name='python_task')
def python_task(arg1, arg2):
return arg1 + arg2
Send from Python, receive in Rust:
# Python: Send task
from celery import Celery
app = Celery('myapp', broker='redis://localhost:6379')
app.send_task('rust_task', args=[1, 2])
// Rust: Receive and execute
use celers_core::TaskRegistry;
let mut registry = TaskRegistry::new();
registry.register("rust_task", |args: Vec<i32>| async move {
Ok(args[0] + args[1])
});
CeleRS messages are 100% compatible with Celery wire format:
| Component | CeleRS | Celery | Compatible? |
|---|---|---|---|
| Headers | ✅ | ✅ | ✅ Yes |
| Properties | ✅ | ✅ | ✅ Yes |
| Body format | JSON/MessagePack | JSON/MessagePack/Pickle | ✅ Yes* |
| UUIDs | ✅ | ✅ | ✅ Yes |
| Timestamps | ISO8601 | ISO8601 | ✅ Yes |
*Pickle not supported in CeleRS (security reasons)
{
"headers": {
"task": "tasks.add",
"id": "550e8400-e29b-41d4-a716-446655440000",
"lang": "rust"
},
"properties": {
"delivery_mode": 2
},
"body": "eyJhcmdzIjogWzEsIDJdLCAia3dhcmdzIjoge319",
"content-type": "application/json",
"content-encoding": "utf-8"
}
{
"headers": {
"task": "urgent_task",
"id": "...",
"lang": "rust",
"retries": 3
},
"properties": {
"delivery_mode": 2,
"priority": 9
},
"body": "...",
"content-type": "application/json",
"content-encoding": "utf-8"
}
{
"headers": {
"task": "child_task",
"id": "...",
"lang": "rust",
"parent_id": "parent-uuid",
"root_id": "root-uuid"
},
"properties": {
"delivery_mode": 2
},
"body": "...",
"content-type": "application/json",
"content-encoding": "utf-8"
}
{
"headers": {
"task": "parallel_task",
"id": "...",
"lang": "rust",
"group": "group-uuid"
},
"properties": {
"delivery_mode": 2
},
"body": "...",
"content-type": "application/json",
"content-encoding": "utf-8"
}
{
"headers": {
"task": "delayed_task",
"id": "...",
"lang": "rust",
"eta": "2023-12-31T23:59:59Z"
},
"properties": {
"delivery_mode": 2
},
"body": "...",
"content-type": "application/json",
"content-encoding": "utf-8"
}
// Good: Unique ID
let task_id = Uuid::new_v4();
let message = Message::new("task".to_string(), task_id, body);
// Bad: Reused ID (don't do this)
// let message = Message::new("task".to_string(), old_id, body);
// Good: Reserve high priority for urgent tasks
let message = Message::new("critical_alert".to_string(), task_id, body)
.with_priority(9);
// Bad: Everything is high priority (defeats the purpose)
// let message = Message::new("regular_task".to_string(), task_id, body)
// .with_priority(9);
use chrono::{Duration, Utc};
// Task only relevant for 5 minutes
let expires = Utc::now() + Duration::minutes(5);
let message = Message::new("realtime_task".to_string(), task_id, body)
.with_expires(expires);
// Parent task
let parent_id = Uuid::new_v4();
let root_id = parent_id; // Root is the first task
let parent_msg = Message::new("parent".to_string(), parent_id, body1)
.with_root(root_id);
// Child task
let child_id = Uuid::new_v4();
let child_msg = Message::new("child".to_string(), child_id, body2)
.with_parent(parent_id)
.with_root(root_id);
// Small messages: JSON is fine
let json_msg = Message::new("small_task".to_string(), task_id, small_body);
// Large messages or high throughput: Use MessagePack
#[cfg(feature = "msgpack")]
let msgpack_msg = {
let mut msg = Message::new("large_task".to_string(), task_id, large_body);
msg.content_type = ContentType::MessagePack.as_str().to_string();
msg
};
Cause: Content type mismatch
Solution: Ensure content-type is "application/json" or "application/x-msgpack"
Cause: Missing base64 encoding Solution: Body is automatically base64-encoded when serializing to JSON
Cause: Broker doesn't support priorities Solution: Use Redis with sorted sets or RabbitMQ with priority queues
Cause: Worker doesn't check ETA
Solution: Use celers-worker or Celery worker with ETA support
| Content Type | Overhead | Typical Size |
|---|---|---|
| JSON | ~30% | 200-500B + body |
| MessagePack | ~10% | 150-300B + body |
| Binary | Minimal | 100-200B + body |
| Format | Serialize | Deserialize |
|---|---|---|
| JSON | ~100K msg/sec | ~100K msg/sec |
| MessagePack | ~200K msg/sec | ~200K msg/sec |
Recommendation: Use MessagePack for high-throughput systems.
Unlike Python Celery, CeleRS does not support Pickle serialization:
# Python (INSECURE - don't use)
app.conf.task_serializer = 'pickle' # ❌ Arbitrary code execution
# CeleRS (SECURE)
# Only JSON and MessagePack supported # ✅ Safe
Why: Pickle allows arbitrary code execution, making it a security risk.
Always validate content-type before deserializing:
match message.content_type.as_str() {
"application/json" => {
// Safe to deserialize JSON
let args: TaskArgs = serde_json::from_slice(&message.body)?;
}
_ => {
return Err("Unsupported content type");
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_message_creation() {
let task_id = Uuid::new_v4();
let body = vec![1, 2, 3];
let message = Message::new("test".to_string(), task_id, body);
assert_eq!(message.headers.task, "test");
assert_eq!(message.headers.id, task_id);
assert_eq!(message.headers.lang, "rust");
}
#[test]
fn test_message_serialization() {
let task_id = Uuid::new_v4();
let body = serde_json::to_vec(&serde_json::json!({
"args": [1, 2],
"kwargs": {}
})).unwrap();
let message = Message::new("task".to_string(), task_id, body);
let json = serde_json::to_string(&message).unwrap();
assert!(json.contains("\"task\":\"task\""));
assert!(json.contains("\"lang\":\"rust\""));
}
#[test]
fn test_builder_pattern() {
let message = Message::new("task".to_string(), Uuid::new_v4(), vec![])
.with_priority(9)
.with_group(Uuid::new_v4());
assert_eq!(message.properties.priority, Some(9));
assert!(message.headers.group.is_some());
}
}
celers-core - Task execution and registrycelers-broker-redis - Redis broker implementationcelers-worker - Worker runtimeMIT OR Apache-2.0