celers-protocol

Crates.iocelers-protocol
lib.rscelers-protocol
version0.1.0
created_at2026-01-18 14:57:44.854618+00
updated_at2026-01-18 14:57:44.854618+00
descriptionCelery Protocol v2/v5 implementation for CeleRS
homepage
repositoryhttps://github.com/cool-japan/celers
max_upload_size
id2052447
size598,579
KitaSan (cool-japan)

documentation

README

celers-protocol

Celery protocol v2/v5 implementation for CeleRS. Ensures wire-level compatibility with Python Celery workers and brokers.

Overview

Production-ready protocol implementation with:

  • Celery Protocol v2: Compatible with Celery 4.x+
  • Celery Protocol v5: Compatible with Celery 5.x+
  • JSON Serialization: Default, universally compatible
  • MessagePack: Optional high-performance binary format
  • AMQP Properties: Correlation ID, reply-to, delivery mode
  • Workflow Headers: Parent ID, root ID, group ID
  • Base64 Encoding: Binary-safe message bodies
  • Full Metadata: ETA, expiration, retries, priority

Quick Start

use celers_protocol::{Message, MessageHeaders, ContentType};
use uuid::Uuid;

// Create a simple task message
let task_id = Uuid::new_v4();
let body = serde_json::to_vec(&serde_json::json!({
    "args": [1, 2],
    "kwargs": {}
})).unwrap();

let message = Message::new("tasks.add".to_string(), task_id, body);

// Serialize to JSON for transport
let serialized = serde_json::to_string(&message).unwrap();

Protocol Structure

Complete Message Format

pub struct Message {
    /// Message headers (task metadata)
    pub headers: MessageHeaders,

    /// Message properties (AMQP-like)
    pub properties: MessageProperties,

    /// Serialized body (task arguments)
    pub body: Vec<u8>,

    /// Content type ("application/json", "application/x-msgpack")
    pub content_type: String,

    /// Content encoding ("utf-8", "binary")
    pub content_encoding: String,
}

JSON representation:

{
  "headers": {
    "task": "tasks.add",
    "id": "550e8400-e29b-41d4-a716-446655440000",
    "lang": "rust",
    "retries": 3
  },
  "properties": {
    "delivery_mode": 2,
    "priority": 5
  },
  "body": "eyJhcmdzIjogWzEsIDJdLCAia3dhcmdzIjoge319",
  "content-type": "application/json",
  "content-encoding": "utf-8"
}

Message Headers

pub struct MessageHeaders {
    /// Task name (e.g., "tasks.add")
    pub task: String,

    /// Task ID (UUID)
    pub id: Uuid,

    /// Programming language ("rust", "py")
    pub lang: String,

    /// Root task ID (for workflow tracking)
    pub root_id: Option<Uuid>,

    /// Parent task ID (for nested tasks)
    pub parent_id: Option<Uuid>,

    /// Group ID (for grouped tasks)
    pub group: Option<Uuid>,

    /// Maximum retries
    pub retries: Option<u32>,

    /// ETA (Estimated Time of Arrival) for delayed tasks
    pub eta: Option<DateTime<Utc>>,

    /// Task expiration timestamp
    pub expires: Option<DateTime<Utc>>,

    /// Additional custom headers
    pub extra: HashMap<String, serde_json::Value>,
}

Message Properties

pub struct MessageProperties {
    /// Correlation ID for RPC-style calls
    pub correlation_id: Option<String>,

    /// Reply-to queue for results
    pub reply_to: Option<String>,

    /// Delivery mode (1 = non-persistent, 2 = persistent)
    pub delivery_mode: u8,

    /// Priority (0-9, higher = more priority)
    pub priority: Option<u8>,
}

Creating Messages

Basic Task

use celers_protocol::Message;
use uuid::Uuid;

let task_id = Uuid::new_v4();
let body = serde_json::to_vec(&serde_json::json!({
    "args": ["hello", "world"],
    "kwargs": {}
})).unwrap();

let message = Message::new("tasks.greet".to_string(), task_id, body);

With Priority

