| Crates.io | queue_workers |
| lib.rs | queue_workers |
| version | 0.5.1 |
| created_at | 2025-04-12 16:16:50.607341+00 |
| updated_at | 2025-04-20 22:19:30.571539+00 |
| description | A Redis-backed job queue system for Rust applications |
| repository | https://github.com/armn3t/queue-workers |
| id | 1631055 |
| size | 184,227 |
⚠️ Work in Progress: This crate is under active development and is not yet ready for production use. The API is unstable and may undergo significant changes. Feel free to experiment and provide feedback, but please do not use in production environments.
A Redis-backed job queue system for Rust applications with support for retries and concurrent workers.
Add this to your Cargo.toml:
[dependencies]
queue_workers = "0.5.1"
use serde::{Serialize, Deserialize};
use async_trait::async_trait;
use queue_workers::job::Job;
#[derive(Debug, Serialize, Deserialize)]
struct EmailJob {
    id: String,
    to: String,
    subject: String,
    body: String,
}

#[async_trait]
impl Job for EmailJob {
    type Output = String;
    type Error = String;

    async fn execute(&self) -> Result<Self::Output, Self::Error> {
        // Implement your job logic here
        Ok(format!("Email sent to {}", self.to))
    }
}
use queue_workers::{redis_queue::RedisQueue, worker::{Worker, WorkerConfig}};
use std::time::Duration;
#[tokio::main]
async fn main() {
    // Initialize the queue
    let queue = RedisQueue::<EmailJob>::new(
        "redis://127.0.0.1:6379",
        "email_queue"
    ).expect("Failed to create queue");

    // Configure the worker
    let config = WorkerConfig {
        retry_attempts: 3,
        retry_delay: Duration::from_secs(5),
        shutdown_timeout: Duration::from_secs(30),
    };

    // Create the worker (processing starts further down)
    let worker = Worker::new(queue.clone(), config);

    // Push a job
    let job = EmailJob {
        id: "email-1".to_string(),
        to: "user@example.com".to_string(),
        subject: "Hello".to_string(),
        body: "World".to_string(),
    };
    queue.push(job).await.expect("Failed to push job");

    // Start processing jobs
    worker.start().await.expect("Worker failed");
}
git clone https://github.com/yourusername/queue_workers.git
cd queue_workers
rustup component add rustfmt
rustup component add clippy
chmod +x scripts/setup-git-hooks.sh
./scripts/setup-git-hooks.sh
docker-compose up -d redis
cargo test
This project enforces code quality through rustfmt and clippy.
To manually run the checks:
# Check formatting
cargo fmt -- --check
# Run clippy
cargo clippy -- -D warnings
These checks run automatically:
The WorkerConfig struct allows you to customize worker behavior:
let config = WorkerConfig {
    retry_attempts: 3,                          // Number of retry attempts for failed jobs
    retry_delay: Duration::from_secs(5),        // Delay between retries
    shutdown_timeout: Duration::from_secs(30),  // Graceful shutdown timeout
};
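To make the interaction between `retry_attempts` and `retry_delay` concrete, here is a minimal standard-library sketch of a retry loop. It is not the crate's implementation: `run_with_retries` is a hypothetical helper, it blocks the thread instead of using async sleeps, and it assumes `retry_attempts` counts retries made after the first try, which the README does not confirm.

```rust
use std::time::Duration;

/// Hypothetical helper (not part of queue_workers): calls `job` once, then
/// retries it up to `retry_attempts` more times, sleeping `retry_delay`
/// between tries. Returns the final result and the number of calls made.
fn run_with_retries<T, E>(
    retry_attempts: u32,
    retry_delay: Duration,
    mut job: impl FnMut() -> Result<T, E>,
) -> (Result<T, E>, u32) {
    let mut calls = 0;
    loop {
        calls += 1;
        match job() {
            Ok(value) => return (Ok(value), calls),
            // Out of retries: give up and surface the last error.
            Err(err) if calls > retry_attempts => return (Err(err), calls),
            // Otherwise wait before trying again.
            Err(_) => std::thread::sleep(retry_delay),
        }
    }
}

fn main() {
    // A job that fails twice, then succeeds on the third call.
    let mut failures_left = 2;
    let (result, calls) = run_with_retries(3, Duration::from_millis(10), || {
        if failures_left > 0 {
            failures_left -= 1;
            Err("temporary failure")
        } else {
            Ok("sent")
        }
    });
    // prints: Ok("sent") after 3 call(s)
    println!("{:?} after {} call(s)", result, calls);
}
```

Under this assumption, `retry_attempts: 3` allows at most four calls in total, with up to three delays between them.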
The Redis queue can be configured with a Redis URL and queue name:
let queue = RedisQueue::<MyJob>::new(
    "redis://username:password@hostname:6379/0", // Redis URL with authentication
    "my_queue_name"
).expect("Failed to create queue");
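The URL above follows the common `redis://[username:password@]host[:port][/db]` layout. As a rough illustration of what each part means, the sketch below splits such a URL using only the standard library. `RedisTarget` and `parse_redis_url` are hypothetical names, this is not how the crate or its Redis client parses URLs, and cases such as IPv6 hosts, `rediss://`, and percent-encoded passwords are ignored.

```rust
/// Hypothetical breakdown of a Redis URL, for illustration only.
#[derive(Debug, PartialEq)]
struct RedisTarget {
    username: Option<String>,
    password: Option<String>,
    host: String,
    port: u16,
    db: u32,
}

/// Splits `redis://[username:password@]host[:port][/db]` into its parts.
/// Returns `None` when the scheme is missing or a number fails to parse.
fn parse_redis_url(url: &str) -> Option<RedisTarget> {
    let rest = url.strip_prefix("redis://")?;

    // Credentials, if any, sit before the last '@'.
    let (credentials, address) = match rest.rsplit_once('@') {
        Some((creds, addr)) => (Some(creds), addr),
        None => (None, rest),
    };
    let (username, password) = match credentials {
        Some(creds) => match creds.split_once(':') {
            Some((user, pass)) => (Some(user.to_string()), Some(pass.to_string())),
            None => (Some(creds.to_string()), None),
        },
        None => (None, None),
    };

    // The database index, if any, follows the first '/'.
    let (host_port, db) = match address.split_once('/') {
        Some((hp, index)) if !index.is_empty() => (hp, index.parse::<u32>().ok()?),
        Some((hp, _)) => (hp, 0),
        None => (address, 0),
    };

    // Port 6379 and database 0 are the Redis defaults.
    let (host, port) = match host_port.split_once(':') {
        Some((h, p)) => (h.to_string(), p.parse::<u16>().ok()?),
        None => (host_port.to_string(), 6379),
    };

    Some(RedisTarget { username, password, host, port, db })
}

fn main() {
    let target = parse_redis_url("redis://username:password@hostname:6379/0");
    println!("{:?}", target);
}
```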
The queue supports both FIFO (First In, First Out) and LIFO (Last In, First Out) behaviors:
use queue_workers::{redis_queue::RedisQueue, queue::QueueType};
// Create a FIFO queue (default behavior)
let fifo_queue = RedisQueue::<MyJob>::new(redis_url, "fifo_queue")?;
// Create a LIFO queue
let lifo_queue = RedisQueue::<MyJob>::with_type(
    redis_url,
    "lifo_queue",
    QueueType::LIFO
)?;
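The difference between the two orderings is easiest to see on a small in-memory queue. The sketch below uses a `VecDeque` as a stand-in for the Redis list; `Order` and `drain` are hypothetical names introduced for illustration, and nothing here reflects which Redis commands the crate actually issues.

```rust
use std::collections::VecDeque;

/// Stand-in for the two orderings described above; this is a local enum,
/// not `queue_workers::queue::QueueType`.
#[derive(Clone, Copy)]
enum Order {
    Fifo,
    Lifo,
}

/// Pushes every job onto the back of an in-memory queue, then drains it
/// in the requested order and returns the jobs as they were taken.
fn drain(jobs: &[&str], order: Order) -> Vec<String> {
    let mut queue: VecDeque<String> = jobs.iter().map(|j| j.to_string()).collect();
    let mut processed = Vec::new();
    loop {
        let next = match order {
            Order::Fifo => queue.pop_front(), // oldest job first
            Order::Lifo => queue.pop_back(),  // newest job first
        };
        match next {
            Some(job) => processed.push(job),
            None => break,
        }
    }
    processed
}

fn main() {
    let jobs = ["email-1", "email-2", "email-3"];
    // prints: FIFO: ["email-1", "email-2", "email-3"]
    println!("FIFO: {:?}", drain(&jobs, Order::Fifo));
    // prints: LIFO: ["email-3", "email-2", "email-1"]
    println!("LIFO: {:?}", drain(&jobs, Order::Lifo));
}
```

FIFO suits work where submission order matters, such as notifications; LIFO favors the freshest work when older jobs may already be stale.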
You can run multiple workers processing the same queue:
let queue = RedisQueue::<EmailJob>::new(redis_url, "email_queue")?;
// Spawn multiple workers
for _ in 0..3 {
    let worker_queue = queue.clone();
    let worker = Worker::new(worker_queue, WorkerConfig::default());
    tokio::spawn(async move {
        worker.start().await.expect("Worker failed");
    });
}
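The property that matters when several workers share a queue is that each job is taken by exactly one of them. The following sketch shows that idea with plain threads and an in-memory queue behind a mutex. It is an analogy only: `run_workers` is a hypothetical function, and with the real crate the hand-off happens in Redis rather than in process memory.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// Runs `worker_count` threads against one shared in-memory queue and
/// returns, per worker, the jobs it handled.
fn run_workers(jobs: Vec<u32>, worker_count: usize) -> Vec<Vec<u32>> {
    let queue = Arc::new(Mutex::new(VecDeque::from(jobs)));

    let handles: Vec<_> = (0..worker_count)
        .map(|_| {
            // Each worker gets its own handle to the same queue.
            let queue = Arc::clone(&queue);
            thread::spawn(move || {
                let mut handled = Vec::new();
                loop {
                    // Hold the lock only while taking a job, not while working on it.
                    let next = queue.lock().unwrap().pop_front();
                    match next {
                        Some(job) => handled.push(job),
                        None => break,
                    }
                }
                handled
            })
        })
        .collect();

    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    let per_worker = run_workers((1..=10).collect(), 3);
    for (i, handled) in per_worker.iter().enumerate() {
        println!("worker {} handled {:?}", i, handled);
    }
}
```

How the jobs are split between workers varies from run to run, but every job appears in exactly one worker's list.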
The library provides a custom error type QueueWorkerError that covers various failure scenarios:
The library provides two types of workers:
Processes jobs one at a time, with retry support:
use queue_workers::{
    redis_queue::RedisQueue,
    worker::{Worker, WorkerConfig}
};
use std::time::Duration;

let config = WorkerConfig {
    retry_attempts: 3,
    retry_delay: Duration::from_secs(5),
    shutdown_timeout: Duration::from_secs(30),
};
let worker = Worker::new(queue.clone(), config);
worker.start().await?;
Processes multiple jobs in parallel:
use queue_workers::{
    redis_queue::RedisQueue,
    concurrent_worker::{ConcurrentWorker, ConcurrentWorkerConfig}
};
use std::time::Duration;

let config = ConcurrentWorkerConfig {
    max_concurrent_jobs: 5, // Process 5 jobs simultaneously
    retry_attempts: 3,
    retry_delay: Duration::from_secs(5),
    shutdown_timeout: Duration::from_secs(30),
};
let worker = ConcurrentWorker::new(queue.clone(), config);
worker.start().await?;

// Or with shutdown support:
let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);
let worker = ConcurrentWorker::new(queue.clone(), config);
worker.start_with_shutdown(shutdown_rx).await?;
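The shutdown variant relies on a channel: the worker keeps taking jobs until a signal arrives on the receiver. The sketch below shows that pattern in its simplest form, using a standard-library `mpsc` channel in place of `tokio::sync::broadcast` and a hypothetical `run_until_shutdown` function. It says nothing about how the crate treats in-flight jobs or applies `shutdown_timeout`; it also treats a dropped sender as a shutdown request, which is a design choice of this sketch.

```rust
use std::sync::mpsc::{self, Receiver, TryRecvError};

/// Processes jobs in order, checking for a shutdown signal before each one.
/// Returns the jobs that were completed before shutdown was requested.
fn run_until_shutdown(jobs: Vec<u32>, shutdown: &Receiver<()>) -> Vec<u32> {
    let mut completed = Vec::new();
    for job in jobs {
        match shutdown.try_recv() {
            // A signal arrived, or every sender was dropped: stop taking new jobs.
            Ok(()) | Err(TryRecvError::Disconnected) => break,
            // No signal yet: carry on.
            Err(TryRecvError::Empty) => {}
        }
        completed.push(job);
    }
    completed
}

fn main() {
    let (shutdown_tx, shutdown_rx) = mpsc::channel();

    // No signal sent yet, so every job runs.
    println!("{:?}", run_until_shutdown(vec![1, 2, 3], &shutdown_rx));

    // Signal first, then start: the worker stops before taking any job.
    shutdown_tx.send(()).unwrap();
    println!("{:?}", run_until_shutdown(vec![4, 5, 6], &shutdown_rx));
}
```

In a real application the sending half would typically be triggered from a signal handler (for example on Ctrl-C) while the worker runs in another task.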
This library uses the log crate as a logging facade, allowing you to choose your preferred logging implementation. For basic usage:
// Using env_logger
env_logger::init();
// Or using tracing (EnvFilter needs tracing-subscriber's `env-filter` feature)
use tracing_subscriber::EnvFilter;
tracing_subscriber::fmt()
    .with_env_filter(EnvFilter::from_default_env())
    .init();
For detailed logging configuration, including production setups and testing configurations, see LOGGING.md.
Create a feature branch (git checkout -b feature/amazing-feature)
Commit your changes (git commit -m 'Add some amazing feature')
Push to the branch (git push origin feature/amazing-feature)
This project is licensed under the MIT License; see the LICENSE file for details.