| Crates.io | chainmq |
| lib.rs | chainmq |
| version | 0.2.0 |
| created_at | 2025-09-23 08:09:12.602167+00 |
| updated_at | 2025-09-26 16:57:35.974632+00 |
| description | A Redis-backed, type-safe job queue for Rust. Provides job registration and execution, delayed jobs, retries with backoff, and scalable workers. |
| homepage | |
| repository | https://github.com/thecre8tor/chainmq |
| max_upload_size | |
| id | 1851132 |
| size | 116,454 |
A Redis-backed, type-safe job queue for Rust. Provides job registration and execution, delayed jobs, retries with backoff, and scalable workers.
This crate is library-first. Runnable examples demonstrate typical patterns (single worker, multiple jobs, multiple workers, delayed jobs, failure/retry).
Add ChainMQ to your Cargo.toml:
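[dependencies]
chainmq = "0.2.0"
actix-web = "4.0"
redis = "0.23"
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
# Also required by the snippets below (`#[async_trait]`, `anyhow::Error`):
async-trait = "0.1"
anyhow = "1.0"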
use async_trait::async_trait;
use chainmq::{AppContext, Job, JobContext};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
/// Payload describing a single outgoing email.
///
/// The serde derives let the payload be serialized and deserialized; `Debug`
/// and `Clone` are derived for convenience.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct EmailJob {
    /// Recipient address handed to the mail client.
    pub to: String,
    /// Subject line of the message.
    pub subject: String,
    /// Body text of the message.
    pub body: String,
}
#[async_trait]
impl Job for EmailJob {
    /// Sends this email through the `mail_client` stored in the shared `AppState`.
    ///
    /// If the context carries no `AppState`, the job does nothing and succeeds.
    ///
    /// NOTE(review): a failed send is only printed and the job still returns
    /// `Ok(())`, so the failure is not reported to the caller — confirm this
    /// is intended.
    async fn perform(&self, ctx: &JobContext) -> chainmq::Result<()> {
        // Guard clause: without application state there is nothing to do.
        let Some(state) = ctx.app::<AppState>() else {
            return Ok(());
        };

        let outcome = state
            .mail_client
            .send_email(&self.to, &self.subject, &self.body)
            .await;

        match outcome {
            Err(error) => println!("Failed to send email: {}", error),
            Ok(result) => println!("Email sent successfully: {:#?}", result),
        }

        Ok(())
    }

    /// Name under which this job type is identified.
    fn name() -> &'static str {
        "EmailJob"
    }

    /// Queue this job type is placed on.
    fn queue_name() -> &'static str {
        "emails"
    }
}
use chainmq::AppContext;
use std::sync::Arc;
/// Shared application state: handed to the worker via `with_app_context` and
/// to the web server via `app_data` in `setup_application`.
#[derive(Clone)]
pub struct AppState {
/// Mail client used by `EmailJob::perform` to send messages.
pub mail_client: Arc<MailClient>,
/// Redis client; `enqueue_email_job` passes it as `QueueOptions::redis_instance`.
pub redis_client: Arc<redis::Client>,
}
impl AppContext for AppState {
    /// Returns a new `Arc<dyn AppContext>` wrapping a clone of this state.
    fn clone_context(&self) -> Arc<dyn AppContext> {
        let snapshot = self.clone();
        Arc::new(snapshot)
    }
}
use chainmq::{JobRegistry, WorkerBuilder};
use actix_web::{web::Data, App, HttpServer};
use redis::Client as RedisClient;
use tokio::sync::broadcast;
/// Wires the application together: opens the Redis client, builds the shared
/// `AppState`, registers `EmailJob`, starts a background worker task, and then
/// runs the HTTP server on `127.0.0.1:8000` until it exits.
///
/// # Errors
///
/// Returns an error if the Redis URL cannot be opened, the server cannot bind
/// its address, or the server terminates with an error.
async fn setup_application() -> Result<(), anyhow::Error> {
    // Initialize Redis connection
    let redis_client = RedisClient::open("redis://127.0.0.1/")?;

    // Create application state
    let app_state = Arc::new(AppState {
        mail_client: Arc::new(MailClient::new()),
        redis_client: Arc::new(redis_client.clone()),
    });

    // Set up job registry
    let mut registry = JobRegistry::new();
    registry.register::<EmailJob>();

    // The worker task gets its own handle to the state. Moving `app_state`
    // itself into the `async move` block would make it unavailable to the
    // HTTP server closure below (use of moved value).
    let worker_state = Arc::clone(&app_state);

    // Start background workers
    tokio::spawn(async move {
        // `redis_client` is moved into this task, so no extra clone is needed.
        let mut worker = WorkerBuilder::new_with_redis_instance(redis_client, registry)
            .with_app_context(worker_state)
            .with_queue_name(EmailJob::queue_name())
            .spawn()
            .await
            .expect("Failed to initialize workers");

        // NOTE(review): the result of `start()` is discarded, as in the
        // original example; consider logging it.
        let _ = worker.start().await;
    });

    // Start web server
    HttpServer::new(move || {
        App::new()
            .app_data(Data::new(app_state.clone()))
            .service(your_routes)
    })
    .bind("127.0.0.1:8000")?
    .run()
    .await?;

    Ok(())
}
use chainmq::{Queue, QueueOptions};
/// Builds a welcome email job and enqueues it on a queue that reuses the
/// Redis client stored in `app_state`.
///
/// # Errors
///
/// Returns an error only if the queue itself cannot be created. A failed
/// enqueue is reported on stderr and the function still returns `Ok(())`.
async fn enqueue_email_job(app_state: &AppState) -> chainmq::Result<()> {
    let welcome = EmailJob {
        to: "user@example.com".to_string(),
        subject: "Welcome!".to_string(),
        body: "Thank you for signing up!".to_string(),
    };

    // Reuse the application's Redis client; every other option keeps its default.
    let queue = Queue::new(QueueOptions {
        redis_instance: Some(app_state.redis_client.clone()),
        ..Default::default()
    })
    .await?;

    // Enqueue the job
    let outcome = queue.enqueue(welcome).await;
    if let Err(error) = outcome {
        eprintln!("Failed to enqueue email job: {}", error);
    } else {
        println!("Email job enqueued successfully");
    }

    Ok(())
}
This repo provides runnable examples. Build them all:
cargo build --examples
Run Redis first, then in separate terminals run workers/enqueuers:
# Single worker for emails queue
cargo run --example worker_main
# Enqueue email jobs (normal + delayed/high priority)
cargo run --example enqueue_email
# One worker handling multiple job types on a single queue
cargo run --example multi_jobs_single_worker
# Two workers on different queues (emails + reports)
cargo run --example multi_workers
# Failure and retry with backoff demonstration
cargo run --example failure_retry
# Delayed jobs demonstration
cargo run --example delayed_jobs
Notes:
- The examples assume Redis at redis://localhost:6379. Adjust to your setup.
- Jobs implement: trait Job { async fn perform(&self, ctx: &JobContext) -> Result<()>; fn name() -> &'static str; fn queue_name() -> &'static str }

ChainMQ works with any Redis instance:
// Local Redis
let redis_client = redis::Client::open("redis://127.0.0.1:6379/")?;
// Redis with authentication
let redis_client = redis::Client::open("redis://:password@127.0.0.1:6379/")?;
// Redis with database selection
let redis_client = redis::Client::open("redis://127.0.0.1:6379/1")?;
// Using Redis instance
let worker = WorkerBuilder::new_with_redis_instance(redis_client, registry)
.with_app_context(app_state)
.with_queue_name("priority_queue")
.with_concurrency(10) // Number of concurrent jobs
.with_poll_interval(Duration::from_secs(5)) // How often to check for jobs
.spawn()
.await?;
// Using Redis URI
let worker = WorkerBuilder::new_with_redis_uri("redis://127.0.0.1:6379/", registry)
.with_app_context(app_state)
.with_queue_name("background_tasks")
.with_concurrency(5)
.spawn()
.await?;
let options = QueueOptions {
name: "default".to_string(),
redis_url: "redis://127.0.0.1:6379".to_string(),
redis_instance: None,
key_prefix: "rbq".to_string(),
default_concurrency: 10,
max_stalled_interval: 30000, // 30 seconds
};
let queue = Queue::new(options).await?;
let job = EmailJob {
to: "user@example.com".into(),
subject: "Urgent".into(),
body: "Please read".into(),
};
let opts = JobOptions {
delay_secs: Some(60),
priority: Priority::High,
attempts: 5,
backoff: BackoffStrategy::Exponential { base: 2, cap: 10 },
timeout_secs: Some(60),
rate_limit_key: Some("i_rater".to_string()),
};
let job_id = queue.enqueue_with_options(job, opts).await?;
Inject your own services (database pools, HTTP clients, caches, etc.) via AppContext. The worker holds an Arc<dyn AppContext> and each job receives it through JobContext.
use chainmq::AppContext;
use std::sync::Arc;
/// Example application state bundling the services that jobs need.
#[derive(Clone)]
struct AppState {
/// Postgres pool; `DatabaseJob::perform` runs its user query against it.
pub database: sqlx::PgPool,
/// HTTP client; `DatabaseJob::perform` uses it for an outbound GET request.
pub http_client: reqwest::Client,
/// Cache; `DatabaseJob::perform` stores the fetched user in it.
pub cache: Arc<RedisCache>,
/// Mail client (not used by the `DatabaseJob` example).
pub mail_client: Arc<MailClient>,
}
impl AppContext for AppState {
    /// Clones this state and returns it behind a trait-object `Arc`.
    fn clone_context(&self) -> Arc<dyn AppContext> {
        Arc::new(self.clone()) as Arc<dyn AppContext>
    }
}
Use it inside jobs via the helper ctx.app::<T>():
#[async_trait]
impl Job for DatabaseJob {
    /// Loads the user identified by `self.user_id`, issues an HTTP request,
    /// and stores the user in the cache.
    ///
    /// If the context carries no `AppState`, the job does nothing and succeeds.
    /// Any database, HTTP, or cache error is propagated with `?`.
    async fn perform(&self, ctx: &JobContext) -> chainmq::Result<()> {
        let Some(state) = ctx.app::<AppState>() else {
            return Ok(());
        };

        // Use database
        let user = sqlx::query_as!(User, "SELECT * FROM users WHERE id = $1", self.user_id)
            .fetch_one(&state.database)
            .await?;

        // Use HTTP client
        let response = state
            .http_client
            .get("https://api.example.com/data")
            .send()
            .await?;

        // Use cache
        let cache_key = format!("user:{}", user.id);
        state.cache.set(&cache_key, &user).await?;

        Ok(())
    }

    /// Name under which this job type is identified.
    fn name() -> &'static str {
        "DatabaseJob"
    }

    /// Queue this job type is placed on.
    fn queue_name() -> &'static str {
        "database"
    }
}
Register multiple job types in a single registry:
let mut registry = JobRegistry::new();
registry.register::<EmailJob>();
registry.register::<ImageProcessingJob>();
registry.register::<ReportGenerationJob>();
registry.register::<CleanupJob>();
// Single worker can handle all job types
let worker = WorkerBuilder::new_with_redis_instance(redis_client, registry)
.with_queue_name("mixed_jobs")
.spawn()
.await?;
Schedule jobs for future execution:
use chainmq::JobOptions;
use std::time::Duration;
let delayed_job = EmailJob {
to: "user@example.com".to_string(),
subject: "Reminder".to_string(),
body: "Don't forget about your appointment!".to_string(),
};
let options = JobOptions {
delay_secs: Some(3600), // 1 hour delay
..Default::default()
};
queue.enqueue_with_options(delayed_job, options).await?;
Jobs that fail are automatically retried with configurable backoff:
#[async_trait]
impl Job for RiskyJob {
    /// Fails when a random draw falls below 0.3 and otherwise prints a
    /// success message; used to demonstrate retries.
    async fn perform(&self, ctx: &JobContext) -> chainmq::Result<()> {
        // This job might fail and will be retried
        let roll = random::<f32>();
        if roll < 0.3 {
            Err("Random failure".into())
        } else {
            println!("Job succeeded!");
            Ok(())
        }
    }

    /// Name under which this job type is identified.
    fn name() -> &'static str {
        "RiskyJob"
    }

    /// Queue this job type is placed on.
    fn queue_name() -> &'static str {
        "risky"
    }
}
Inject your own services (DB pools, HTTP clients, caches, etc.) via AppContext. The worker holds an Arc<dyn AppContext> and each job receives it through JobContext.
Define your application state:
use chainmq::AppContext;
use std::sync::Arc;
/// Minimal application state shared with jobs through the worker.
#[derive(Clone)]
struct AppState {
/// Postgres pool; `MyJob::perform` runs its query against it.
db: sqlx::PgPool,
/// HTTP client; `MyJob::perform` uses it for an outbound GET request.
http: reqwest::Client,
}
impl AppContext for AppState {
    /// Produces an `Arc<dyn AppContext>` holding a clone of this state.
    fn clone_context(&self) -> Arc<dyn AppContext> {
        let shared: Arc<dyn AppContext> = Arc::new(self.clone());
        shared
    }
}
Pass it to the worker:
let app = Arc::new(AppState { db: pool, http: reqwest::Client::new() });
let mut worker = WorkerBuilder::new_with_redis_uri("redis://localhost:6379", registry)
.with_app_context(app)
.with_queue_name("default")
.spawn()
.await?;
Use it inside jobs via the helper ctx.app::<T>() (preferred) or explicit downcast:
#[async_trait]
impl Job for MyJob {
    /// Runs a trivial query and an HTTP request using the shared `AppState`,
    /// then prints the queried value.
    ///
    /// If the context carries no `AppState`, the job does nothing and succeeds.
    ///
    /// NOTE(review): the other `Job` impls in this document also define
    /// `queue_name()`; confirm whether it is optional here.
    async fn perform(&self, ctx: &JobContext) -> Result<()> {
        // Manual downcast, if the typed helper below is not suitable:
        // if let Some(app) = ctx.app_context.as_ref().as_any().downcast_ref::<AppState>() { /* ... */ }

        // Preferred typed helper
        let Some(state) = ctx.app::<AppState>() else {
            return Ok(());
        };

        let row = sqlx::query!("select 1 as one")
            .fetch_one(&state.db)
            .await?;
        let _ = state.http.get("https://example.com").send().await?;
        println!("db one = {}", row.one);

        Ok(())
    }

    /// Name under which this job type is identified.
    fn name() -> &'static str {
        "MyJob"
    }
}
ChainMQ uses Lua scripts to ensure atomic operations:
- move_delayed.lua: Moves due jobs from the delayed sorted set to the wait list
- claim_job.lua: Atomically pops from the wait list and adds to the active list

Redis keys use a configurable prefix (default rbq):
- rbq:queue:{name}:wait - Jobs waiting to be processed
- rbq:queue:{name}:active - Jobs currently being processed
- rbq:queue:{name}:delayed - Jobs scheduled for future execution
- rbq:queue:{name}:failed - Jobs that have failed processing
- rbq:job:{id} - Individual job metadata and payload

Jobs not being processed:
- Check that the worker's .with_queue_name() matches Job::queue_name()
- Inspect the wait list: redis-cli LRANGE rbq:queue:{queue}:wait 0 -1

Connection issues:
- Verify that Redis is reachable: redis-cli ping

Jobs failing silently:
- Inspect the failed list: redis-cli LRANGE rbq:queue:{queue}:failed 0 -1

Performance issues:
- Adjust worker concurrency: .with_concurrency(n)
- Adjust the polling frequency: .with_poll_interval(duration)

# Build the library
cargo build
# Run examples (requires Redis)
cargo run --example worker_main
MIT
Inspired by existing Redis-backed job queues; built for ergonomic, type-safe Rust applications.