let message = Message::new("urgent_task".to_string(), task_id, body)
    .with_priority(9);  // Highest priority

Priority levels:

  • 0-3: Low priority
  • 4-6: Normal priority
  • 7-9: High priority

With Parent/Root ID (Workflows)

let parent_id = Uuid::new_v4();
let root_id = Uuid::new_v4();

let message = Message::new("child_task".to_string(), task_id, body)
    .with_parent(parent_id)
    .with_root(root_id);

Use cases:

  • Chain workflows (parent → child)
  • Workflow tracking (all tasks share root_id)
  • Result aggregation by root_id

With Group ID

let group_id = Uuid::new_v4();

let message = Message::new("parallel_task".to_string(), task_id, body)
    .with_group(group_id);

Use cases:

  • Group/Chord workflows
  • Parallel task tracking
  • Bulk operations

With ETA (Delayed Execution)

use chrono::{Duration, Utc};

// Execute in 1 hour
let eta = Utc::now() + Duration::hours(1);
let message = Message::new("delayed_task".to_string(), task_id, body)
    .with_eta(eta);

With Expiration

use chrono::{Duration, Utc};

// Task expires in 5 minutes
let expires = Utc::now() + Duration::minutes(5);
let message = Message::new("time_sensitive".to_string(), task_id, body)
    .with_expires(expires);

Task Arguments

Standard Format

use celers_protocol::TaskArgs;

let args = TaskArgs {
    args: vec![
        serde_json::json!(10),
        serde_json::json!(20),
    ],
    kwargs: HashMap::from([
        ("timeout".to_string(), serde_json::json!(300)),
        ("retries".to_string(), serde_json::json!(3)),
    ]),
};

let body = serde_json::to_vec(&args).unwrap();
let message = Message::new("tasks.add".to_string(), task_id, body);

JSON representation:

{
  "args": [10, 20],
  "kwargs": {
    "timeout": 300,
    "retries": 3
  }
}

Complex Arguments

let args = TaskArgs {
    args: vec![
        serde_json::json!({
            "user_id": 123,
            "data": [1, 2, 3]
        }),
    ],
    kwargs: HashMap::from([
        ("options".to_string(), serde_json::json!({
            "format": "json",
            "compress": true
        })),
    ]),
};

Protocol Versions

Version 2 (Default)

use celers_protocol::ProtocolVersion;

let version = ProtocolVersion::V2;  // Celery 4.x+

Features:

  • JSON/MessagePack serialization
  • Basic workflow support
  • AMQP-style properties
  • Task metadata

Compatible with:

  • Celery 4.0+
  • Celery 5.0+ (backward compatible)

Version 5

let version = ProtocolVersion::V5;  // Celery 5.x+

Additional features:

  • Extended workflow metadata
  • Improved error handling
  • Enhanced tracing

Content Types

JSON (Default)

use celers_protocol::ContentType;

let content_type = ContentType::Json;
assert_eq!(content_type.as_str(), "application/json");

Pros:

  • Human-readable
  • Universally supported
  • Easy debugging

Cons:

  • Larger message size
  • Slower serialization

MessagePack (Optional)

[dependencies]
celers-protocol = { version = "0.1", features = ["msgpack"] }
use celers_protocol::ContentType;

let content_type = ContentType::MessagePack;
assert_eq!(content_type.as_str(), "application/x-msgpack");

Pros:

  • Compact binary format
  • Fast serialization
  • Smaller message size

Cons:

  • Not human-readable
  • Requires msgpack feature

Binary (Custom)

[dependencies]
celers-protocol = { version = "0.1", features = ["binary"] }
let content_type = ContentType::Binary;
assert_eq!(content_type.as_str(), "application/octet-stream");

Custom Content Type

let content_type = ContentType::Custom("application/protobuf".to_string());

Serialization

Message Serialization

use celers_protocol::Message;

let message = Message::new("task".to_string(), task_id, body);

// To JSON
let json = serde_json::to_string(&message)?;

// To bytes (for broker)
let bytes = serde_json::to_vec(&message)?;

