celers-beat

Crates.iocelers-beat
lib.rscelers-beat
version0.1.0
created_at2026-01-18 18:21:01.688285+00
updated_at2026-01-18 18:21:01.688285+00
descriptionPeriodic task scheduler for CeleRS (Celery Beat equivalent)
homepage
repositoryhttps://github.com/cool-japan/celers
max_upload_size
id2052797
size632,228
KitaSan (cool-japan)

documentation

README

celers-beat

Periodic task scheduler for CeleRS, equivalent to Celery Beat. Schedule tasks to run at regular intervals or specific times using interval or crontab expressions.

Overview

Production-ready task scheduler with comprehensive features:

Core Scheduling:

  • Interval Schedules: Execute every N seconds
  • Crontab Schedules: Unix cron-style scheduling with timezone support
  • Solar Schedules: Sunrise/sunset/twilight/golden hour events
  • One-Time Schedules: Run once at specific time (auto-cleanup)
  • Custom Schedules: User-defined scheduling logic via closures
  • Composite Schedules: Combine schedules with AND/OR logic

Task Management:

  • Persistent State: Track execution history across restarts
  • Schedule Versioning: Track and rollback schedule modifications
  • Task Dependencies: Define execution order with dependency chains
  • Groups & Tags: Organize tasks with hierarchical groups and tags
  • Batch Operations: Efficient bulk add/remove operations
  • Priority Support: High-priority task scheduling
  • Retry Policies: Exponential backoff and fixed delay strategies

Reliability & Monitoring:

  • Crash Recovery: Automatic recovery from scheduler crashes
  • Schedule Locking: Prevent duplicate execution across instances
  • Alerting System: Configurable alerts for failures and performance issues
  • Health Checks: Task health validation and stuck detection
  • Conflict Detection: Analyze overlapping schedules
  • Execution History: Track success/failure/duration statistics

Advanced Features:

  • Calendar Integration: Business hours, holidays, blackout periods
  • Webhook Alerts: Send alerts to external systems
  • Jitter & Catch-up: Avoid thundering herd, handle missed schedules
  • Schedule Indexing: Fast O(log n) due task lookup

Quick Start

Installation

[dependencies]
celers-beat = "0.1"
celers = { version = "0.1", features = ["redis"] }

Basic Example

use celers_beat::{BeatScheduler, Schedule, ScheduledTask};
use celers_broker_redis::RedisBroker;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create broker
    let broker = RedisBroker::new("redis://localhost:6379", "celery")?;

    // Create scheduler
    let mut scheduler = BeatScheduler::new();

    // Schedule task every 60 seconds
    let task = ScheduledTask::new(
        "send_report".to_string(),
        Schedule::interval(60)
    );
    scheduler.add_task("report", task);

    // Run scheduler
    scheduler.run(&broker).await?;

    Ok(())
}

Schedule Types

Interval Schedule

Execute tasks at fixed intervals:

use celers_beat::Schedule;

// Every 10 seconds
let schedule = Schedule::interval(10);

// Every 5 minutes
let schedule = Schedule::interval(300);

// Every hour
let schedule = Schedule::interval(3600);

// Every day
let schedule = Schedule::interval(86400);

Crontab Schedule (Optional)

Unix cron-style scheduling:

[dependencies]
celers-beat = { version = "0.1", features = ["cron"] }
use celers_beat::Schedule;

// Every minute
let schedule = Schedule::crontab("*", "*", "*", "*", "*");

// Every hour at minute 0
let schedule = Schedule::crontab("0", "*", "*", "*", "*");

// Every day at midnight
let schedule = Schedule::crontab("0", "0", "*", "*", "*");

// Every Monday at 9 AM
let schedule = Schedule::crontab("0", "9", "1", "*", "*");

// Weekdays at 9 AM
let schedule = Schedule::crontab("0", "9", "1-5", "*", "*");

Crontab format:

┌───────────── minute (0-59)
│ ┌───────────── hour (0-23)
│ │ ┌───────────── day of week (0-6, 0=Sunday)
│ │ │ ┌───────────── day of month (1-31)
│ │ │ │ ┌───────────── month (1-12)
│ │ │ │ │
* * * * *

