| Crates.io | celers-core |
| lib.rs | celers-core |
| version | 0.1.0 |
| created_at | 2026-01-18 14:55:55.388709+00 |
| updated_at | 2026-01-18 14:55:55.388709+00 |
| description | Core traits and types for CeleRS distributed task queue |
| homepage | |
| repository | https://github.com/cool-japan/celers |
| max_upload_size | |
| id | 2052446 |
| size | 546,342 |
Core abstractions and traits for the CeleRS distributed task queue system.
This crate provides the fundamental building blocks for CeleRS:
use celers_core::{Broker, SerializedTask, TaskRegistry};
// Create a task
let task = SerializedTask::new(
"my_task".to_string(),
serde_json::to_vec(&args)?,
);
// Enqueue to broker
let task_id = broker.enqueue(task).await?;
// Batch enqueue for better performance
let task_ids = broker.enqueue_batch(tasks).await?;
// Dequeue and execute
if let Some(msg) = broker.dequeue().await? {
let result = registry.execute(&msg.task).await?;
broker.ack(&msg.task.metadata.id, msg.receipt_handle.as_deref()).await?;
}
The core abstraction for message brokers:
#[async_trait]
pub trait Broker: Send + Sync {
// Single operations
async fn enqueue(&self, task: SerializedTask) -> Result<TaskId>;
async fn dequeue(&self) -> Result<Option<BrokerMessage>>;
async fn ack(&self, task_id: &TaskId, receipt_handle: Option<&str>) -> Result<()>;
async fn reject(&self, task_id: &TaskId, receipt_handle: Option<&str>, requeue: bool) -> Result<()>;
// Batch operations (10-100x faster)
async fn enqueue_batch(&self, tasks: Vec<SerializedTask>) -> Result<Vec<TaskId>>;
async fn dequeue_batch(&self, count: usize) -> Result<Vec<BrokerMessage>>;
async fn ack_batch(&self, tasks: &[(TaskId, Option<String>)]) -> Result<()>;
// Management
async fn queue_size(&self) -> Result<usize>;
async fn cancel(&self, task_id: &TaskId) -> Result<bool>;
}
pub struct SerializedTask {
pub metadata: TaskMetadata,
pub args: Vec<u8>, // JSON-serialized arguments
}
impl SerializedTask {
pub fn new(name: String, args: Vec<u8>) -> Self;
pub fn with_priority(self, priority: i32) -> Self;
pub fn with_max_retries(self, max_retries: u32) -> Self;
pub fn with_timeout(self, timeout_secs: u64) -> Self;
}
pub struct TaskMetadata {
pub id: Uuid,
pub name: String,
pub state: TaskState,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub max_retries: u32,
pub timeout_secs: Option<u64>,
pub priority: i32,
// Workflow support
pub group_id: Option<Uuid>, // Group ID for parallel execution
pub chord_id: Option<Uuid>, // Chord ID for barrier synchronization
}
pub enum TaskState {
Pending,
Running,
Retrying(u32), // retry count
Success,
Failure,
Cancelled,
}
Execute registered tasks by name:
use celers_core::TaskRegistry;
let mut registry = TaskRegistry::new();
// Register a task executor
registry.register("my_task", |args| async move {
// Task implementation
Ok(result)
});
// Execute by name
let result = registry.execute(&task).await?;
pub enum CelersError {
Broker(String),
Serialization(String),
Deserialization(String),
TaskNotFound(String),
TaskTimeout,
Other(String),
}
pub type Result<T> = std::result::Result<T, CelersError>;
Batch operations provide significant performance improvements:
| Operation | Individual | Batch (10) | Batch (100) |
|---|---|---|---|
| Throughput | 1K/sec | 10K/sec | 50K/sec |
| Latency | 1ms/task | 0.1ms/task | 0.01ms/task |
| Network RTT | N | 1 | 1 |
// Good: Batch enqueue for bulk operations
let tasks = create_many_tasks(100);
broker.enqueue_batch(tasks).await?;
// Less efficient: Individual enqueue
for task in tasks {
broker.enqueue(task).await?;
}
TaskMetadata includes fields for Canvas workflow primitives:
// Group tasks
task.metadata.group_id = Some(group_id);
// Chord tasks (map-reduce)
task.metadata.chord_id = Some(chord_id);
See celers-canvas for high-level workflow APIs.
[dependencies]
async-trait = "0.1"
uuid = { version = "1.0", features = ["v4", "serde"] }
serde = { version = "1.0", features = ["derive"] }
chrono = { version = "0.4", features = ["serde"] }
thiserror = "2.0"
MIT OR Apache-2.0