| Crates.io | graphile_worker |
| lib.rs | graphile_worker |
| version | 0.8.6 |
| created_at | 2024-01-31 17:31:36.752196+00 |
| updated_at | 2025-09-19 09:20:04.371424+00 |
| description | High performance Rust/PostgreSQL job queue (also suitable for getting jobs generated by PostgreSQL triggers/functions out into a different work queue) |
| homepage | https://docs.rs/graphile_worker |
| repository | https://github.com/leo91000/graphile_worker |
| max_upload_size | |
| id | 1121996 |
| size | 390,692 |
A powerful PostgreSQL-backed job queue for Rust applications, based on Graphile Worker. This is a complete Rust rewrite that offers excellent performance, reliability, and a convenient API.
Graphile Worker RS allows you to run jobs (such as sending emails, performing calculations, generating PDFs) in the background, so your HTTP responses and application code remain fast and responsive. It's ideal for any PostgreSQL-backed Rust application.
Key highlights:
SKIP LOCKED for efficient job fetchingLISTEN/NOTIFYThis port is mostly compatible with the original Graphile Worker, meaning you can run it side by side with the Node.js version. The key differences are:
Add the library to your project:
cargo add graphile_worker
A task consists of a struct that implements the TaskHandler trait. Each task has:
Serialize/Deserialize for the payloadrun method that contains the task's logicuse serde::{Deserialize, Serialize};
use graphile_worker::{WorkerContext, TaskHandler, IntoTaskHandlerResult};
#[derive(Deserialize, Serialize)]
struct SendEmail {
to: String,
subject: String,
body: String,
}
impl TaskHandler for SendEmail {
const IDENTIFIER: &'static str = "send_email";
async fn run(self, _ctx: WorkerContext) -> impl IntoTaskHandlerResult {
println!("Sending email to {} with subject '{}'", self.to, self.subject);
// Email sending logic would go here
Ok::<(), String>(())
}
}
Set up the worker with your configuration options and run it:
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create a PostgreSQL connection pool
let pg_pool = sqlx::postgres::PgPoolOptions::new()
.max_connections(5)
.connect("postgres://postgres:password@localhost/mydb")
.await?;
// Initialize and run the worker
graphile_worker::WorkerOptions::default()
.concurrency(5) // Process up to 5 jobs concurrently
.schema("graphile_worker") // Use this PostgreSQL schema
.define_job::<SendEmail>() // Register the task handler
.pg_pool(pg_pool) // Provide the database connection
.init() // Initialize the worker
.await?
.run() // Start processing jobs
.await?;
Ok(())
}
Connect to your database and run the following SQL:
SELECT graphile_worker.add_job(
'send_email',
json_build_object(
'to', 'user@example.com',
'subject', 'Welcome to our app!',
'body', 'Thanks for signing up.'
)
);
// Get a WorkerUtils instance to manage jobs
let utils = worker.create_utils();
// Type-safe method (recommended):
utils.add_job(
SendEmail {
to: "user@example.com".to_string(),
subject: "Welcome to our app!".to_string(),
body: "Thanks for signing up.".to_string(),
},
Default::default(), // Use default job options
).await?;
// Or use the raw method when type isn't available:
utils.add_raw_job(
"send_email",
serde_json::json!({
"to": "user@example.com",
"subject": "Welcome to our app!",
"body": "Thanks for signing up."
}),
Default::default(),
).await?;
You can provide shared state to all your tasks using extensions:
use serde::{Deserialize, Serialize};
use graphile_worker::{WorkerContext, TaskHandler, IntoTaskHandlerResult};
use std::sync::{Arc, atomic::{AtomicUsize, Ordering::SeqCst}};
// Define your shared state
#[derive(Clone, Debug)]
struct AppState {
db_client: Arc<DatabaseClient>,
api_key: String,
counter: Arc<AtomicUsize>,
}
// Example database client (just for demonstration)
struct DatabaseClient;
impl DatabaseClient {
fn new() -> Self { Self }
async fn find_user(&self, _user_id: &str) -> Result<(), String> { Ok(()) }
}
#[derive(Deserialize, Serialize)]
struct ProcessUserTask {
user_id: String,
}
impl TaskHandler for ProcessUserTask {
const IDENTIFIER: &'static str = "process_user";
async fn run(self, ctx: WorkerContext) -> impl IntoTaskHandlerResult {
// Access the shared state in your task
let app_state = ctx.get_ext::<AppState>().unwrap();
let count = app_state.counter.fetch_add(1, SeqCst);
// Use shared resources
app_state.db_client.find_user(&self.user_id).await?;
println!("Processed user {}, task count: {}", self.user_id, count);
Ok::<(), String>(())
}
}
// Add the extension when configuring the worker
let app_state = AppState {
db_client: Arc::new(DatabaseClient::new()),
api_key: "secret_key".to_string(),
counter: Arc::new(AtomicUsize::new(0)),
};
graphile_worker::WorkerOptions::default()
.add_extension(app_state)
.define_job::<ProcessUserTask>()
// ... other configuration
.init()
.await?;
You can customize how and when jobs run with the JobSpec builder:
use graphile_worker::{JobSpecBuilder, JobKeyMode};
use chrono::Utc;
// Schedule a job to run after 5 minutes with high priority
let job_spec = JobSpecBuilder::new()
.run_at(Utc::now() + chrono::Duration::minutes(5))
.priority(10)
.job_key("welcome_email_user_123") // Unique identifier for deduplication
.job_key_mode(JobKeyMode::Replace) // Replace existing jobs with this key
.max_attempts(5) // Max retry attempts (default is 25)
.build();
utils.add_job(SendEmail { /* ... */ }, job_spec).await?;
Jobs with the same queue name run in series (one after another) rather than in parallel:
// These jobs will run one after another, not concurrently
let spec1 = JobSpecBuilder::new()
.queue_name("user_123_operations")
.build();
let spec2 = JobSpecBuilder::new()
.queue_name("user_123_operations")
.build();
utils.add_job(UpdateProfile { /* ... */ }, spec1).await?;
utils.add_job(SendEmail { /* ... */ }, spec2).await?;
You can schedule recurring jobs using crontab syntax:
// Run a task daily at 8:00 AM
let worker = WorkerOptions::default()
.with_crontab("0 8 * * * send_daily_report")?
.define_job::<SendDailyReport>()
// ... other configuration
.init()
.await?;
The WorkerUtils class provides methods for managing jobs:
// Get a WorkerUtils instance
let utils = worker.create_utils();
// Remove a job by its key
utils.remove_job("job_key_123").await?;
// Mark jobs as completed
utils.complete_jobs(&[job_id1, job_id2]).await?;
// Permanently fail jobs with a reason
utils.permanently_fail_jobs(&[job_id3, job_id4], "Invalid data").await?;
// Reschedule jobs
let options = RescheduleJobOptions {
run_at: Some(Utc::now() + chrono::Duration::minutes(60)),
priority: Some(5),
max_attempts: Some(3),
..Default::default()
};
utils.reschedule_jobs(&[job_id5, job_id6], options).await?;
// Run database cleanup tasks
utils.cleanup(&[
CleanupTask::DeletePermenantlyFailedJobs,
CleanupTask::GcTaskIdentifiers,
CleanupTask::GcJobQueues,
]).await?;
LISTEN/NOTIFY for immediate job notificationsSKIP LOCKED for efficient job fetchingrun_atjob_keygenerated always as (expression) featureProduction ready but the API may continue to evolve. If you encounter any issues or have feature requests, please open an issue on GitHub.
This library is a Rust port of the excellent Graphile Worker by Benjie Gillam. If you find this library useful, please consider sponsoring Benjie's work, as all the research and architecture design was done by him.
MIT License - See LICENSE.md