Special characters:

  • * - Any value
  • */n - Every n units (e.g., */15 = every 15 minutes)
  • n-m - Range (e.g., 9-17 = 9 AM to 5 PM)
  • n,m - List (e.g., 1,15 = 1st and 15th)

Solar Schedule (Optional)

Execute tasks at sunrise/sunset:

[dependencies]
celers-beat = { version = "0.1", features = ["solar"] }
use celers_beat::Schedule;

// Run at sunrise in Tokyo
let schedule = Schedule::solar("sunrise", 35.6762, 139.6503);

// Run at sunset in New York
let schedule = Schedule::solar("sunset", 40.7128, -74.0060);

// Run at sunrise in London
let schedule = Schedule::solar("sunrise", 51.5074, -0.1278);

Supported events:

  • "sunrise" - Task runs at sunrise
  • "sunset" - Task runs at sunset

Notes:

  • Latitude/longitude must be in decimal degrees
  • Times are calculated in UTC
  • Next occurrence is searched up to 365 days ahead

Scheduled Tasks

Creating Tasks

use celers_beat::{ScheduledTask, Schedule};
use serde_json::json;

// Basic task
let task = ScheduledTask::new(
    "cleanup_old_files".to_string(),
    Schedule::interval(3600)
);

// Task with arguments
let task = ScheduledTask::new(
    "send_email".to_string(),
    Schedule::interval(300)
)
.with_args(vec![
    json!("admin@example.com"),
    json!("Daily Report"),
]);

// Task with keyword arguments
let mut kwargs = HashMap::new();
kwargs.insert("subject".to_string(), json!("Report"));
kwargs.insert("priority".to_string(), json!("high"));

let task = ScheduledTask::new(
    "send_email".to_string(),
    Schedule::interval(300)
)
.with_kwargs(kwargs);

// Disabled task (won't run)
let task = ScheduledTask::new(
    "maintenance".to_string(),
    Schedule::interval(3600)
)
.disabled();

Task Structure

pub struct ScheduledTask {
    /// Task name (must match registered task)
    pub name: String,

    /// Execution schedule
    pub schedule: Schedule,

    /// Positional arguments
    pub args: Vec<serde_json::Value>,

    /// Keyword arguments
    pub kwargs: HashMap<String, serde_json::Value>,

    /// Task options (queue, priority, expires)
    pub options: TaskOptions,

    /// Last execution timestamp
    pub last_run_at: Option<DateTime<Utc>>,

    /// Total number of executions
    pub total_run_count: u64,

    /// Enable/disable flag
    pub enabled: bool,
}

Beat Scheduler

Basic Usage

use celers_beat::{BeatScheduler, Schedule, ScheduledTask};

let mut scheduler = BeatScheduler::new();

// Add tasks
scheduler.add_task("report", ScheduledTask::new(
    "generate_report".to_string(),
    Schedule::interval(3600)
));

scheduler.add_task("cleanup", ScheduledTask::new(
    "cleanup_temp_files".to_string(),
    Schedule::interval(86400)
));

// Run scheduler (blocks until shutdown)
scheduler.run(&broker).await?;

Managing Tasks

// Add task
scheduler.add_task("my_task", task);

// Remove task
scheduler.remove_task("my_task");

// Enable/disable task
scheduler.enable_task("my_task");
scheduler.disable_task("my_task");

// Get task info
if let Some(task) = scheduler.get_task("my_task") {
    println!("Last run: {:?}", task.last_run_at);
    println!("Run count: {}", task.total_run_count);
}

// List all tasks
for (name, task) in scheduler.list_tasks() {
    println!("{}: enabled={}", name, task.enabled);
}

Persistent Schedules

Load/save schedule state to persist across restarts:

use std::fs;

// Save state
let state = scheduler.export_state()?;
fs::write("schedule_state.json", state)?;

// Load state
let state = fs::read_to_string("schedule_state.json")?;
scheduler.import_state(&state)?;

Configuration Examples

Periodic Reports

// Daily report at midnight
let daily_report = ScheduledTask::new(
    "daily_report".to_string(),
    Schedule::crontab("0", "0", "*", "*", "*")
);

// Weekly report every Monday at 9 AM
let weekly_report = ScheduledTask::new(
    "weekly_report".to_string(),
    Schedule::crontab("0", "9", "1", "*", "*")
);

