| Crates.io | celers-beat |
| lib.rs | celers-beat |
| version | 0.1.0 |
| created_at | 2026-01-18 18:21:01.688285+00 |
| updated_at | 2026-01-18 18:21:01.688285+00 |
| description | Periodic task scheduler for CeleRS (Celery Beat equivalent) |
| homepage | |
| repository | https://github.com/cool-japan/celers |
| max_upload_size | |
| id | 2052797 |
| size | 632,228 |
Periodic task scheduler for CeleRS, equivalent to Celery Beat. Schedule tasks to run at regular intervals or specific times using interval or crontab expressions.
Production-ready task scheduler with comprehensive features:
Core Scheduling:
Task Management:
Reliability & Monitoring:
Advanced Features:
[dependencies]
celers-beat = "0.1"
celers = { version = "0.1", features = ["redis"] }
use celers_beat::{BeatScheduler, Schedule, ScheduledTask};
use celers_broker_redis::RedisBroker;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create broker
let broker = RedisBroker::new("redis://localhost:6379", "celery")?;
// Create scheduler
let mut scheduler = BeatScheduler::new();
// Schedule task every 60 seconds
let task = ScheduledTask::new(
"send_report".to_string(),
Schedule::interval(60)
);
scheduler.add_task("report", task);
// Run scheduler
scheduler.run(&broker).await?;
Ok(())
}
Execute tasks at fixed intervals:
use celers_beat::Schedule;
// Every 10 seconds
let schedule = Schedule::interval(10);
// Every 5 minutes
let schedule = Schedule::interval(300);
// Every hour
let schedule = Schedule::interval(3600);
// Every day
let schedule = Schedule::interval(86400);
Unix cron-style scheduling:
[dependencies]
celers-beat = { version = "0.1", features = ["cron"] }
use celers_beat::Schedule;
// Every minute
let schedule = Schedule::crontab("*", "*", "*", "*", "*");
// Every hour at minute 0
let schedule = Schedule::crontab("0", "*", "*", "*", "*");
// Every day at midnight
let schedule = Schedule::crontab("0", "0", "*", "*", "*");
// Every Monday at 9 AM
let schedule = Schedule::crontab("0", "9", "1", "*", "*");
// Weekdays at 9 AM
let schedule = Schedule::crontab("0", "9", "1-5", "*", "*");
Crontab format:
┌───────────── minute (0-59)
│ ┌───────────── hour (0-23)
│ │ ┌───────────── day of week (0-6, 0=Sunday)
│ │ │ ┌───────────── day of month (1-31)
│ │ │ │ ┌───────────── month (1-12)
│ │ │ │ │
* * * * *
Special characters:
* - Any value*/n - Every n units (e.g., */15 = every 15 minutes)n-m - Range (e.g., 9-17 = 9 AM to 5 PM)n,m - List (e.g., 1,15 = 1st and 15th)Execute tasks at sunrise/sunset:
[dependencies]
celers-beat = { version = "0.1", features = ["solar"] }
use celers_beat::Schedule;
// Run at sunrise in Tokyo
let schedule = Schedule::solar("sunrise", 35.6762, 139.6503);
// Run at sunset in New York
let schedule = Schedule::solar("sunset", 40.7128, -74.0060);
// Run at sunrise in London
let schedule = Schedule::solar("sunrise", 51.5074, -0.1278);
Supported events:
"sunrise" - Task runs at sunrise"sunset" - Task runs at sunsetNotes:
use celers_beat::{ScheduledTask, Schedule};
use serde_json::json;
// Basic task
let task = ScheduledTask::new(
"cleanup_old_files".to_string(),
Schedule::interval(3600)
);
// Task with arguments
let task = ScheduledTask::new(
"send_email".to_string(),
Schedule::interval(300)
)
.with_args(vec![
json!("admin@example.com"),
json!("Daily Report"),
]);
// Task with keyword arguments
let mut kwargs = HashMap::new();
kwargs.insert("subject".to_string(), json!("Report"));
kwargs.insert("priority".to_string(), json!("high"));
let task = ScheduledTask::new(
"send_email".to_string(),
Schedule::interval(300)
)
.with_kwargs(kwargs);
// Disabled task (won't run)
let task = ScheduledTask::new(
"maintenance".to_string(),
Schedule::interval(3600)
)
.disabled();
pub struct ScheduledTask {
/// Task name (must match registered task)
pub name: String,
/// Execution schedule
pub schedule: Schedule,
/// Positional arguments
pub args: Vec<serde_json::Value>,
/// Keyword arguments
pub kwargs: HashMap<String, serde_json::Value>,
/// Task options (queue, priority, expires)
pub options: TaskOptions,
/// Last execution timestamp
pub last_run_at: Option<DateTime<Utc>>,
/// Total number of executions
pub total_run_count: u64,
/// Enable/disable flag
pub enabled: bool,
}
use celers_beat::{BeatScheduler, Schedule, ScheduledTask};
let mut scheduler = BeatScheduler::new();
// Add tasks
scheduler.add_task("report", ScheduledTask::new(
"generate_report".to_string(),
Schedule::interval(3600)
));
scheduler.add_task("cleanup", ScheduledTask::new(
"cleanup_temp_files".to_string(),
Schedule::interval(86400)
));
// Run scheduler (blocks until shutdown)
scheduler.run(&broker).await?;
// Add task
scheduler.add_task("my_task", task);
// Remove task
scheduler.remove_task("my_task");
// Enable/disable task
scheduler.enable_task("my_task");
scheduler.disable_task("my_task");
// Get task info
if let Some(task) = scheduler.get_task("my_task") {
println!("Last run: {:?}", task.last_run_at);
println!("Run count: {}", task.total_run_count);
}
// List all tasks
for (name, task) in scheduler.list_tasks() {
println!("{}: enabled={}", name, task.enabled);
}
Load/save schedule state to persist across restarts:
use std::fs;
// Save state
let state = scheduler.export_state()?;
fs::write("schedule_state.json", state)?;
// Load state
let state = fs::read_to_string("schedule_state.json")?;
scheduler.import_state(&state)?;
// Daily report at midnight
let daily_report = ScheduledTask::new(
"daily_report".to_string(),
Schedule::crontab("0", "0", "*", "*", "*")
);
// Weekly report every Monday at 9 AM
let weekly_report = ScheduledTask::new(
"weekly_report".to_string(),
Schedule::crontab("0", "9", "1", "*", "*")
);
// Monthly report on 1st at midnight
let monthly_report = ScheduledTask::new(
"monthly_report".to_string(),
Schedule::crontab("0", "0", "*", "1", "*")
);
// Cleanup every hour
let cleanup = ScheduledTask::new(
"cleanup_temp_files".to_string(),
Schedule::interval(3600)
);
// Database backup daily at 3 AM
let backup = ScheduledTask::new(
"backup_database".to_string(),
Schedule::crontab("0", "3", "*", "*", "*")
);
// Log rotation every 6 hours
let rotate_logs = ScheduledTask::new(
"rotate_logs".to_string(),
Schedule::interval(21600)
);
// Health check every 30 seconds
let health_check = ScheduledTask::new(
"health_check".to_string(),
Schedule::interval(30)
);
// Metrics collection every 5 minutes
let collect_metrics = ScheduledTask::new(
"collect_metrics".to_string(),
Schedule::interval(300)
);
// Alert check every minute
let check_alerts = ScheduledTask::new(
"check_alerts".to_string(),
Schedule::interval(60)
);
Create schedules with a chainable, intuitive API:
use celers_beat::ScheduleBuilder;
// Simple intervals
let schedule = ScheduleBuilder::new()
.every_n_minutes(30)
.build();
// Business hours only (Mon-Fri, 9 AM - 5 PM)
let schedule = ScheduleBuilder::new()
.every_n_minutes(15)
.business_hours_only()
.build();
// Weekends only
let schedule = ScheduleBuilder::new()
.every_n_hours(2)
.weekends_only()
.build();
// Weekdays with timezone
let schedule = ScheduleBuilder::new()
.every_n_hours(1)
.weekdays_only()
.in_timezone("America/New_York")
.build();
// Business hours in Tokyo
let schedule = ScheduleBuilder::new()
.every_n_minutes(30)
.business_hours_only()
.in_timezone("Asia/Tokyo")
.build();
Builder Methods:
every_n_seconds(n) - Every N secondsevery_n_minutes(n) - Every N minutesevery_n_hours(n) - Every N hoursevery_n_days(n) - Every N daysbusiness_hours_only() - Mon-Fri, 9 AM - 5 PMweekdays_only() - Mon-Friweekends_only() - Sat-Sunin_timezone(tz) - Set timezonebuild() - Create the schedulePre-built schedules for common patterns:
use celers_beat::ScheduleTemplates;
// Common intervals
ScheduleTemplates::every_minute();
ScheduleTemplates::every_5_minutes();
ScheduleTemplates::every_15_minutes();
ScheduleTemplates::every_30_minutes();
ScheduleTemplates::hourly();
ScheduleTemplates::every_2_hours();
ScheduleTemplates::every_6_hours();
ScheduleTemplates::every_12_hours();
// Daily schedules
ScheduleTemplates::daily_at_midnight();
ScheduleTemplates::daily_at_hour(3); // 3 AM
// Weekly schedules
ScheduleTemplates::weekdays_at(9, 0); // Weekdays at 9:00 AM
ScheduleTemplates::weekly_on_monday(9, 0); // Monday at 9:00 AM
ScheduleTemplates::weekend_mornings(); // Sat/Sun at 8 AM
// Monthly schedules
ScheduleTemplates::monthly_first_day(); // 1st of month
ScheduleTemplates::monthly_last_day(); // 28-31st of month
// Business hours
ScheduleTemplates::business_hours_hourly();
ScheduleTemplates::business_hours_every_15_minutes();
// Quarterly
ScheduleTemplates::quarterly(); // Jan/Apr/Jul/Oct 1st
Practical Examples:
// Health check monitoring
let task = ScheduledTask::new(
"health_check".to_string(),
ScheduleTemplates::every_minute()
);
// Daily report at 3 AM
let task = ScheduledTask::new(
"daily_report".to_string(),
ScheduleTemplates::daily_at_hour(3)
);
// Business hours API sync
let task = ScheduledTask::new(
"api_sync".to_string(),
ScheduleTemplates::business_hours_every_15_minutes()
);
// Weekend batch processing
let task = ScheduledTask::new(
"weekend_batch".to_string(),
ScheduleBuilder::new()
.every_n_hours(2)
.weekends_only()
.build()
);
See examples/schedule_builders.rs for comprehensive demonstrations.
Track and rollback schedule modifications:
use celers_beat::ScheduledTask;
let mut task = ScheduledTask::new("my_task".to_string(), Schedule::interval(60));
// Update schedule (creates version 2)
task.update_schedule(Schedule::interval(120));
// Update again (creates version 3)
task.update_schedule(Schedule::interval(180));
// Rollback to version 2
task.rollback_to_version(1)?;
// View version history
for version in task.get_version_history() {
println!("Version {} at {}", version.version, version.timestamp);
}
Define task execution order with dependency chains:
let extract = ScheduledTask::new("extract_data".to_string(), Schedule::interval(3600))
.with_group("etl");
let transform = ScheduledTask::new("transform_data".to_string(), Schedule::interval(3600))
.with_dependencies(vec!["extract_data".to_string()])
.with_group("etl");
let load = ScheduledTask::new("load_data".to_string(), Schedule::interval(3600))
.with_dependencies(vec!["transform_data".to_string()])
.with_group("etl");
scheduler.add_task(extract)?;
scheduler.add_task(transform)?;
scheduler.add_task(load)?;
// Validate dependencies (detects circular dependencies)
scheduler.validate_dependencies()?;
// Get dependency chain
let chain = scheduler.resolve_dependency_chain("load_data")?;
Prevent duplicate task execution across multiple scheduler instances:
// Set unique instance ID
scheduler.set_instance_id("scheduler-001");
// Try to acquire lock
if scheduler.try_acquire_lock("my_task", 300)? {
// Lock acquired, safe to execute
// Renew lock if task runs longer
scheduler.renew_lock("my_task", 300)?;
// Release when done
scheduler.release_lock("my_task")?;
}
Configure alerts for task failures and performance issues:
use celers_beat::AlertConfig;
let alert_config = AlertConfig {
enabled: true,
consecutive_failures_threshold: 3,
failure_rate_threshold: 0.5, // 50%
slow_execution_threshold_ms: Some(5000),
alert_on_missed_schedule: true,
alert_on_stuck_task: true,
};
let task = ScheduledTask::new("critical_task".to_string(), Schedule::interval(60))
.with_alert_config(alert_config);
// Register alert callback
scheduler.on_alert(Arc::new(|alert| {
eprintln!("[{}] {} - {}", alert.level, alert.task_name, alert.message);
}));
// Check alerts
scheduler.check_all_alerts();
let critical = scheduler.get_critical_alerts();
Automatic recovery from scheduler crashes:
// On restart, detect interrupted tasks
let crashed_tasks = scheduler.detect_crashed_tasks();
// Recover from crash
scheduler.recover_from_crash()?;
// Get tasks ready for retry
let retry_tasks = scheduler.get_tasks_ready_for_crash_retry();
Detect and analyze overlapping schedules:
// Detect conflicts (tasks scheduled too close together)
let conflicts = scheduler.detect_conflicts(60)?; // 60 second window
// Get high-severity conflicts only
let high_severity = scheduler.get_high_severity_conflicts(&conflicts);
for conflict in high_severity {
println!("Conflict: {} and {} overlap by {}s",
conflict.task1_name, conflict.task2_name, conflict.overlap_seconds);
}
Organize tasks with groups and tags:
let task = ScheduledTask::new("report".to_string(), Schedule::interval(3600))
.with_group("reports")
.with_tags(vec!["daily".to_string(), "email".to_string()]);
// Query by group
let reports = scheduler.get_tasks_by_group("reports");
// Query by tag
let daily = scheduler.get_tasks_by_tag("daily");
// Bulk operations
scheduler.enable_group("reports");
scheduler.disable_tag("cleanup");
Efficient bulk task management:
// Add multiple tasks in one operation
let tasks = vec![
ScheduledTask::new("task1".to_string(), Schedule::interval(60)),
ScheduledTask::new("task2".to_string(), Schedule::interval(120)),
];
scheduler.add_tasks_batch(tasks)?;
// Remove multiple tasks
scheduler.remove_tasks_batch(&["task1", "task2"])?;
Combine multiple schedules with AND/OR logic:
use celers_beat::CompositeSchedule;
// OR: Run when ANY schedule is due (earliest)
let composite_or = CompositeSchedule::or(vec![
Schedule::interval(60),
Schedule::interval(120),
]);
// AND: Run when ALL schedules are due (latest)
let composite_and = CompositeSchedule::and(vec![
Schedule::interval(60),
Schedule::interval(120),
]);
Define custom scheduling logic:
use celers_beat::CustomSchedule;
let custom = CustomSchedule::new(
"business_hours",
|last_run| {
let mut next = last_run.unwrap_or_else(Utc::now);
// Custom logic to find next business hour
// ...
Ok(next)
}
);
let task = ScheduledTask::new(
"critical_task".to_string(),
Schedule::interval(60)
);
task.options.priority = Some(9); // Highest priority
let task = ScheduledTask::new(
"background_task".to_string(),
Schedule::interval(300)
);
task.options.queue = Some("low_priority".to_string());
let task = ScheduledTask::new(
"time_sensitive".to_string(),
Schedule::interval(60)
);
task.options.expires = Some(300); // Expire after 5 minutes
// Check if task is due before running
if task.is_due()? {
println!("Task is due, executing...");
// Execute task
} else {
println!("Task not due yet");
}
[Unit]
Description=CeleRS Beat Scheduler
After=network.target redis.service
[Service]
Type=simple
User=celery
WorkingDirectory=/opt/celery
ExecStart=/opt/celery/beat
Restart=on-failure
RestartSec=10
[Install]
WantedBy=multi-user.target
FROM rust:1.70 AS builder
WORKDIR /app
COPY . .
RUN cargo build --release --bin beat
FROM debian:bullseye-slim
COPY --from=builder /app/target/release/beat /usr/local/bin/
CMD ["beat"]
Important: Only run one beat scheduler instance to avoid duplicate task execution!
// Use leader election or singleton pattern
use tokio::sync::Mutex;
use std::sync::Arc;
let lock = Arc::new(Mutex::new(()));
// Acquire lock before starting
let _guard = lock.lock().await;
scheduler.run(&broker).await?;
// Track execution
for (name, task) in scheduler.list_tasks() {
println!("Task: {}", name);
println!(" Last run: {:?}", task.last_run_at);
println!(" Total runs: {}", task.total_run_count);
println!(" Enabled: {}", task.enabled);
}
use tokio::time::{interval, Duration};
// Periodic health check
let mut ticker = interval(Duration::from_secs(60));
loop {
ticker.tick().await;
// Check scheduler is running
if !scheduler.is_running() {
eprintln!("WARNING: Scheduler not running!");
// Alert or restart
}
}
// ❌ Bad: Multiple schedulers (duplicates tasks)
// Worker 1: scheduler.run()
// Worker 2: scheduler.run()
// ✅ Good: Single scheduler instance
// Beat server: scheduler.run()
// Workers: Only execute tasks
// Save state on shutdown
use tokio::signal;
tokio::select! {
_ = scheduler.run(&broker) => {}
_ = signal::ctrl_c() => {
println!("Saving scheduler state...");
let state = scheduler.export_state()?;
fs::write("state.json", state)?;
}
}
// Always use UTC internally
use chrono::Utc;
let now = Utc::now();
let next_run = schedule.next_run(Some(now))?;
// Convert to local time for display
use chrono_tz::America::New_York;
let local = next_run.with_timezone(&New_York);
println!("Next run: {}", local);
The crate provides comprehensive timezone utilities for working with schedules across different timezones:
use celers_beat::TimezoneUtils;
// Automatic system timezone detection
let system_tz = TimezoneUtils::detect_system_timezone();
// Validate timezone names
assert!(TimezoneUtils::is_valid_timezone("America/New_York"));
// List and search timezones
let all_timezones = TimezoneUtils::list_all_timezones(); // 600+ timezones
let us_zones = TimezoneUtils::search_timezones("america");
// Get current time in multiple timezones
let zones = vec!["America/New_York", "Europe/London", "Asia/Tokyo"];
let times = TimezoneUtils::current_time_in_zones(&zones);
// Check DST status
let is_dst = TimezoneUtils::is_dst_active("America/New_York", None)?;
// Get UTC offset
let offset = TimezoneUtils::get_utc_offset("Asia/Tokyo", None)?;
// Get detailed timezone info
let info = TimezoneUtils::get_timezone_info("Europe/London", None)?;
println!("{}", info); // Europe/London (UTC+0.0h, GMT, DST: No): ...
// Convert between timezones
let tokyo_time = TimezoneUtils::convert_between_timezones(
Utc::now(),
"America/New_York",
"Asia/Tokyo"
)?;
// Get common timezone abbreviations
let abbrevs = TimezoneUtils::get_common_timezone_abbreviations();
let ny_tz = abbrevs.get("EST"); // Returns "America/New_York"
// Calculate time until next occurrence in timezone
let duration = TimezoneUtils::time_until_next_occurrence(9, 0, "America/New_York")?;
See examples/timezone_utilities.rs for a comprehensive demonstration.
// Graceful error handling
loop {
match scheduler.tick(&broker).await {
Ok(executed) => {
println!("Executed {} tasks", executed);
}
Err(e) => {
eprintln!("Scheduler error: {}", e);
// Log but continue
}
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
// Ensure tasks are idempotent (safe to run multiple times)
registry.register("generate_report", |args| async move {
let report_id = generate_unique_id();
// Check if already generated
if report_exists(report_id).await? {
return Ok("Already generated".to_string());
}
// Generate report
generate_report(report_id).await?;
Ok(format!("Generated report {}", report_id))
});
Check:
scheduler.is_running()task.enabled == truetask.schedule.next_run()broker.ping()Cause: Multiple scheduler instances running Solution: Ensure only one scheduler instance
Cause: Scheduler was down during scheduled time Solution: Implement catchup logic or use persistent state
Cause: Mixing UTC and local time Solution: Always use UTC internally, convert for display only
| Schedule Type | Overhead | Memory |
|---|---|---|
| Interval | <1ms | ~100B per task |
| Crontab | <5ms | ~200B per task |
| Solar | <10ms | ~300B per task |
| Feature | Celery Beat | CeleRS Beat |
|---|---|---|
| Interval schedules | ✅ | ✅ |
| Crontab schedules | ✅ | ✅ (with timezone support) |
| Solar schedules | ✅ | ✅ (with twilight & golden hour) |
| One-time schedules | ❌ | ✅ |
| Custom schedules | ❌ | ✅ |
| Schedule versioning | ❌ | ✅ |
| Task dependencies | ❌ | ✅ |
| Crash recovery | ❌ | ✅ |
| Alerting system | ❌ | ✅ |
| Schedule locking | ❌ | ✅ |
| Conflict detection | ❌ | ✅ |
| Business calendars | ❌ | ✅ |
| Persistent state | File/DB | JSON (auto-save) |
| Performance | ~100 tasks/sec | ~1000 tasks/sec |
| Memory | 50MB+ | <10MB |
celers-worker - Task execution runtimecelers-broker-redis - Message brokercelers-core - Task registryMIT OR Apache-2.0