| Crates.io | rust-task-queue-macro |
| lib.rs | rust-task-queue-macro |
| version | 0.1.0 |
| created_at | 2025-06-19 06:57:07.705847+00 |
| updated_at | 2025-06-19 06:57:07.705847+00 |
| description | Procedural macros for rust-task-queue automatic task registration |
| homepage | https://github.com/longbowou/rust-task-queue |
| repository | https://github.com/longbowou/rust-task-queue |
| max_upload_size | |
| id | 1717900 |
| size | 35,788 |
A high-performance, Redis-backed task queue framework with enhanced auto-scaling, intelligent async task spawning, multidimensional scaling triggers, and advanced backpressure management for async Rust applications.
| Feature | Rust Task Queue | Celery | Sidekiq | Bull |
|---|---|---|---|---|
| Language | Rust | Python | Ruby | Node.js |
| Auto-scaling | Multi-dimensional | ❌ | ❌ | ❌ |
| Performance | <40ns serialization | ~ms | ~ms | ~ms |
| Type Safety | Compile-time | ❌ Runtime | ❌ Runtime | ❌ Runtime |
| Memory Safety | Zero-copy | ❌ | ❌ | ❌ |
| Async/Await | Native | ❌ | ❌ | ✅ |
Recent benchmark results demonstrate exceptional performance:
| Operation | Time | Status |
|---|---|---|
| Task Serialization | ~39.15 ns | Excellent |
| Task Deserialization | ~31.51 ns | Excellent |
| Queue Config Lookup | ~39.76 ns | Excellent |
| Queue Management | ~1.38 µs | Very Good |
| Enhanced AutoScaler Config | ~617 ps | Outstanding |
Benchmarks run on optimized release builds with statistical analysis.
- `default`: tracing + auto-register + config + cli (recommended)
- `full`: all features enabled for maximum functionality
- `tracing`: enterprise-grade structured logging and observability
- `actix-integration`: Actix Web framework integration with built-in endpoints
- `axum-integration`: Axum framework integration with comprehensive metrics and CORS support
- `cli`: standalone worker binaries with logging configuration support
- `auto-register`: automatic task discovery via procedural macros
- `config`: external TOML/YAML configuration files

# Web application with Actix Web (recommended)
rust-task-queue = { version = "0.1", features = ["tracing", "auto-register", "actix-integration", "config", "cli"] }
# Web application with Axum framework
rust-task-queue = { version = "0.1", features = ["tracing", "auto-register", "axum-integration", "config", "cli"] }
# Standalone worker processes
rust-task-queue = { version = "0.1", features = ["tracing", "auto-register", "cli", "config"] }
# Minimal embedded systems
rust-task-queue = { version = "0.1", default-features = false, features = ["tracing"] }
# Development/testing
rust-task-queue = { version = "0.1", features = ["full"] }
# Library integration (no CLI tools)
rust-task-queue = { version = "0.1", features = ["tracing", "auto-register", "config"] }
This pattern provides the best separation of concerns and scalability.
Recommended: Use external configuration files (task-queue.toml or task-queue.yaml) for production deployments:
1. Create the configuration file:
Create a configuration file task-queue.toml at the root of your project, copying the content from this
template task-queue.toml. The easiest way to configure your worker is through this
configuration file: adjust it to your needs, and keep the default values if you are unsure.
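As a rough illustration only (the key names below are assumptions, not the crate's authoritative schema, except `[autoscaler.target_sla]`, which appears in the auto-scaling section of this README), such a file might look like:

```toml
# Illustrative sketch -- copy the linked template task-queue.toml for the
# authoritative structure; these key names are assumptions.
redis_url = "redis://127.0.0.1:6379"
initial_workers = 4

[autoscaler]
min_workers = 2
max_workers = 20

[autoscaler.target_sla]
max_p95_latency_ms = 5000.0
min_success_rate = 0.95
```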
2. Worker configuration:
Copy task-worker.rs into your src/bin/ folder, and update it to import your tasks so
they are discoverable by auto-registration.
use rust_task_queue::cli::start_worker;
// Import tasks to ensure they're compiled into this binary
// This is ESSENTIAL for auto-registration to work with the inventory pattern
// Without this import, the AutoRegisterTask derive macros won't be executed
// and the tasks won't be submitted to the inventory for auto-discovery
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("Starting Task Worker with Auto-Configuration");
println!("Looking for task-queue.toml, task-queue.yaml, or environment variables...");
// Use the simplified consumer helper which handles all the configuration automatically
start_worker().await
}
Update your Cargo.toml to include the worker CLI binary:
# Bin declaration required to launch the worker.
[[bin]]
name = "task-worker"
path = "src/bin/task-worker.rs"
3. Web Application (web-only mode with Actix):
use rust_task_queue::prelude::*;
use rust_task_queue::queue::queue_names;
use actix_web::{web, App, HttpServer, HttpResponse, Result as ActixResult};
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize, Default, AutoRegisterTask)]
struct ProcessOrderTask {
order_id: String,
customer_email: String,
amount: f64,
}
#[async_trait]
impl Task for ProcessOrderTask {
async fn execute(&self) -> Result<serde_json::Value, Box<dyn std::error::Error + Send + Sync>> {
println!("Processing order: {}", self.order_id);
// Your processing logic here
Ok(serde_json::json!({"status": "completed", "order_id": self.order_id}))
}
fn name(&self) -> &str {
"process_order"
}
}
async fn create_order(
task_queue: web::Data<Arc<TaskQueue>>,
order_data: web::Json<ProcessOrderTask>,
) -> ActixResult<HttpResponse> {
match task_queue.enqueue(order_data.into_inner(), queue_names::DEFAULT).await {
Ok(task_id) => Ok(HttpResponse::Ok().json(serde_json::json!({
"task_id": task_id,
"status": "queued"
}))),
Err(e) => Ok(HttpResponse::InternalServerError().json(serde_json::json!({
"error": e.to_string()
})))
}
}
#[tokio::main]
async fn main() -> std::io::Result<()> {
// Create the task_queue automatically (based on your environment variables or your task-queue.toml)
let task_queue = TaskQueueBuilder::auto().build().await.expect("Failed to initialize task queue");
println!("Starting web server at http://localhost:3000");
println!("Start workers separately with configuration file:");
println!("cargo run --bin task-worker");
HttpServer::new(move || {
App::new()
.app_data(web::Data::new(task_queue.clone()))
.route("/order", web::post().to(create_order))
.configure(rust_task_queue::actix::configure_task_queue_routes)
})
.bind("0.0.0.0:3000")?
.run()
.await
}
Now you can start the web server:
# Start the web server
cargo run
4. Start Workers in Separate Terminal:
# Start the workers based on your configuration file
cargo run --bin task-worker
The same pattern works with Axum, providing a modern async web framework option:
1. Create the configuration file as in the Actix example above
2. Set up the worker binary as in the Actix example above
3. Web Application (web-only mode using Axum):
use rust_task_queue::prelude::*;
use rust_task_queue::queue::queue_names;
use axum::{extract::State, response::Json, Router};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
#[derive(Debug, Serialize, Deserialize, Default, AutoRegisterTask)]
struct ProcessOrderTask {
order_id: String,
customer_email: String,
amount: f64,
}
#[async_trait]
impl Task for ProcessOrderTask {
async fn execute(&self) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
println!("Processing order: {}", self.order_id);
// Your processing logic here
let response = serde_json::json!({"status": "completed", "order_id": self.order_id});
Ok(rmp_serde::to_vec(&response)?)
}
fn name(&self) -> &str {
"process_order"
}
}
async fn create_order(
State(task_queue): State<Arc<TaskQueue>>,
Json(order_data): Json<ProcessOrderTask>,
) -> Json<serde_json::Value> {
match task_queue.enqueue(order_data, queue_names::DEFAULT).await {
Ok(task_id) => Json(serde_json::json!({
"task_id": task_id,
"status": "queued"
})),
Err(e) => Json(serde_json::json!({
"error": e.to_string()
}))
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create the task_queue automatically (based on your environment variables or your task-queue.toml)
let task_queue = TaskQueueBuilder::auto().build().await?;
println!("🌐 Starting Axum web server at http://localhost:3000");
println!("💡 Start workers separately with: cargo run --bin task-worker");
// Build our application with routes
let app = Router::new()
.route("/order", axum::routing::post(create_order))
.merge(rust_task_queue::axum::configure_task_queue_routes())
.with_state(task_queue);
// Run the server
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
axum::serve(listener, app).await?;
Ok(())
}
Now you can start the web server:
# Start the web server
cargo run
4. Start workers in a separate terminal, using the same worker commands as in the Actix example above
The repository includes comprehensive examples:
For simpler deployments, you can run everything in one process:
use rust_task_queue::prelude::*;
use rust_task_queue::queue::queue_names;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let task_queue = TaskQueueBuilder::new("redis://localhost:6379")
.auto_register_tasks()
.initial_workers(4) // Workers start automatically
.with_scheduler()
.with_autoscaler()
.build()
.await?;
// Your application logic here
Ok(())
}
use rust_task_queue::prelude::*;
use rust_task_queue::queue::queue_names;
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
struct MyTask {
data: String,
}
#[async_trait]
impl Task for MyTask {
async fn execute(&self) -> Result<serde_json::Value, Box<dyn std::error::Error + Send + Sync>> {
println!("Processing: {}", self.data);
Ok(serde_json::json!({"status": "completed"}))
}
fn name(&self) -> &str {
"my_task"
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create and configure task queue with enhanced auto-scaling
let task_queue = TaskQueueBuilder::new("redis://localhost:6379")
.initial_workers(4)
.with_scheduler()
.with_autoscaler() // Uses enhanced multi-dimensional auto-scaling
.build()
.await?;
// Enqueue a task using predefined queue constants
let task = MyTask { data: "Hello, World!".to_string() };
let task_id = task_queue.enqueue(task, queue_names::DEFAULT).await?;
println!("Enqueued task: {}", task_id);
Ok(())
}
For programmatic control, use the enhanced configuration API:
use rust_task_queue::prelude::*;
use rust_task_queue::{ScalingTriggers, SLATargets, AutoScalerConfig};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Enhanced auto-scaling configuration
let autoscaler_config = AutoScalerConfig {
min_workers: 2,
max_workers: 50,
scale_up_count: 3,
scale_down_count: 1,
// Multi-dimensional scaling triggers
scaling_triggers: ScalingTriggers {
queue_pressure_threshold: 1.5,
worker_utilization_threshold: 0.80,
task_complexity_threshold: 2.0,
error_rate_threshold: 0.05,
memory_pressure_threshold: 512.0,
},
// Adaptive learning settings
enable_adaptive_thresholds: true,
learning_rate: 0.1,
adaptation_window_minutes: 30,
// Stability controls
scale_up_cooldown_seconds: 120,
scale_down_cooldown_seconds: 600,
consecutive_signals_required: 2,
// SLA targets for optimization
target_sla: SLATargets {
max_p95_latency_ms: 5000.0,
min_success_rate: 0.95,
max_queue_wait_time_ms: 10000.0,
target_worker_utilization: 0.70,
},
};
let task_queue = TaskQueueBuilder::new("redis://localhost:6379")
.auto_register_tasks()
.initial_workers(4)
.with_scheduler()
.with_autoscaler_config(autoscaler_config)
.build()
.await?;
// Monitor enhanced auto-scaling
let recommendations = task_queue.autoscaler()
.get_scaling_recommendations()
.await?;
println!("Auto-scaling recommendations:\n{}", recommendations);
Ok(())
}
The framework provides predefined queue constants for type safety and consistency:
use rust_task_queue::queue::queue_names;
// Available queue constants
queue_names::DEFAULT // "default" - Standard priority tasks
queue_names::HIGH_PRIORITY // "high_priority" - High priority tasks
queue_names::LOW_PRIORITY // "low_priority" - Background tasks
With the auto-register feature, tasks can be automatically discovered and registered:
use rust_task_queue::prelude::*;
use rust_task_queue::queue::queue_names;
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize, Default, AutoRegisterTask)]
struct MyTask {
data: String,
}
#[async_trait]
impl Task for MyTask {
async fn execute(&self) -> Result<serde_json::Value, Box<dyn std::error::Error + Send + Sync>> {
println!("Processing: {}", self.data);
Ok(serde_json::json!({"status": "completed"}))
}
fn name(&self) -> &str {
"my_task"
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Tasks with AutoRegisterTask are automatically discovered!
let task_queue = TaskQueueBuilder::new("redis://localhost:6379")
.auto_register_tasks()
.initial_workers(4)
.with_scheduler()
.with_autoscaler()
.build()
.await?;
// No manual registration needed!
let task = MyTask { data: "Hello, World!".to_string() };
let task_id = task_queue.enqueue(task, queue_names::DEFAULT).await?;
println!("Enqueued task: {}", task_id);
Ok(())
}
You can also use the attribute macro for custom task names:
use rust_task_queue::prelude::*;
use serde::{Deserialize, Serialize};
#[register_task("custom_name")]
#[derive(Debug, Serialize, Deserialize, Default)]
struct MyTask {
data: String,
}
The framework includes a powerful CLI tool for running workers in separate processes, now with enhanced auto-scaling support.
# Start workers based on your root task-queue.toml configuration file (recommended)
cargo run --bin task-worker
# Workers with custom auto-scaling thresholds
cargo run --bin task-worker -- \
--workers 4 \
--enable-autoscaler \
--autoscaler-min-workers 2 \
--autoscaler-max-workers 20 \
--autoscaler-scale-up-threshold 1.5 \
--autoscaler-consecutive-signals 3
# Monitor auto-scaling in real-time
cargo run --bin task-worker -- \
--workers 6 \
--enable-autoscaler \
--enable-scheduler \
--log-level debug # See detailed auto-scaling decisions
- `--redis-url, -r`: Redis connection URL (default: redis://127.0.0.1:6379)
- `--workers, -w`: number of initial workers (default: 4)
- `--enable-autoscaler, -a`: enable enhanced multi-dimensional auto-scaling
- `--enable-scheduler, -s`: enable task scheduler for delayed tasks
- `--queues, -q`: comma-separated list of queue names to process
- `--worker-prefix`: custom prefix for worker names
- `--config, -c`: path to enhanced configuration file (recommended)
- `--log-level`: logging level (trace, debug, info, warn, error)
- `--log-format`: log output format (json, compact, pretty)
- `--autoscaler-min-workers`: minimum workers for auto-scaling
- `--autoscaler-max-workers`: maximum workers for auto-scaling
- `--autoscaler-consecutive-signals`: required consecutive signals for scaling

Our enhanced auto-scaling system analyzes 5 key metrics simultaneously for intelligent scaling decisions:
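The five trigger metrics can be pictured with a plain-Rust sketch. This is an illustrative model only, not the crate's actual implementation; the field names mirror the `ScalingTriggers` configuration example shown in this README.

```rust
// Illustrative model of multi-dimensional scaling triggers; the crate's
// real logic also applies adaptive thresholds, cooldowns, and hysteresis.
struct Metrics {
    queue_pressure: f64,      // queued tasks per worker
    worker_utilization: f64,  // 0.0..=1.0
    task_complexity: f64,     // relative execution-time score
    error_rate: f64,          // 0.0..=1.0
    memory_pressure_mb: f64,  // resident memory per worker
}

struct Triggers {
    queue_pressure: f64,
    worker_utilization: f64,
    task_complexity: f64,
    error_rate: f64,
    memory_pressure_mb: f64,
}

/// Scale up when enough independent metrics breach their thresholds.
fn should_scale_up(m: &Metrics, t: &Triggers, breaches_required: usize) -> bool {
    let signals = [
        m.queue_pressure > t.queue_pressure,
        m.worker_utilization > t.worker_utilization,
        m.task_complexity > t.task_complexity,
        m.error_rate > t.error_rate,
        m.memory_pressure_mb > t.memory_pressure_mb,
    ];
    signals.iter().filter(|&&s| s).count() >= breaches_required
}
```

Requiring several metrics to agree (rather than reacting to a single spike) is what makes the decision multi-dimensional.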
The system automatically adjusts scaling triggers based on actual performance vs. your SLA targets:
[autoscaler.target_sla]
max_p95_latency_ms = 3000.0 # 3 second P95 latency target
min_success_rate = 0.99 # 99% success rate target
max_queue_wait_time_ms = 5000.0 # 5-second max queue wait
target_worker_utilization = 0.75 # optimal 75% worker utilization
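One way to picture the adaptation (an illustrative sketch, not the crate's actual algorithm; `learning_rate` matches the configuration field shown earlier): nudge each trigger threshold in proportion to how far observed performance deviates from the SLA target.

```rust
// Illustrative adaptive-threshold update; not the crate's actual algorithm.
// If observed P95 latency exceeds the SLA target, lower the queue-pressure
// threshold (scale up sooner); if we are under target, relax it.
fn adapt_threshold(threshold: f64, observed_p95_ms: f64, target_p95_ms: f64, learning_rate: f64) -> f64 {
    // Positive error => we are slower than the SLA allows.
    let error = (observed_p95_ms - target_p95_ms) / target_p95_ms;
    // Move the threshold opposite to the error, clamped to stay positive.
    (threshold * (1.0 - learning_rate * error)).max(0.1)
}
```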
Advanced hysteresis and cooldown mechanisms prevent scaling oscillations.
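A minimal plain-Rust sketch of the idea (illustrative only; the consecutive-signals and cooldown names mirror the `AutoScalerConfig` example above): a scale-up fires only after the required number of consecutive breach observations, and never inside a cooldown window.

```rust
use std::time::{Duration, Instant};

// Illustrative hysteresis + cooldown gate; not the crate's implementation.
struct ScalingGate {
    consecutive_signals_required: u32,
    scale_up_cooldown: Duration,
    consecutive_signals: u32,
    last_scale_up: Option<Instant>,
}

impl ScalingGate {
    fn new(consecutive_signals_required: u32, scale_up_cooldown: Duration) -> Self {
        Self {
            consecutive_signals_required,
            scale_up_cooldown,
            consecutive_signals: 0,
            last_scale_up: None,
        }
    }

    /// Feed one observation; returns true when a scale-up should fire.
    fn observe(&mut self, breach: bool, now: Instant) -> bool {
        if !breach {
            self.consecutive_signals = 0; // hysteresis: reset on any calm sample
            return false;
        }
        self.consecutive_signals += 1;
        let cooling = self
            .last_scale_up
            .map_or(false, |t| now.duration_since(t) < self.scale_up_cooldown);
        if self.consecutive_signals >= self.consecutive_signals_required && !cooling {
            self.consecutive_signals = 0;
            self.last_scale_up = Some(now);
            return true;
        }
        false
    }
}
```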
The project maintains comprehensive test coverage across multiple dimensions:
Total: 192 tests ensuring reliability and performance
The framework includes comprehensive structured logging and tracing capabilities for production systems:
use rust_task_queue::prelude::*;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Configure structured logging for production
configure_production_logging(
rust_task_queue::LogLevel::Info,
rust_task_queue::LogFormat::Json
);
let task_queue = TaskQueueBuilder::new("redis://localhost:6379")
.auto_register_tasks()
.initial_workers(4)
.build()
.await?;
Ok(())
}
# Configure logging via environment variables
export LOG_LEVEL=info # trace, debug, info, warn, error
export LOG_FORMAT=json # json, compact, pretty
# Start worker with production logging
cargo run --bin task-worker
The tracing system provides detailed performance insights.
The framework includes a production-ready metrics API with 15+ endpoints for monitoring and diagnostics:
- `/tasks/health` - Detailed health check with component status (Redis, workers, scheduler)
- `/tasks/status` - System status with health metrics and worker information
- `/tasks/metrics` - Comprehensive metrics combining all available data
- `/tasks/metrics/system` - Enhanced system metrics with memory and performance data
- `/tasks/metrics/performance` - Performance report with task execution metrics and SLA data
- `/tasks/metrics/autoscaler` - AutoScaler metrics and scaling recommendations
- `/tasks/metrics/queues` - Individual queue metrics for all queues
- `/tasks/metrics/workers` - Worker-specific metrics and status
- `/tasks/metrics/memory` - Memory usage metrics and tracking
- `/tasks/metrics/summary` - Quick metrics summary for debugging
- `/tasks/registered` - Auto-registered tasks information
- `/tasks/registry/info` - Detailed task registry information and features
- `/tasks/alerts` - Active alerts from the metrics system
- `/tasks/sla` - SLA status and violations with performance percentages
- `/tasks/diagnostics` - Comprehensive diagnostics with queue health analysis
- `/tasks/uptime` - System uptime and runtime information

Best practices:
- Use the predefined queue constants (`queue_names::*`)
- Tune `max_concurrent_tasks` based on workload characteristics
- Prefer external configuration files (`task-queue.toml`) instead of hardcoded values
- If queues back up, increase `max_concurrent_tasks` or optimize task execution time

Actively Developed - Regular releases, responsive to issues, feature requests welcome.
Compatibility:
We welcome contributions! Please see our Development Guide for:
Licensed under either of Apache License, Version 2.0 or MIT License at your option.