// Monthly report on 1st at midnight
let monthly_report = ScheduledTask::new(
    "monthly_report".to_string(),
    Schedule::crontab("0", "0", "*", "1", "*")
);

Maintenance Tasks

// Cleanup every hour
let cleanup = ScheduledTask::new(
    "cleanup_temp_files".to_string(),
    Schedule::interval(3600)
);

// Database backup daily at 3 AM
let backup = ScheduledTask::new(
    "backup_database".to_string(),
    Schedule::crontab("0", "3", "*", "*", "*")
);

// Log rotation every 6 hours
let rotate_logs = ScheduledTask::new(
    "rotate_logs".to_string(),
    Schedule::interval(21600)
);

Monitoring Tasks

// Health check every 30 seconds
let health_check = ScheduledTask::new(
    "health_check".to_string(),
    Schedule::interval(30)
);

// Metrics collection every 5 minutes
let collect_metrics = ScheduledTask::new(
    "collect_metrics".to_string(),
    Schedule::interval(300)
);

// Alert check every minute
let check_alerts = ScheduledTask::new(
    "check_alerts".to_string(),
    Schedule::interval(60)
);

Schedule Builders and Templates

Schedule Builder - Fluent API

Create schedules with a chainable, intuitive API:

use celers_beat::ScheduleBuilder;

// Simple intervals
let schedule = ScheduleBuilder::new()
    .every_n_minutes(30)
    .build();

// Business hours only (Mon-Fri, 9 AM - 5 PM)
let schedule = ScheduleBuilder::new()
    .every_n_minutes(15)
    .business_hours_only()
    .build();

// Weekends only
let schedule = ScheduleBuilder::new()
    .every_n_hours(2)
    .weekends_only()
    .build();

// Weekdays with timezone
let schedule = ScheduleBuilder::new()
    .every_n_hours(1)
    .weekdays_only()
    .in_timezone("America/New_York")
    .build();

// Business hours in Tokyo
let schedule = ScheduleBuilder::new()
    .every_n_minutes(30)
    .business_hours_only()
    .in_timezone("Asia/Tokyo")
    .build();

Builder Methods:

  • every_n_seconds(n) - Every N seconds
  • every_n_minutes(n) - Every N minutes
  • every_n_hours(n) - Every N hours
  • every_n_days(n) - Every N days
  • business_hours_only() - Mon-Fri, 9 AM - 5 PM
  • weekdays_only() - Mon-Fri
  • weekends_only() - Sat-Sun
  • in_timezone(tz) - Set timezone
  • build() - Create the schedule

Schedule Templates

Pre-built schedules for common patterns:

use celers_beat::ScheduleTemplates;

// Common intervals
ScheduleTemplates::every_minute();
ScheduleTemplates::every_5_minutes();
ScheduleTemplates::every_15_minutes();
ScheduleTemplates::every_30_minutes();
ScheduleTemplates::hourly();
ScheduleTemplates::every_2_hours();
ScheduleTemplates::every_6_hours();
ScheduleTemplates::every_12_hours();

// Daily schedules
ScheduleTemplates::daily_at_midnight();
ScheduleTemplates::daily_at_hour(3); // 3 AM

// Weekly schedules
ScheduleTemplates::weekdays_at(9, 0); // Weekdays at 9:00 AM
ScheduleTemplates::weekly_on_monday(9, 0); // Monday at 9:00 AM
ScheduleTemplates::weekend_mornings(); // Sat/Sun at 8 AM

// Monthly schedules
ScheduleTemplates::monthly_first_day(); // 1st of month
ScheduleTemplates::monthly_last_day(); // 28-31st of month

// Business hours
ScheduleTemplates::business_hours_hourly();
ScheduleTemplates::business_hours_every_15_minutes();

// Quarterly
ScheduleTemplates::quarterly(); // Jan/Apr/Jul/Oct 1st

Practical Examples:

// Health check monitoring
let task = ScheduledTask::new(
    "health_check".to_string(),
    ScheduleTemplates::every_minute()
);

// Daily report at 3 AM
let task = ScheduledTask::new(
    "daily_report".to_string(),
    ScheduleTemplates::daily_at_hour(3)
);

