Crates.io | underway |
lib.rs | underway |
version | |
source | src |
created_at | 2024-09-21 15:03:33.193081 |
updated_at | 2024-10-27 17:40:56.637516 |
description | ⏳ Durable step functions via Postgres |
homepage | |
repository | https://github.com/maxcountryman/underway |
max_upload_size | |
id | 1382287 |
Cargo.toml error: | TOML parse error at line 17, column 1 | 17 | autolib = false | ^^^^^^^ unknown field `autolib`, expected one of `name`, `version`, `edition`, `authors`, `description`, `readme`, `license`, `repository`, `homepage`, `documentation`, `build`, `resolver`, `links`, `default-run`, `default_dash_run`, `rust-version`, `rust_dash_version`, `rust_version`, `license-file`, `license_dash_file`, `license_file`, `licenseFile`, `license_capital_file`, `forced-target`, `forced_dash_target`, `autobins`, `autotests`, `autoexamples`, `autobenches`, `publish`, `metadata`, `keywords`, `categories`, `exclude`, `include` |
size | 0 |
⏳ Durable step functions via Postgres.
Underway provides durable background jobs over Postgres. Jobs are composed of a sequence of one more steps. Each step takes the output of the previous step as its input. These simple workflows provide a powerful interface to common deferred work use cases.
Key Features:
FOR UPDATE SKIP LOCKED
for reliable task storage and coordination.Underway is suitable for many different use cases, ranging from simple single-step jobs to more sophisticated multi-step jobs, where dependencies are built up between steps.
A common use case is deferring work that can be processed later. For instance, during user registration, we might want to send a welcome email to new users. Rather than handling this within the registration process (e.g., form validation, database insertion), we can offload it to run "out-of-band" using Underway. By defining a job for sending the welcome email, Underway ensures it gets processed in the background, without slowing down the user registration flow.
use std::env;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use underway::{Job, To};
// This is the input we'll provide to the job when we enqueue it.
#[derive(Deserialize, Serialize)]
struct WelcomeEmail {
user_id: i32,
email: String,
name: String,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Set up the database connection pool.
let database_url = &env::var("DATABASE_URL").expect("DATABASE_URL should be set");
let pool = PgPool::connect(database_url).await?;
// Run migrations.
underway::MIGRATOR.run(&pool).await?;
// Build the job.
let job = Job::builder()
.step(
|_cx,
WelcomeEmail {
user_id,
email,
name,
}| async move {
// Simulate sending an email.
println!("Sending welcome email to {name} <{email}> (user_id: {user_id})");
// Returning this indicates this is the final step.
To::done()
},
)
.name("welcome-email")
.pool(pool)
.build()
.await?;
// Here we enqueue a new job to be processed later.
job.enqueue(WelcomeEmail {
user_id: 42,
email: "ferris@example.com".to_string(),
name: "Ferris".to_string(),
})
.await?;
// Start processing enqueued jobs.
job.start().await??;
Ok(())
}
Another common use case is defining dependencies between discrete steps of a job. For instance, we might generate PDF receipts for orders and then email these to customers. With Underway, each step is handled separately, making it easy to create a job that first generates the PDF and, once completed, proceeds to send the email.
This separation provides significant value: if the email sending service is temporarily unavailable, we can retry the email step without having to regenerate the PDF, avoiding unnecessary repeated work.
use std::env;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use underway::{Job, To};
#[derive(Deserialize, Serialize)]
struct GenerateReceipt {
// An order we want to generate a receipt for.
order_id: i32,
}
#[derive(Deserialize, Serialize)]
struct EmailReceipt {
// An object store key to our receipt PDF.
receipt_key: String,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Set up the database connection pool.
let database_url = &env::var("DATABASE_URL").expect("DATABASE_URL should be set");
let pool = PgPool::connect(database_url).await?;
// Run migrations.
underway::MIGRATOR.run(&pool).await?;
// Build the job.
let job = Job::builder()
.step(|_cx, GenerateReceipt { order_id }| async move {
// Use the order ID to build a receipt PDF...
let receipt_key = format!("receipts_bucket/{order_id}-receipt.pdf");
// ...store the PDF in an object store.
// We proceed to the next step with the receipt_key as its input.
To::next(EmailReceipt { receipt_key })
})
.step(|_cx, EmailReceipt { receipt_key }| async move {
// Retrieve the PDF from the object store, and send the email.
println!("Emailing receipt for {receipt_key}");
To::done()
})
.name("order-receipt")
.pool(pool)
.build()
.await?;
// Enqueue the job for the given order.
job.enqueue(GenerateReceipt { order_id: 42 }).await?;
// Start processing enqueued jobs.
job.start().await??;
Ok(())
}
With this setup, if the email service is down, the EmailReceipt
step can
be retried without redoing the PDF generation, saving time and resources by
not repeating the expensive step of generating the PDF.
Jobs may also be run on a schedule. This makes them useful for situations where we want to do things on a regular cadence, such as creating a daily business report.
use std::env;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use underway::{Job, To};
#[derive(Deserialize, Serialize)]
struct DailyReport;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Set up the database connection pool.
let database_url = &env::var("DATABASE_URL").expect("DATABASE_URL should be set");
let pool = PgPool::connect(database_url).await?;
// Run migrations.
underway::MIGRATOR.run(&pool).await?;
// Build the job.
let job = Job::builder()
.step(|_cx, _| async move {
// Here we would generate and store the report.
To::done()
})
.name("daily-report")
.pool(pool)
.build()
.await?;
// Set a daily schedule with the given input.
let daily = "@daily[America/Los_Angeles]".parse()?;
job.schedule(daily, DailyReport).await?;
// Start processing enqueued jobs.
job.start().await??;
Ok(())
}
We've put together a number of examples to help get you started. You're also welcome to open a discussion and ask additional questions you might have.
We appreciate all kinds of contributions, thank you!