runat

Crates.iorunat
lib.rsrunat
version0.2.2
created_at2025-12-26 23:23:55.586508+00
updated_at2026-01-20 23:32:36.763592+00
descriptionA distributed job scheduler for Rust
homepage
repositoryhttps://github.com/swervv/runat
max_upload_size
id2006418
size146,848
Swervv (swervv)

documentation

README

RunAt

A distributed job scheduler for Rust with PostgreSQL backend support.

Features

  • Distributed Job Scheduling: Run jobs across multiple workers with PostgreSQL-backed coordination
  • Application Context: Pass database connections, config, and other state to job handlers
  • Cron Support: Schedule recurring jobs using cron expressions
  • Failed Jobs Queue: Failed jobs are moved to a separate queue
  • Retry Mechanisms: Built-in exponential backoff and custom retry strategies
  • Type-Safe Jobs: Leverage Rust's type system for job definitions
  • Async/Await: Built on Tokio for efficient async job execution

Installation

Add this to your Cargo.toml:

[dependencies]
runat = "0.2.2"

For PostgreSQL support (enabled by default):

[dependencies]
runat = { version = "0.2.2", features = ["postgres"] }

Optional features:

  • postgres - PostgreSQL backend (enabled by default)
  • tracing - Tracing support for observability

Quick Start

Define Your Application Context

The context allows you to pass shared state (database pools, config, HTTP clients, etc.) to your job handlers:

use sqlx::PgPool;

#[derive(Clone)]
pub struct AppContext {
    pub db: PgPool,
    pub api_key: String,
}

Define a Job

use runat::{BackgroundJob, Executable, JobResult};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};

// Define your job struct
#[derive(Debug, Clone, Serialize, Deserialize, BackgroundJob)]
pub struct SendEmailJob {
    pub to: String,
    pub subject: String,
    pub body: String,
}

// Implement the job logic with your context type
#[async_trait]
impl Executable<AppContext> for SendEmailJob {
    async fn execute(&mut self, ctx: &AppContext) -> JobResult<()> {
        // Access your database, config, etc. through ctx
        println!("Sending email to {}: {}", self.to, self.subject);

        // Example: use ctx.db for database operations
        // sqlx::query("INSERT INTO sent_emails ...").execute(&ctx.db).await?;

        Ok(())
    }
}

Create the Queue and Run Workers

use runat::{IntoJob, JobQueue, JobQueueConfig, PostgresDatastore, JobResult};
use sqlx::postgres::PgPoolOptions;
use std::sync::Arc;

#[tokio::main]
async fn main() -> JobResult<()> {
    // Connect to PostgreSQL
    let pool = PgPoolOptions::new()
        .max_connections(10)
        .connect("postgres://user:pass@localhost/db")
        .await?;

    // Initialize datastore and run migrations
    let datastore = PostgresDatastore::new(pool.clone()).await?;
    datastore.migrate().await?;

    // Create your application context
    let ctx = AppContext {
        db: pool,
        api_key: "secret".to_string(),
    };

    // Create job queue with context
    let queue = JobQueue::new(
        Arc::new(datastore),
        JobQueueConfig::default(),
        ctx,
    );

    // Register job handlers before starting workers
    queue.register::<SendEmailJob>()?;

    // Enqueue a job
    queue.enqueue(
        SendEmailJob {
            to: "user@example.com".to_string(),
            subject: "Welcome!".to_string(),
            body: "Thanks for signing up!".to_string(),
        }.job()?
    ).await?;

    // Start worker in background
    let queue_clone = queue.clone();
    tokio::spawn(async move {
        queue_clone.start_worker().await
    });

    Ok(())
}

Scheduled Jobs with Cron

// Schedule a job to run every 10 seconds
queue.enqueue(
    SendEmailJob {
        to: "admin@example.com".to_string(),
        subject: "Daily Report".to_string(),
        body: "Here's your daily report".to_string(),
    }
    .job()?
    .cron("*/10 * * * * * *")?
).await?;

Using Context in Jobs

Jobs receive a reference to the context during execution, giving access to shared resources:

use runat::{BackgroundJob, Executable, JobResult};
use serde::{Deserialize, Serialize};
use async_trait::async_trait;

#[derive(Debug, Clone, Serialize, Deserialize, BackgroundJob)]
pub struct ProcessPayment {
    pub user_id: String,
    pub amount: f64,
}

#[async_trait]
impl Executable<AppContext> for ProcessPayment {
    async fn execute(&mut self, ctx: &AppContext) -> JobResult<()> {
        // Use the database from context
        sqlx::query("INSERT INTO payments (user_id, amount) VALUES ($1, $2)")
            .bind(&self.user_id)
            .bind(self.amount)
            .execute(&ctx.db)
            .await?;

        // Use other context fields
        println!("Using API key: {}", ctx.api_key);

        Ok(())
    }

    // Optional: pre/post execution hooks also receive context
    async fn pre_execute(&mut self, ctx: &AppContext) {
        println!("About to process payment using db pool");
    }

    async fn post_execute(&mut self, ctx: &AppContext, result: JobResult<()>) -> JobResult<()> {
        if result.is_err() {
            // Log failure to database
        }
        result
    }
}

Job Registration

Important: You must register job handlers with the queue before workers can process them.

// Register all job types your workers will process
queue.register::<SendEmailJob>()?;
queue.register::<ProcessPayment>()?;

// Then start your workers
queue.start_worker().await?;

If a worker encounters a job type that hasn't been registered, it will fail the job with:

No handler registered for job type: SendEmailJob. Call queue.register::<T>() before starting workers.

Running Workers

Option 1: Run worker directly from the queue (recommended)

let queue_clone = queue.clone();
tokio::spawn(async move {
    queue_clone.start_worker().await
});

Option 2: Create a worker instance

let worker = queue.worker();
tokio::spawn(async move {
    worker.run().await
});

Retry Strategies

use runat::Retry;
use chrono::Duration;

// Create a job with retry on failure
queue.enqueue(
    SendEmailJob {
        to: "user@example.com".to_string(),
        subject: "Welcome!".to_string(),
        body: "Thanks for signing up!".to_string(),
    }
    .job()?
    .set_max_attempts(3)
    .retry(Retry::fixed(Duration::seconds(30)))
).await?;

Simple Jobs Without Context

If your jobs don't need shared state, use () as the context type:

#[derive(Debug, Clone, Serialize, Deserialize, BackgroundJob)]
pub struct SimpleJob {
    pub message: String,
}

#[async_trait]
impl Executable<()> for SimpleJob {
    async fn execute(&mut self, _ctx: &()) -> JobResult<()> {
        println!("{}", self.message);
        Ok(())
    }
}

// Create queue with unit context
let queue = JobQueue::with_datastore(Arc::new(datastore), ());

Running Tests

cargo test
Commit count: 0

cargo fmt