// Business hours API sync
let task = ScheduledTask::new(
    "api_sync".to_string(),
    ScheduleTemplates::business_hours_every_15_minutes()
);

// Weekend batch processing
let task = ScheduledTask::new(
    "weekend_batch".to_string(),
    ScheduleBuilder::new()
        .every_n_hours(2)
        .weekends_only()
        .build()
);

See examples/schedule_builders.rs for comprehensive demonstrations.

Advanced Features

Schedule Versioning

Track and rollback schedule modifications:

use celers_beat::ScheduledTask;

let mut task = ScheduledTask::new("my_task".to_string(), Schedule::interval(60));

// Update schedule (creates version 2)
task.update_schedule(Schedule::interval(120));

// Update again (creates version 3)
task.update_schedule(Schedule::interval(180));

// Rollback to version 2
task.rollback_to_version(1)?;

// View version history
for version in task.get_version_history() {
    println!("Version {} at {}", version.version, version.timestamp);
}

Task Dependencies

Define task execution order with dependency chains:

let extract = ScheduledTask::new("extract_data".to_string(), Schedule::interval(3600))
    .with_group("etl");

let transform = ScheduledTask::new("transform_data".to_string(), Schedule::interval(3600))
    .with_dependencies(vec!["extract_data".to_string()])
    .with_group("etl");

let load = ScheduledTask::new("load_data".to_string(), Schedule::interval(3600))
    .with_dependencies(vec!["transform_data".to_string()])
    .with_group("etl");

scheduler.add_task(extract)?;
scheduler.add_task(transform)?;
scheduler.add_task(load)?;

// Validate dependencies (detects circular dependencies)
scheduler.validate_dependencies()?;

// Get dependency chain
let chain = scheduler.resolve_dependency_chain("load_data")?;

Schedule Locking

Prevent duplicate task execution across multiple scheduler instances:

// Set unique instance ID
scheduler.set_instance_id("scheduler-001");

// Try to acquire lock
if scheduler.try_acquire_lock("my_task", 300)? {
    // Lock acquired, safe to execute

    // Renew lock if task runs longer
    scheduler.renew_lock("my_task", 300)?;

    // Release when done
    scheduler.release_lock("my_task")?;
}

Alerting System

Configure alerts for task failures and performance issues:

use celers_beat::AlertConfig;

let alert_config = AlertConfig {
    enabled: true,
    consecutive_failures_threshold: 3,
    failure_rate_threshold: 0.5,  // 50%
    slow_execution_threshold_ms: Some(5000),
    alert_on_missed_schedule: true,
    alert_on_stuck_task: true,
};

let task = ScheduledTask::new("critical_task".to_string(), Schedule::interval(60))
    .with_alert_config(alert_config);

// Register alert callback
scheduler.on_alert(Arc::new(|alert| {
    eprintln!("[{}] {} - {}", alert.level, alert.task_name, alert.message);
}));

// Check alerts
scheduler.check_all_alerts();
let critical = scheduler.get_critical_alerts();

Crash Recovery

Automatic recovery from scheduler crashes:

// On restart, detect interrupted tasks
let crashed_tasks = scheduler.detect_crashed_tasks();

// Recover from crash
scheduler.recover_from_crash()?;

// Get tasks ready for retry
let retry_tasks = scheduler.get_tasks_ready_for_crash_retry();

Conflict Detection

Detect and analyze overlapping schedules:

// Detect conflicts (tasks scheduled too close together)
let conflicts = scheduler.detect_conflicts(60)?; // 60 second window

// Get high-severity conflicts only
let high_severity = scheduler.get_high_severity_conflicts(&conflicts);

for conflict in high_severity {
    println!("Conflict: {} and {} overlap by {}s",
        conflict.task1_name, conflict.task2_name, conflict.overlap_seconds);
}

Groups and Tags

Organize tasks with groups and tags:

let task = ScheduledTask::new("report".to_string(), Schedule::interval(3600))
    .with_group("reports")
    .with_tags(vec!["daily".to_string(), "email".to_string()]);

// Query by group
let reports = scheduler.get_tasks_by_group("reports");

// Query by tag
let daily = scheduler.get_tasks_by_tag("daily");

