| Crates.io | runat |
| lib.rs | runat |
| version | 0.2.2 |
| created_at | 2025-12-26 23:23:55.586508+00 |
| updated_at | 2026-01-20 23:32:36.763592+00 |
| description | A distributed job scheduler for Rust |
| homepage | |
| repository | https://github.com/swervv/runat |
| max_upload_size | |
| id | 2006418 |
| size | 146,848 |
A distributed job scheduler for Rust with PostgreSQL backend support.
Add this to your Cargo.toml:
[dependencies]
runat = "0.2.2"
For PostgreSQL support (enabled by default):
[dependencies]
runat = { version = "0.2.2", features = ["postgres"] }
Optional features:
- postgres — PostgreSQL backend (enabled by default)
- tracing — Tracing support for observability

The context allows you to pass shared state (database pools, config, HTTP clients, etc.) to your job handlers:
use sqlx::PgPool;
/// Shared application state handed to every job handler by reference
/// (see the `Executable<AppContext>` impls below).
#[derive(Clone)]
pub struct AppContext {
    /// Shared PostgreSQL connection pool used by job handlers.
    pub db: PgPool,
    /// Example of arbitrary shared configuration carried in the context.
    pub api_key: String,
}
use runat::{BackgroundJob, Executable, JobResult};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
// Define your job struct
/// Payload of an email-sending job. The serde derives let the queue
/// serialize the job for storage; `BackgroundJob` is runat's derive macro.
#[derive(Debug, Clone, Serialize, Deserialize, BackgroundJob)]
pub struct SendEmailJob {
    /// Recipient address.
    pub to: String,
    /// Email subject line.
    pub subject: String,
    /// Email body text.
    pub body: String,
}
// Implement the job logic with your context type
#[async_trait]
impl Executable<AppContext> for SendEmailJob {
async fn execute(&mut self, ctx: &AppContext) -> JobResult<()> {
// Access your database, config, etc. through ctx
println!("Sending email to {}: {}", self.to, self.subject);
// Example: use ctx.db for database operations
// sqlx::query("INSERT INTO sent_emails ...").execute(&ctx.db).await?;
Ok(())
}
}
use runat::{IntoJob, JobQueue, JobQueueConfig, PostgresDatastore, JobResult};
use sqlx::postgres::PgPoolOptions;
use std::sync::Arc;
#[tokio::main]
async fn main() -> JobResult<()> {
    // Connect to PostgreSQL.
    let pool = PgPoolOptions::new()
        .max_connections(10)
        .connect("postgres://user:pass@localhost/db")
        .await?;

    // Initialize the datastore and run its schema migrations.
    let datastore = PostgresDatastore::new(pool.clone()).await?;
    datastore.migrate().await?;

    // Create your application context (shared state passed to every handler).
    let ctx = AppContext {
        db: pool,
        api_key: "secret".to_string(),
    };

    // Create the job queue with that context.
    let queue = JobQueue::new(
        Arc::new(datastore),
        JobQueueConfig::default(),
        ctx,
    );

    // Register job handlers before starting workers.
    queue.register::<SendEmailJob>()?;

    // Enqueue a job.
    queue.enqueue(
        SendEmailJob {
            to: "user@example.com".to_string(),
            subject: "Welcome!".to_string(),
            body: "Thanks for signing up!".to_string(),
        }.job()?
    ).await?;

    // Start the worker in a background task, then await its handle.
    // Without awaiting, `main` would return immediately, the tokio runtime
    // would shut down, and the detached worker task would be killed before
    // it ever processed the job enqueued above.
    let queue_clone = queue.clone();
    let worker = tokio::spawn(async move {
        queue_clone.start_worker().await
    });
    worker.await.expect("worker task panicked")?;

    Ok(())
}
// Schedule a job to run every 10 seconds
// NOTE(review): the expression has 7 fields, which suggests a
// seconds-resolution cron format (sec min hour dom month dow year) —
// confirm against runat's cron documentation.
// NOTE(review): this snippet needs an async context with `queue` in scope.
queue.enqueue(
SendEmailJob {
to: "admin@example.com".to_string(),
subject: "Daily Report".to_string(),
body: "Here's your daily report".to_string(),
}
.job()?
.cron("*/10 * * * * * *")?
).await?;
Jobs receive a reference to the context during execution, giving access to shared resources:
use runat::{BackgroundJob, Executable, JobResult};
use serde::{Deserialize, Serialize};
use async_trait::async_trait;
/// Payload of a payment-processing job; persisted via serde and wired
/// into the queue by the `BackgroundJob` derive.
#[derive(Debug, Clone, Serialize, Deserialize, BackgroundJob)]
pub struct ProcessPayment {
    /// Identifier of the paying user (bound as $1 in the insert below).
    pub user_id: String,
    /// Payment amount (bound as $2 in the insert below).
    pub amount: f64,
}
#[async_trait]
impl Executable<AppContext> for ProcessPayment {
    /// Record the payment using the connection pool carried in the context.
    async fn execute(&mut self, ctx: &AppContext) -> JobResult<()> {
        // Persist the payment through the shared pool.
        sqlx::query("INSERT INTO payments (user_id, amount) VALUES ($1, $2)")
            .bind(&self.user_id)
            .bind(self.amount)
            .execute(&ctx.db)
            .await?;
        // Any other context field is reachable the same way.
        println!("Using API key: {}", ctx.api_key);
        Ok(())
    }

