| Crates.io | apalis-postgres |
| lib.rs | apalis-postgres |
| version | 1.0.0-rc.2 |
| created_at | 2025-10-25 17:05:58.527765+00 |
| updated_at | 2026-01-10 08:57:10.326774+00 |
| description | Background task processing for rust using apalis and postgres |
| homepage | https://github.com/apalis-dev/apalis-postgres |
| repository | https://github.com/apalis-dev/apalis-postgres |
| max_upload_size | |
| id | 1900433 |
| size | 285,327 |
Background task processing in Rust using apalis and Postgres.
Features include trigger based storages and integration with apalis workers and middleware.

Storage types:

- [`PostgresStorage`]: Standard polling-based storage.
- [`PostgresStorageWithListener`]: Event-driven storage using Postgres `NOTIFY` for low-latency job fetching.
- [`SharedPostgresStorage`]: Shared storage for multiple job types, uses Postgres `NOTIFY`.

The naming is designed to clearly indicate the storage mechanism and its capabilities, but under the hood the result is the `PostgresStorage` struct with different configurations.
use std::time::Duration;
use apalis::prelude::*;
use apalis_postgres::*;
use futures::stream::{self, StreamExt};
/// Basic worker example: pushes ten tasks into a `PostgresStorage` backend and
/// then runs a single worker (`"worker-1"`) over that backend.
///
/// # Panics
///
/// Every fallible step is `unwrap`ped, so this panics if connecting to the
/// database, setting up the storage, pushing the tasks, or running the worker
/// returns an error.
#[tokio::main]
async fn main() {
    // `env!` resolves DATABASE_URL at compile time, so the variable must be
    // set in the environment when this example is built.
    let pool = PgPool::connect(env!("DATABASE_URL")).await.unwrap();
    // NOTE(review): `setup` presumably prepares the schema the storage needs —
    // confirm against the crate documentation.
    PostgresStorage::setup(&pool).await.unwrap();
    let mut backend = PostgresStorage::new(&pool);

    // Lazily builds tasks whose payloads are 1, 2, 3, ...; `take(10)` caps the
    // stream at ten items.
    let mut start = 0usize;
    let mut items = stream::repeat_with(move || {
        start += 1;
        // The built task is the closure's value; no temporary binding needed.
        Task::builder(start)
            .run_after(Duration::from_secs(1))
            .with_ctx(PgContext::new().with_priority(1))
            .build()
    })
    .take(10);
    backend.push_all(&mut items).await.unwrap();

    // Task handler. It intentionally does nothing, so both arguments are
    // underscore-prefixed to avoid unused-variable warnings.
    async fn send_reminder(_item: usize, _wrk: WorkerContext) -> Result<(), BoxDynError> {
        Ok(())
    }

    let worker = WorkerBuilder::new("worker-1")
        .backend(backend)
        .build(send_reminder);
    worker.run().await.unwrap();
}
NOTIFY listener example
use std::time::Duration;
use apalis::prelude::*;
use apalis_postgres::*;
use futures::stream::{self, StreamExt};
/// NOTIFY listener example: runs a worker (`"worker-2"`) on a storage created
/// with `PostgresStorage::new_with_notify`, while a spawned background task
/// pushes twenty tasks through the connection pool two seconds after start-up.
///
/// All fallible calls are `unwrap`ped, so any error aborts the example with a
/// panic.
#[tokio::main]
async fn main() {
    let pool = PgPool::connect(env!("DATABASE_URL")).await.unwrap();
    PostgresStorage::setup(&pool).await.unwrap();

    // Polling strategy with a five-second interval, used as the queue's poll
    // interval below.
    let poll_strategy = StrategyBuilder::new()
        .apply(IntervalStrategy::new(Duration::from_secs(5)))
        .build();
    let config = Config::new("queue")
        .with_poll_interval(poll_strategy)
        .set_buffer_size(5);
    let backend = PostgresStorage::new_with_notify(&pool, &config);

    // The producer gets its own clones so `pool` and `config` stay usable here.
    let producer_pool = pool.clone();
    let producer_config = config.clone();
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_secs(2)).await;

        let mut counter = 0;
        let tasks = stream::repeat_with(move || {
            counter += 1;
            // Compact task: the payload is the counter already serialized to
            // JSON bytes, and the counter doubles as the priority.
            Task::builder(serde_json::to_vec(&counter).unwrap())
                .with_ctx(PgContext::new().with_priority(counter))
                .build()
        })
        .take(20)
        .collect::<Vec<_>>()
        .await;

        // `backend.push` would work as well; this demonstrates pushing with
        // nothing but a pool and a config.
        apalis_postgres::sink::push_tasks(&producer_pool, producer_config, tasks)
            .await
            .unwrap();
    });

    // Task handler; it accepts each task and immediately reports success.
    async fn send_reminder(item: usize, wrk: WorkerContext) -> Result<(), BoxDynError> {
        Ok(())
    }

    let worker = WorkerBuilder::new("worker-2")
        .backend(backend)
        .build(send_reminder);
    worker.run().await.unwrap();
}
use std::time::Duration;
use apalis::prelude::*;
use apalis_postgres::*;
use apalis_workflow::*;
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
let workflow = Workflow::new("odd-numbers-workflow")
.and_then(|a: usize| async move {
Ok::<_, BoxDynError>((0..=a).collect::<Vec<_>>())
})
.filter_map(|x| async move {
if x % 2 != 0 { Some(x) } else { None }
})
.filter_map(|x| async move {
if x % 3 != 0 { Some(x) } else { None }
})
.filter_map(|x| async move {
if x % 5 != 0 { Some(x) } else { None }
})
.delay_for(Duration::from_millis(1000))
.and_then(|a: Vec<usize>| async move {
println!("Sum: {}", a.iter().sum::<usize>());
Ok::<(), BoxDynError>(())
});
let pool = PgPool::connect(env!("DATABASE_URL")).await.unwrap();
PostgresStorage::setup(&pool).await.unwrap();
let mut backend = PostgresStorage::new_with_config(&pool, &Config::new("test-workflow"));
backend.push_start(100usize).await.unwrap();
let worker = WorkerBuilder::new("rango-tango")
.backend(backend)
.on_event(|ctx, ev| {
println!("On Event = {:?}", ev);
if matches!(ev, Event::Error(_)) {
ctx.stop().unwrap();
}
})
.build(workflow);
worker.run().await.unwrap();
}
This shows an example of multiple backends using the same connection. This can improve performance if you have many types of jobs.
use std::{collections::HashMap, time::Duration};
use apalis::prelude::*;
use apalis_postgres::{shared::SharedPostgresStorage, *};
use futures::stream;
/// Shared storage example: derives two typed stores from a single
/// `SharedPostgresStorage`, pushes one task into each, and runs one worker per
/// store concurrently.
///
/// All fallible calls are `unwrap`ped, so any error aborts the example with a
/// panic.
#[tokio::main]
async fn main() {
    // Unlike `env!`, `std::env::var` reads DATABASE_URL when the program runs.
    let database_url = std::env::var("DATABASE_URL").unwrap();
    let pool = PgPool::connect(&database_url).await.unwrap();
    PostgresStorage::setup(&pool).await.unwrap();

    // Both stores are created from the same shared storage.
    let mut store = SharedPostgresStorage::new(pool);
    let mut map_store = store.make_shared().unwrap();
    let mut int_store = store.make_shared().unwrap();

    // One empty map goes to the map store, one integer to the int store.
    let mut map_tasks = stream::iter(vec![HashMap::<String, String>::new()]);
    map_store.push_stream(&mut map_tasks).await.unwrap();
    int_store.push(99).await.unwrap();

    // Generic handler shared by both workers: ignores the payload and task id,
    // waits two seconds, then asks its worker to stop.
    async fn send_reminder<T, I>(
        _: T,
        _task_id: TaskId<I>,
        worker: WorkerContext,
    ) -> Result<(), BoxDynError> {
        tokio::time::sleep(Duration::from_secs(2)).await;
        worker.stop().unwrap();
        Ok(())
    }

    let int_worker = WorkerBuilder::new("rango-tango-2")
        .backend(int_store)
        .build(send_reminder);
    let map_worker = WorkerBuilder::new("rango-tango-1")
        .backend(map_store)
        .build(send_reminder);

    // Drive both workers together; the first error fails the whole join.
    tokio::try_join!(int_worker.run(), map_worker.run()).unwrap();
}
Track your jobs using apalis-board.

Licensed under either of Apache License, Version 2.0 or MIT license at your option.