// Bulk operations
scheduler.enable_group("reports");
scheduler.disable_tag("cleanup");

Batch Operations

Efficient bulk task management:

// Add multiple tasks in one operation
let tasks = vec![
    ScheduledTask::new("task1".to_string(), Schedule::interval(60)),
    ScheduledTask::new("task2".to_string(), Schedule::interval(120)),
];
scheduler.add_tasks_batch(tasks)?;

// Remove multiple tasks
scheduler.remove_tasks_batch(&["task1", "task2"])?;

Schedule Composition

Combine multiple schedules with AND/OR logic:

use celers_beat::CompositeSchedule;

// OR: Run when ANY schedule is due (earliest)
let composite_or = CompositeSchedule::or(vec![
    Schedule::interval(60),
    Schedule::interval(120),
]);

// AND: Run when ALL schedules are due (latest)
let composite_and = CompositeSchedule::and(vec![
    Schedule::interval(60),
    Schedule::interval(120),
]);

Custom Schedules

Define custom scheduling logic:

use celers_beat::CustomSchedule;

let custom = CustomSchedule::new(
    "business_hours",
    |last_run| {
        let mut next = last_run.unwrap_or_else(Utc::now);
        // Custom logic to find next business hour
        // ...
        Ok(next)
    }
);

Priority Scheduling

let task = ScheduledTask::new(
    "critical_task".to_string(),
    Schedule::interval(60)
);

task.options.priority = Some(9);  // Highest priority

Custom Queue

let task = ScheduledTask::new(
    "background_task".to_string(),
    Schedule::interval(300)
);

task.options.queue = Some("low_priority".to_string());

Task Expiration

let task = ScheduledTask::new(
    "time_sensitive".to_string(),
    Schedule::interval(60)
);

task.options.expires = Some(300);  // Expire after 5 minutes

Conditional Execution

// Check if task is due before running
if task.is_due()? {
    println!("Task is due, executing...");
    // Execute task
} else {
    println!("Task not due yet");
}

Production Deployment

Systemd Service

[Unit]
Description=CeleRS Beat Scheduler
After=network.target redis.service

[Service]
Type=simple
User=celery
WorkingDirectory=/opt/celery
ExecStart=/opt/celery/beat
Restart=on-failure
RestartSec=10

[Install]
WantedBy=multi-user.target

Docker

FROM rust:1.70 AS builder
WORKDIR /app
COPY . .
RUN cargo build --release --bin beat

FROM debian:bullseye-slim
COPY --from=builder /app/target/release/beat /usr/local/bin/
CMD ["beat"]

High Availability

Important: Only run one beat scheduler instance to avoid duplicate task execution!

// Use leader election or singleton pattern
use tokio::sync::Mutex;
use std::sync::Arc;

let lock = Arc::new(Mutex::new(()));

// Acquire lock before starting
let _guard = lock.lock().await;
scheduler.run(&broker).await?;

Monitoring

Scheduled Task Metrics

// Track execution
for (name, task) in scheduler.list_tasks() {
    println!("Task: {}", name);
    println!("  Last run: {:?}", task.last_run_at);
    println!("  Total runs: {}", task.total_run_count);
    println!("  Enabled: {}", task.enabled);
}

Health Checks

use tokio::time::{interval, Duration};

// Periodic health check
let mut ticker = interval(Duration::from_secs(60));
loop {
    ticker.tick().await;

    // Check scheduler is running
    if !scheduler.is_running() {
        eprintln!("WARNING: Scheduler not running!");
        // Alert or restart
    }
}

Best Practices

1. Single Scheduler Instance

// ❌ Bad: Multiple schedulers (duplicates tasks)
// Worker 1: scheduler.run()
// Worker 2: scheduler.run()

// ✅ Good: Single scheduler instance
// Beat server: scheduler.run()
// Workers: Only execute tasks

2. Persistent State

// Save state on shutdown
use tokio::signal;

tokio::select! {
    _ = scheduler.run(&broker) => {}
    _ = signal::ctrl_c() => {
        println!("Saving scheduler state...");
        let state = scheduler.export_state()?;
        fs::write("state.json", state)?;
    }
}

3. Timezone Handling

// Always use UTC internally
use chrono::Utc;