    /// Optional hook run before `execute`; also receives the context.
    async fn pre_execute(&mut self, _ctx: &AppContext) {
        println!("About to process payment using db pool");
    }

    /// Optional hook run after `execute`; may inspect the outcome
    /// before returning it.
    async fn post_execute(&mut self, _ctx: &AppContext, result: JobResult<()>) -> JobResult<()> {
        if result.is_err() {
            // Log failure to database
        }
        result
    }
}
Important: You must register job handlers with the queue before workers can process them.
// Register all job types your workers will process
// (an unregistered job type makes the worker fail the job — see the
// error message quoted below)
queue.register::<SendEmailJob>()?;
queue.register::<ProcessPayment>()?;
// Then start your workers
queue.start_worker().await?;
If a worker encounters a job type that hasn't been registered, it will fail the job with:
No handler registered for job type: SendEmailJob. Call queue.register::<T>() before starting workers.
Option 1: Run worker directly from the queue (recommended)
// Clone the queue handle so the spawned task can own its copy.
let queue_clone = queue.clone();
tokio::spawn(async move {
queue_clone.start_worker().await
});
Option 2: Create a worker instance
// Obtain a dedicated worker instance from the queue and run it.
let worker = queue.worker();
tokio::spawn(async move {
worker.run().await
});
use runat::Retry;
use chrono::Duration;
// Create a job with retry on failure
// Up to 3 attempts in total, with a fixed 30-second delay between them.
queue.enqueue(
SendEmailJob {
to: "user@example.com".to_string(),
subject: "Welcome!".to_string(),
body: "Thanks for signing up!".to_string(),
}
.job()?
.set_max_attempts(3)
.retry(Retry::fixed(Duration::seconds(30)))
).await?;
If your jobs don't need shared state, use () as the context type:
/// A job with no shared state: its `Executable` impl below uses `()`
/// as the context type.
#[derive(Debug, Clone, Serialize, Deserialize, BackgroundJob)]
pub struct SimpleJob {
    /// Message printed when the job runs.
    pub message: String,
}
#[async_trait]
impl Executable<()> for SimpleJob {
async fn execute(&mut self, _ctx: &()) -> JobResult<()> {
println!("{}", self.message);
Ok(())
}
}
// Create queue with unit context
// (presumably applies a default JobQueueConfig — compare JobQueue::new
// above; confirm against runat's API docs)
let queue = JobQueue::with_datastore(Arc::new(datastore), ());
cargo test