Message Deserialization

// From JSON string
let message: Message = serde_json::from_str(&json)?;

// From bytes
let message: Message = serde_json::from_slice(&bytes)?;

Base64 Encoding

Message bodies are automatically base64-encoded when serializing to JSON:

let body = vec![0xFF, 0xFE, 0xFD];  // Binary data
let message = Message::new("task".to_string(), task_id, body);

let json = serde_json::to_string(&message)?;
// body field in JSON: "//79" (base64)

Celery Compatibility

Python Celery Interoperability

Send from Rust, receive in Python:

// Rust: Send task
let message = Message::new("python_task".to_string(), task_id, body);
broker.enqueue(message).await?;
# Python: Receive and execute
from celery import Celery

app = Celery('myapp', broker='redis://localhost:6379')

@app.task(name='python_task')
def python_task(arg1, arg2):
    return arg1 + arg2

Send from Python, receive in Rust:

# Python: Send task
from celery import Celery

app = Celery('myapp', broker='redis://localhost:6379')
app.send_task('rust_task', args=[1, 2])
// Rust: Receive and execute
use celers_core::TaskRegistry;

let mut registry = TaskRegistry::new();
registry.register("rust_task", |args: Vec<i32>| async move {
    Ok(args[0] + args[1])
});

Wire Format Compatibility

CeleRS messages are 100% compatible with Celery wire format:

Component CeleRS Celery Compatible?
Headers ✅ Yes
Properties ✅ Yes
Body format JSON/MessagePack JSON/MessagePack/Pickle ✅ Yes*
UUIDs ✅ Yes
Timestamps ISO8601 ISO8601 ✅ Yes

*Pickle not supported in CeleRS (security reasons)

Message Examples

Simple Task

{
  "headers": {
    "task": "tasks.add",
    "id": "550e8400-e29b-41d4-a716-446655440000",
    "lang": "rust"
  },
  "properties": {
    "delivery_mode": 2
  },
  "body": "eyJhcmdzIjogWzEsIDJdLCAia3dhcmdzIjoge319",
  "content-type": "application/json",
  "content-encoding": "utf-8"
}

Priority Task

{
  "headers": {
    "task": "urgent_task",
    "id": "...",
    "lang": "rust",
    "retries": 3
  },
  "properties": {
    "delivery_mode": 2,
    "priority": 9
  },
  "body": "...",
  "content-type": "application/json",
  "content-encoding": "utf-8"
}

Workflow Task (Chain)

{
  "headers": {
    "task": "child_task",
    "id": "...",
    "lang": "rust",
    "parent_id": "parent-uuid",
    "root_id": "root-uuid"
  },
  "properties": {
    "delivery_mode": 2
  },
  "body": "...",
  "content-type": "application/json",
  "content-encoding": "utf-8"
}

Group Task

{
  "headers": {
    "task": "parallel_task",
    "id": "...",
    "lang": "rust",
    "group": "group-uuid"
  },
  "properties": {
    "delivery_mode": 2
  },
  "body": "...",
  "content-type": "application/json",
  "content-encoding": "utf-8"
}

Delayed Task (ETA)

{
  "headers": {
    "task": "delayed_task",
    "id": "...",
    "lang": "rust",
    "eta": "2023-12-31T23:59:59Z"
  },
  "properties": {
    "delivery_mode": 2
  },
  "body": "...",
  "content-type": "application/json",
  "content-encoding": "utf-8"
}

Best Practices

1. Always Set Task ID

// Good: Unique ID
let task_id = Uuid::new_v4();
let message = Message::new("task".to_string(), task_id, body);

// Bad: Reused ID (don't do this)
// let message = Message::new("task".to_string(), old_id, body);

2. Use Priority Sparingly

// Good: Reserve high priority for urgent tasks
let message = Message::new("critical_alert".to_string(), task_id, body)
    .with_priority(9);

// Bad: Everything is high priority (defeats the purpose)
// let message = Message::new("regular_task".to_string(), task_id, body)
//     .with_priority(9);

