| Crates.io | pgqrs-macros |
| lib.rs | pgqrs-macros |
| version | 0.13.0 |
| created_at | 2026-01-05 17:08:17.442366+00 |
| updated_at | 2026-01-17 08:19:28.252843+00 |
| description | Procedural macro support crate for pgqrs |
| homepage | https://github.com/vrajat/pgqrs |
| repository | https://github.com/vrajat/pgqrs |
| max_upload_size | |
| id | 2024256 |
| size | 20,807 |
[!CAUTION] Internal Crate: This crate (pgqrs-macros) is a support crate containing procedural macros for the main pgqrs library. You should likely use the pgqrs crate directly, which re-exports these macros. See the pgqrs crate for the main documentation.
pgqrs is a PostgreSQL-backed durable workflow engine and job queue. Written in Rust with Python bindings.
SKIP LOCKED for concurrent job fetching. Simple, reliable message queue for background processing:
use pgqrs;
use serde_json::json;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Open a connection pool against the local PostgreSQL database.
    let store = pgqrs::connect("postgresql://localhost/mydb").await?;

    // One-time setup: install the pgqrs schema and create the target queue.
    pgqrs::admin(&store).install().await?;
    pgqrs::admin(&store).create_queue("tasks").await?;

    // Producer side: push a single job onto the "tasks" queue.
    let payload = json!({"task": "send_email", "to": "user@example.com"});
    let ids = pgqrs::enqueue()
        .message(&payload)
        .to("tasks")
        .execute(&store)
        .await?;
    println!("Enqueued: {:?}", ids);

    // Consumer side: pull jobs off the queue and run the async handler.
    pgqrs::dequeue()
        .from("tasks")
        .handle(|msg| async move {
            println!("Processing: {:?}", msg.payload);
            // Your processing logic here
            Ok(())
        })
        .execute(&store)
        .await?;

    Ok(())
}
import pgqrs
import asyncio
async def main():
    # Open a connection to the local PostgreSQL instance.
    store = await pgqrs.connect("postgresql://localhost/mydb")

    # One-time setup: install the pgqrs schema and create the queue.
    admin = pgqrs.admin(store)
    await admin.install()
    await admin.create_queue("tasks")

    # Producer side: enqueue a single background job.
    job = {
        "task": "send_email",
        "to": "user@example.com",
    }
    msg_id = await pgqrs.produce(store, "tasks", job)
    print(f"Enqueued: {msg_id}")

    # Consumer side: process queued jobs with an async callback.
    async def handler(msg):
        print(f"Processing: {msg.payload}")
        return True

    await pgqrs.consume(store, "tasks", handler)


asyncio.run(main())
Orchestrate multi-step processes that survive crashes:
use pgqrs;
use pgqrs_macros::{pgqrs_workflow, pgqrs_step};
#[pgqrs_step]
async fn fetch_data(ctx: &pgqrs::Workflow, url: &str) -> Result<String, anyhow::Error> {
    // Durable step: download the resource and return its body as text.
    let response = reqwest::get(url).await?;
    let body = response.text().await?;
    Ok(body)
}
#[pgqrs_step]
async fn process_data(ctx: &pgqrs::Workflow, data: String) -> Result<i32, anyhow::Error> {
    // Durable step: report how many lines the fetched text contains.
    let line_count = data.lines().count();
    Ok(line_count as i32)
}
#[pgqrs_workflow]
async fn data_pipeline(ctx: &pgqrs::Workflow, url: &str) -> Result<String, anyhow::Error> {
    // Each step call is checkpointed through `ctx`; presumably completed
    // steps are skipped on restart — confirm against pgqrs workflow docs.
    let fetched = fetch_data(ctx, url).await?;
    let line_count = process_data(ctx, fetched).await?;
    Ok(format!("Processed {} lines", line_count))
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect and make sure the pgqrs schema is installed.
    let store = pgqrs::connect("postgresql://localhost/mydb").await?;
    pgqrs::admin(&store).install().await?;

    let url = "https://example.com/data.txt";

    // Register a new workflow run, then execute it to completion.
    let run = pgqrs::admin(&store)
        .create_workflow("data_pipeline", &url)
        .await?;
    let summary = data_pipeline(&run, url).await?;
    println!("Result: {}", summary);

    Ok(())
}
import pgqrs
from pgqrs.decorators import workflow, step
@step
async def fetch_data(ctx, url: str) -> dict:
    # Durable step: a real pipeline would fetch from the API at `url`;
    # this example returns a canned payload.
    payload = {"lines": 100, "data": "..."}
    return payload
@step
async def process_data(ctx, data: dict) -> dict:
    # Durable step: summarize the fetched payload.
    line_count = data["lines"]
    return {"processed": True, "count": line_count}
@workflow
async def data_pipeline(ctx, url: str):
    # Step calls run through the workflow context `ctx`; presumably completed
    # steps are checkpointed and skipped on restart — see pgqrs docs.
    fetched = await fetch_data(ctx, url)
    processed = await process_data(ctx, fetched)
    return processed
async def main():
    # Connect and install the pgqrs schema (one-time setup).
    store = await pgqrs.connect("postgresql://localhost/mydb")
    admin = pgqrs.admin(store)
    await admin.install()

    # Register a workflow run, then execute it to completion.
    url = "https://example.com/data"
    ctx = await admin.create_workflow("data_pipeline", url)
    result = await data_pipeline(ctx, url)
    print(f"Result: {result}")


import asyncio

asyncio.run(main())
pip install pgqrs
[dependencies]
pgqrs = "0.5"
pgqrs-macros = "0.5" # For workflow macros
Prerequisites:
# Setup environment and install dependencies
make requirements
# Build both Rust core and Python bindings
make build
# Run all tests (Rust + Python)
make test