| Field | Value |
|-------|-------|
| Crates.io | workers |
| lib.rs | workers |
| version | 0.1.0 |
| created_at | 2025-09-10 09:59:13.353661+00 |
| updated_at | 2025-09-10 09:59:13.353661+00 |
| description | A robust async PostgreSQL-backed background job processing system |
| homepage | https://github.com/mre/workers |
| repository | https://github.com/mre/workers |
| max_upload_size | |
| id | 1832319 |
| size | 1,069 |
A robust async PostgreSQL-backed background job processing system.
This crate provides an async PostgreSQL-backed job queue system with support for:

- Multiple named queues, each with its own worker pool
- Job priorities
- Automatic retries with exponential backoff
- Optional deduplication of jobs with identical data
- Tracing instrumentation and optional Sentry error reporting
The system consists of three main components:

- `BackgroundJob` trait - Define job types and their execution logic
- `Runner` - High-level orchestrator that manages multiple queues and their worker pools
- `Worker` - Low-level executor that polls for and processes individual jobs

`Runner` is the entry point and orchestrator: it spawns the configured `Worker` instances per queue. `Worker` is the actual job processor.
Jobs are stored in the `background_jobs` PostgreSQL table and processed asynchronously by worker instances that poll for available work in their assigned queues.
When a worker picks up a job from the database, the table row is immediately locked to prevent other workers from processing the same job concurrently. This ensures that:

- Each job is executed by at most one worker at a time
- A job is not lost if its worker crashes mid-run: the row lock is released and the job becomes available again
Once job execution completes successfully, the row is deleted from the table. If the job fails, the row remains with updated retry information for future processing attempts.
CREATE TABLE background_jobs (
id BIGSERIAL PRIMARY KEY,
job_type TEXT NOT NULL,
data JSONB NOT NULL,
retries INTEGER NOT NULL DEFAULT 0,
last_retry TIMESTAMP NOT NULL DEFAULT NOW(),
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
priority SMALLINT NOT NULL DEFAULT 0
);
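The claim step described above can be sketched in plain SQL. This is an illustrative query, not necessarily the crate's actual one; it assumes PostgreSQL's `FOR UPDATE SKIP LOCKED`, the standard way to lock a candidate row without making concurrent workers block on it:

```sql
-- Illustrative claim query (assumed, may differ from the crate's SQL):
-- pick the highest-priority available job and lock its row so that
-- other workers skip it instead of waiting on the lock.
SELECT id, job_type, data
FROM background_jobs
ORDER BY priority DESC, id
LIMIT 1
FOR UPDATE SKIP LOCKED;
```

On success the worker deletes the locked row; on failure it updates `retries` and `last_retry` instead, matching the lifecycle described above.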
use workers::BackgroundJob;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
struct SendEmailJob {
to: String,
subject: String,
body: String,
}
// `AppContext` is an application-defined type passed to every job at run time.
impl BackgroundJob for SendEmailJob {
const JOB_NAME: &'static str = "send_email";
const PRIORITY: i16 = 10;
const DEDUPLICATED: bool = false;
const QUEUE: &'static str = "emails";
type Context = AppContext;
async fn run(&self, ctx: Self::Context) -> anyhow::Result<()> {
// Job implementation
ctx.email_service.send(&self.to, &self.subject, &self.body).await?;
Ok(())
}
}
use workers::Runner;
use std::time::Duration;
let runner = Runner::new(connection_pool, app_context)
.register_job_type::<SendEmailJob>()
.configure_queue("emails", |queue| {
queue.num_workers(2).poll_interval(Duration::from_secs(5))
});
let handle = runner.start();
handle.wait_for_shutdown().await;
let job = SendEmailJob {
to: "user@example.com".to_string(),
subject: "Welcome!".to_string(),
body: "Thanks for signing up!".to_string(),
};
job.enqueue(&mut conn).await?;
These associated constants configure each job type:

- `JOB_NAME`: Unique identifier for the job type
- `PRIORITY`: Execution priority (higher values = higher priority)
- `DEDUPLICATED`: Whether to prevent duplicate jobs with identical data
- `QUEUE`: Queue name for job execution (defaults to "default")

Failed jobs are automatically retried with exponential backoff. The retry count and last retry timestamp are tracked in the database. Jobs that continue to fail will eventually be abandoned after reaching the maximum retry limit.
All job execution is instrumented with tracing and optionally reported to Sentry for error monitoring.
This project uses TestContainers for integration testing, which automatically spins up PostgreSQL containers during test execution.
Simply run the tests - TestContainers handles the database setup automatically:
# Run all tests (PostgreSQL containers managed automatically)
make test
# Run tests with verbose output
make test-verbose
# Run all quality checks (format, lint, test)
make ci
The tests will automatically:

- Start a PostgreSQL container before the suite runs
- Tear the container down when the tests finish

No manual database setup required!
If tests fail with "client error (Connect)" or similar Docker errors:

- Check that Docker is running: `docker ps`
- Run `docker system prune` to clean up resources

The implementation was originally developed as part of crates.io and was extracted from the separate `swirl` project, then re-integrated and heavily modified to provide a robust, production-ready job queue system.