pgboss

Crates.iopgboss
lib.rspgboss
version0.1.0-rc5
created_at2024-10-03 20:07:36.612993+00
updated_at2025-12-17 18:27:03.74896+00
descriptionRust implementation of PgBoss job queueing service
homepage
repositoryhttps://github.com/rustworthy/pgboss-rs.git
max_upload_size
id1395635
size193,513
Pavieł Michalkievič (rustworthy)

documentation

README

pgboss-rs

Queue jobs with Rust and PostgreSQL like a boss

Crates.io Documentation Codecov dependency status

Inspired by, compatible with and partially ported from pg-boss Node.js package.

Heavily influenced by decisions and approaches in faktory-rs crate.

use std::time::Duration;
use serde_json::json;
use pgboss::{Client, Job, JobState, Queue};

// Create a client first.
let client = Client::builder()
    .schema("desired_schema_name")
    .connect()
    .await
    .unwrap();

// Then create a dlq (optional) and a queue.
c.create_standard_queue("image_processing_dlq").await.unwrap();

// NB! queue should be created before pushing jobs
let queue = Queue::builder()
    .name("image_processing")
    .dead_letter("image_processing_dlq")
    .partition(true)
    .build();

c.create_queue(&queue).await.unwrap();

// Build a job ...
let job = Job::builder()
    .queue_name("image_processing")                // which queue this job should be sent to
    .data(json!({"image_url": "https://..."}))     // arbitrary json, your job's payload
    .priority(10)                                  // will be consumer prior to those with lower priorities
    .retry_limit(1)                                // only retry this job once
    .retry_delay(Duration::from_secs(60 * 5))      // do not retry immediately after failure
    .expire_in(Duration::from_secs(60 * 5))        // only give the worker 5 minutes to complete the job
    .retain_for(Duration::from_secs(60 * 60 * 24)) // do not archive for at least 1 day
    .delay_for(Duration::from_secs(5))             // make it visible to consumers after 5 seconds
    .singleton_for(Duration::from_secs(7))         // only allow one job for at least 7 seconds
    .singleton_key("buzz")                         // allow more than one job if their key is different from this
    .build();

// ... and enqueue it.
let _id = c.send_job(&job).await.expect("no error");

// Consume from the queue.
let fetched_job = c
     .fetch_job("image_processing")
     .await
     .expect("fetched  w/o errors")
     .expect("some job");

assert_eq!(fetched_job.data, job.data);
assert_eq!(fetched_job.state, JobState::Active);

// report on the processing results
c
  .complete_job("image_processing", fetched_job.id, json!({"result": "success"}))
  .await
  .unwrap();
Commit count: 0

cargo fmt