3. Set Expiration for Time-Sensitive Tasks

use chrono::{Duration, Utc};

// Task only relevant for 5 minutes
let expires = Utc::now() + Duration::minutes(5);
let message = Message::new("realtime_task".to_string(), task_id, body)
    .with_expires(expires);

4. Use Workflows for Related Tasks

// Parent task
let parent_id = Uuid::new_v4();
let root_id = parent_id;  // Root is the first task

let parent_msg = Message::new("parent".to_string(), parent_id, body1)
    .with_root(root_id);

// Child task
let child_id = Uuid::new_v4();
let child_msg = Message::new("child".to_string(), child_id, body2)
    .with_parent(parent_id)
    .with_root(root_id);

5. Choose Appropriate Content Type

// Small messages: JSON is fine
let json_msg = Message::new("small_task".to_string(), task_id, small_body);

// Large messages or high throughput: Use MessagePack
#[cfg(feature = "msgpack")]
let msgpack_msg = {
    let mut msg = Message::new("large_task".to_string(), task_id, large_body);
    msg.content_type = ContentType::MessagePack.as_str().to_string();
    msg
};

Troubleshooting

Messages not received by Python workers

Cause: Content type mismatch Solution: Ensure content-type is "application/json" or "application/x-msgpack"

Binary data corruption

Cause: Missing base64 encoding Solution: Body is automatically base64-encoded when serializing to JSON

Priority not working

Cause: Broker doesn't support priorities Solution: Use Redis with sorted sets or RabbitMQ with priority queues

ETA tasks executing immediately

Cause: Worker doesn't check ETA Solution: Use celers-worker or Celery worker with ETA support

Performance

Message Size

Content Type Overhead Typical Size
JSON ~30% 200-500B + body
MessagePack ~10% 150-300B + body
Binary Minimal 100-200B + body

Serialization Speed

Format Serialize Deserialize
JSON ~100K msg/sec ~100K msg/sec
MessagePack ~200K msg/sec ~200K msg/sec

Recommendation: Use MessagePack for high-throughput systems.

Security Considerations

No Pickle Support

Unlike Python Celery, CeleRS does not support Pickle serialization:

# Python (INSECURE - don't use)
app.conf.task_serializer = 'pickle'  # ❌ Arbitrary code execution

# CeleRS (SECURE)
# Only JSON and MessagePack supported  # ✅ Safe

Why: Pickle allows arbitrary code execution, making it a security risk.

Content-Type Validation

Always validate content-type before deserializing:

match message.content_type.as_str() {
    "application/json" => {
        // Safe to deserialize JSON
        let args: TaskArgs = serde_json::from_slice(&message.body)?;
    }
    _ => {
        return Err("Unsupported content type");
    }
}

Testing

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_message_creation() {
        let task_id = Uuid::new_v4();
        let body = vec![1, 2, 3];
        let message = Message::new("test".to_string(), task_id, body);

        assert_eq!(message.headers.task, "test");
        assert_eq!(message.headers.id, task_id);
        assert_eq!(message.headers.lang, "rust");
    }

    #[test]
    fn test_message_serialization() {
        let task_id = Uuid::new_v4();
        let body = serde_json::to_vec(&serde_json::json!({
            "args": [1, 2],
            "kwargs": {}
        })).unwrap();

        let message = Message::new("task".to_string(), task_id, body);
        let json = serde_json::to_string(&message).unwrap();

        assert!(json.contains("\"task\":\"task\""));
        assert!(json.contains("\"lang\":\"rust\""));
    }

    #[test]
    fn test_builder_pattern() {
        let message = Message::new("task".to_string(), Uuid::new_v4(), vec![])
            .with_priority(9)
            .with_group(Uuid::new_v4());

        assert_eq!(message.properties.priority, Some(9));
        assert!(message.headers.group.is_some());
    }
}

See Also

  • Core: celers-core - Task execution and registry
  • Broker: celers-broker-redis - Redis broker implementation
  • Worker: celers-worker - Worker runtime

License

MIT OR Apache-2.0

Commit count: 1

cargo fmt