pgqrs

Crates.iopgqrs
lib.rspgqrs
version0.13.0
created_at2025-10-04 12:32:20.744463+00
updated_at2026-01-17 08:21:21.368159+00
descriptionA high-performance PostgreSQL-backed job queue for Rust applications
homepagehttps://github.com/vrajat/pgqrs
repositoryhttps://github.com/vrajat/pgqrs
max_upload_size
id1867869
size779,197
Rajat Venkatesh (vrajat)

documentation

README

pgqrs

A PostgreSQL, SQLite, and Turso-backed job queue for Rust applications.

Features

  • Lightweight: No servers to operate. Directly use pgqrs as a library in your Rust applications.
  • Multi-Backend Support: Choose between PostgreSQL for production, or SQLite/Turso for embedded use cases.
  • Compatible with Connection Poolers: Use with pgBouncer or pgcat to scale connections (PostgreSQL).
  • Efficient: Uses PostgreSQL's SKIP LOCKED for concurrent job fetching.
  • Exactly Once Delivery: Guarantees exactly-once delivery within a time range specified by time limit.
  • Message Archiving: Built-in archiving system for audit trails and historical data retention.

Example

Producer

use pgqrs::Producer;
use serde_json::Value;

/// Enqueue a payload to the queue
async fn enqueue_job(producer: &Producer, payload: Value) -> Result<i64, Box<dyn std::error::Error>> {
	let message = producer.enqueue(&payload).await?;
	Ok(message.id)
}

Consumer

use pgqrs::{Consumer, WorkerInfo};
use std::time::Duration;

/// Poll for jobs from the queue and print them as they arrive
async fn poll_and_print_jobs(consumer: &Consumer, worker: &WorkerInfo) -> Result<(), Box<dyn std::error::Error>> {
	loop {
		let messages = consumer.dequeue(worker).await?;
		if messages.is_empty() {
			// No job found, wait before polling again
			tokio::time::sleep(Duration::from_secs(2)).await;
		} else {
			for message in messages {
				println!("Dequeued job: {}", message.payload);
				// Optionally archive or delete the message after processing
				consumer.archive(message.id).await?;
			}
		}
	}
}

Quickstart

Install the binary

cargo install pgqrs

Start a Postgres DB or get the DSN of an existing db.

You'll need a PostgreSQL database to use pgqrs. Here are your options:

Option 1: Using Docker (Recommended for development)

# Start a PostgreSQL container
docker run --name pgqrs-postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 -d postgres:15

# Your DSN will be:
# postgresql://postgres:postgres@localhost:5432/postgres

Option 2: Using an existing PostgreSQL database

Get your database connection string (DSN) in this format:

postgresql://username:password@hostname:port/database

Option 3: Using a cloud PostgreSQL service

  • AWS RDS: Get the connection string from the RDS console
  • Google Cloud SQL: Get the connection string from the Cloud Console
  • Azure Database: Get the connection string from the Azure portal
  • Heroku Postgres: Use the DATABASE_URL from your Heroku config

Configure pgqrs

Set your database connection using one of these methods (in order of priority):

# Method 1: Command line argument (highest priority)
pgqrs --dsn "postgresql://postgres:postgres@localhost:5432/postgres"

# Method 2: Environment variable
export PGQRS_DSN="postgresql://postgres:postgres@localhost:5432/postgres"
pgqrs ...

Create a pgqrs.yaml file:

dsn: "postgresql://postgres:postgres@localhost:5432/postgres"

Then run:

# Method 3: Use a yaml config file.
pgqrs ...

Install the pgqrs schema

pgqrs requires a few tables to store metadata. It creates these tables as well as queue tables in the specified schema.

Important: You must create the schema before running pgqrs install.

Step 1: Create the schema

Connect to your PostgreSQL database and create the schema:

-- For default 'public' schema (no action needed)
-- For custom schema:
CREATE SCHEMA IF NOT EXISTS pgqrs;

Step 2: Install pgqrs

Once you have your database configured and schema created, install the pgqrs schema:

# Install in default 'public' schema
pgqrs install

# Install in custom schema
pgqrs --schema pgqrs install

# Verify the installation
pgqrs verify
# Or verify custom schema
pgqrs --schema pgqrs verify

Test queue commands from the CLI

Items can be enqueued or dequeued using the CLI. This option is only available for testing or experiments.

# Create a test queue
pgqrs queue create test_queue

# Send a message to the queue
pgqrs message send test_queue '{"message": "Hello, World!", "timestamp": "2023-01-01T00:00:00Z"}'

# Send a delayed message (available after 30 seconds)
pgqrs message send test_queue '{"task": "delayed_task"}' --delay 30

# Read and immediately consume one message
pgqrs message dequeue test_queue

# Delete a specific message by ID
pgqrs message delete test_queue 12345

License

Licensed under either of:

  • Apache License, Version 2.0
  • MIT license

at your option.

Commit count: 124

cargo fmt