let now = Utc::now();
let next_run = schedule.next_run(Some(now))?;

// Convert to local time for display
use chrono_tz::America::New_York;
let local = next_run.with_timezone(&New_York);
println!("Next run: {}", local);

Comprehensive Timezone Utilities

The crate provides comprehensive timezone utilities for working with schedules across different timezones:

use celers_beat::TimezoneUtils;

// Automatic system timezone detection
let system_tz = TimezoneUtils::detect_system_timezone();

// Validate timezone names
assert!(TimezoneUtils::is_valid_timezone("America/New_York"));

// List and search timezones
let all_timezones = TimezoneUtils::list_all_timezones(); // 600+ timezones
let us_zones = TimezoneUtils::search_timezones("america");

// Get current time in multiple timezones
let zones = vec!["America/New_York", "Europe/London", "Asia/Tokyo"];
let times = TimezoneUtils::current_time_in_zones(&zones);

// Check DST status
let is_dst = TimezoneUtils::is_dst_active("America/New_York", None)?;

// Get UTC offset
let offset = TimezoneUtils::get_utc_offset("Asia/Tokyo", None)?;

// Get detailed timezone info
let info = TimezoneUtils::get_timezone_info("Europe/London", None)?;
println!("{}", info); // Europe/London (UTC+0.0h, GMT, DST: No): ...

// Convert between timezones
let tokyo_time = TimezoneUtils::convert_between_timezones(
    Utc::now(),
    "America/New_York",
    "Asia/Tokyo"
)?;

// Get common timezone abbreviations
let abbrevs = TimezoneUtils::get_common_timezone_abbreviations();
let ny_tz = abbrevs.get("EST"); // Returns "America/New_York"

// Calculate time until next occurrence in timezone
let duration = TimezoneUtils::time_until_next_occurrence(9, 0, "America/New_York")?;

See examples/timezone_utilities.rs for a comprehensive demonstration.

4. Error Handling

// Graceful error handling
loop {
    match scheduler.tick(&broker).await {
        Ok(executed) => {
            println!("Executed {} tasks", executed);
        }
        Err(e) => {
            eprintln!("Scheduler error: {}", e);
            // Log but continue
        }
    }

    tokio::time::sleep(Duration::from_secs(1)).await;
}

5. Task Idempotency

// Ensure tasks are idempotent (safe to run multiple times)
registry.register("generate_report", |args| async move {
    let report_id = generate_unique_id();

    // Check if already generated
    if report_exists(report_id).await? {
        return Ok("Already generated".to_string());
    }

    // Generate report
    generate_report(report_id).await?;
    Ok(format!("Generated report {}", report_id))
});

Troubleshooting

Tasks not executing

Check:

  1. Scheduler is running: scheduler.is_running()
  2. Task is enabled: task.enabled == true
  3. Schedule is correct: task.schedule.next_run()
  4. Broker connection: broker.ping()

Duplicate executions

Cause: Multiple scheduler instances running Solution: Ensure only one scheduler instance

Missed schedules

Cause: Scheduler was down during scheduled time Solution: Implement catchup logic or use persistent state

Timezone issues

Cause: Mixing UTC and local time Solution: Always use UTC internally, convert for display only

Performance

Scheduling Overhead

Schedule Type Overhead Memory
Interval <1ms ~100B per task
Crontab <5ms ~200B per task
Solar <10ms ~300B per task

Scalability

  • Tasks: 10,000+ scheduled tasks
  • Precision: 1-second granularity
  • Latency: <10ms schedule evaluation

Comparison with Celery Beat

Feature Celery Beat CeleRS Beat
Interval schedules
Crontab schedules ✅ (with timezone support)
Solar schedules ✅ (with twilight & golden hour)
One-time schedules
Custom schedules
Schedule versioning
Task dependencies
Crash recovery
Alerting system
Schedule locking
Conflict detection
Business calendars
Persistent state File/DB JSON (auto-save)
Performance ~100 tasks/sec ~1000 tasks/sec
Memory 50MB+ <10MB

See Also

  • Worker: celers-worker - Task execution runtime
  • Broker: celers-broker-redis - Message broker
  • Core: celers-core - Task registry

License

MIT OR Apache-2.0

Commit count: 1

cargo fmt