| Crates.io | asynq |
| lib.rs | asynq |
| version | 0.1.5 |
| created_at | 2025-10-08 12:21:34.54017+00 |
| updated_at | 2026-01-10 08:38:48.796401+00 |
| description | Simple, reliable & efficient distributed task queue in Rust, inspired by hibiken/asynq |
| homepage | https://github.com/emo-crab/asynq |
| repository | https://github.com/emo-crab/asynq |
| max_upload_size | |
| id | 1873947 |
| size | 697,071 |
Asynq is a simple, reliable, and efficient distributed task queue library written in Rust, backed by Redis, inspired by hibiken/asynq.
🔗 Fully Compatible with Go asynq: This implementation is fully compatible with the Go version of hibiken/asynq, allowing seamless interoperation with Go services.
Add to your Cargo.toml:
[dependencies]
asynq = { version = "0.1", features = ["json"] }
## Enable macro support (optional)
# asynq = { version = "0.1", features = ["json", "macros"] }
## or dev channel
#asynq = { git = "https://github.com/emo-crab/asynq", branch = "main" }
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
use asynq::{client::Client, task::Task, redis::RedisConnectionType};
use serde::{Deserialize, Serialize};

/// Payload carried by an `email:send` task; serialized to JSON when enqueued.
#[derive(Serialize, Deserialize)]
struct EmailPayload {
    to: String,
    subject: String,
    body: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Point the client at a single Redis node.
    let conn = RedisConnectionType::single("redis://127.0.0.1:6379")?;
    let client = Client::new(conn).await?;

    // Build the payload and wrap it in a typed task.
    let email = EmailPayload {
        to: "user@example.com".to_string(),
        subject: "Welcome!".to_string(),
        body: "Welcome to our service!".to_string(),
    };
    let task = Task::new_with_json("email:send", &email)?;

    // Push the task onto the queue and report its assigned id.
    let info = client.enqueue(task).await?;
    println!("Task enqueued with ID: {}", info.id);
    Ok(())
}
use asynq::{server::Server,server::Handler,task::Task, redis::RedisConnectionType, config::ServerConfig};
use async_trait::async_trait;
use serde::{Serialize, Deserialize};
use std::collections::HashMap;

/// Payload decoded from `email:send` tasks.
#[derive(Serialize, Deserialize)]
struct EmailPayload {
    to: String,
    subject: String,
    body: String,
}

/// Handler that dispatches on the task's type string.
struct EmailProcessor;

#[async_trait]
impl Handler for EmailProcessor {
    async fn process_task(&self, task: Task) -> asynq::error::Result<()> {
        // Only `email:send` is recognized; any other type is an error.
        if task.get_type() == "email:send" {
            let payload: EmailPayload = task.get_payload_with_json()?;
            println!("Sending email to: {}", payload.to);
            // Implement actual email sending logic
            Ok(())
        } else {
            Err(asynq::error::Error::other("Unknown task type"))
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Single-node Redis connection.
    let conn = RedisConnectionType::single("redis://127.0.0.1:6379")?;

    // Three queues with descending priority weights.
    let mut queues = HashMap::new();
    for (name, weight) in [("critical", 6), ("default", 3), ("low", 1)] {
        queues.insert(name.to_string(), weight);
    }

    let config = ServerConfig::new().concurrency(4).queues(queues);

    // Create the server and block on task processing.
    let mut server = Server::new(conn, config).await?;
    server.run(EmailProcessor).await?;
    Ok(())
}
ServeMux provides Go-like task routing functionality, automatically routing tasks to different handlers based on task type:
use asynq::{serve_mux::ServeMux, task::Task, redis::RedisConnectionType, config::ServerConfig, server::ServerBuilder};
use std::collections::HashMap;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let conn = RedisConnectionType::single("redis://127.0.0.1:6379")?;

    // Route task types to handlers, Go-style.
    let mut mux = ServeMux::new();

    // Synchronous closure handlers.
    mux.handle_func("email:send", |task: Task| {
        println!("Processing email:send {:?}", task);
        Ok(())
    });
    mux.handle_func("payment:process", |task: Task| {
        println!("Processing payment {:?}", task);
        Ok(())
    });

    // Asynchronous closure handler.
    mux.handle_async_func("image:resize", |task: Task| async move {
        println!("Processing image:resize {:?}", task);
        // Async processing logic
        Ok(())
    });

    // Single default queue, four workers.
    let mut queues = HashMap::new();
    queues.insert("default".to_string(), 3);
    let config = ServerConfig::new().concurrency(4).queues(queues);

    let mut server = ServerBuilder::new()
        .redis_config(conn)
        .server_config(config)
        .build()
        .await?;

    // ServeMux implements the Handler trait, so it can be passed straight to run().
    server.run(mux).await?;
    Ok(())
}
Features:
Supports both synchronous (`handle_func`) and asynchronous (`handle_async_func`) handlers. See examples/servemux_example.rs for more examples.
When the macros feature is enabled, you can use attribute macros similar to actix-web's routing macros for cleaner handler definition:
use asynq::{
    serve_mux::ServeMux,
    task::Task,
    task_handler,
    task_handler_async,
    register_handlers,
    register_async_handlers,
    redis::RedisConnectionType,
    config::ServerConfig,
    server::ServerBuilder
};
use std::collections::HashMap;

/// Synchronous handler bound to the `email:send` task type by the attribute macro.
#[task_handler("email:send")]
fn handle_email(task: Task) -> asynq::error::Result<()> {
    println!("Processing email:send");
    Ok(())
}

/// Asynchronous handler bound to `image:resize`.
#[task_handler_async("image:resize")]
async fn handle_image(task: Task) -> asynq::error::Result<()> {
    println!("Processing image:resize");
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let conn = RedisConnectionType::single("redis://127.0.0.1:6379")?;

    // The register_* macros wire the attribute-annotated functions into the mux.
    let mut mux = ServeMux::new();
    register_handlers!(mux, handle_email);
    register_async_handlers!(mux, handle_image);

    // One queue, four concurrent workers.
    let queues = HashMap::from([("default".to_string(), 3)]);
    let config = ServerConfig::new().concurrency(4).queues(queues);

    let mut server = ServerBuilder::new()
        .redis_config(conn)
        .server_config(config)
        .build()
        .await?;
    server.run(mux).await?;
    Ok(())
}
Macro Features:
Handlers can be registered with the `register_handlers!` and `register_async_handlers!` convenience macros, in a style similar to actix-web's `#[get("/path")]` routing macros. See examples/macro_example.rs for a complete example.
use std::time::Duration;
// --- Delayed tasks: schedule execution in the future ---
// Execute after 5 minutes delay
client.enqueue_in(task, Duration::from_secs(300)).await?;
use std::time::Duration;
// --- Unique tasks: suppress duplicate enqueues within a TTL ---
// Keep unique within 1 hour
let unique_task = Task::new_with_json("report:daily", &payload)?;
client.enqueue_unique(unique_task, Duration::from_secs(3600)).await?;
// --- Group aggregation: buffer related tasks so a GroupAggregator can batch them ---
// Add tasks to group for aggregation
for i in 1..=10 {
let item_task = Task::new_with_json("batch:process", &serde_json::json!({"item": i}))?;
client.add_to_group(item_task, "daily_batch").await?;
}
// --- Per-task options, builder style (each with_* returns the modified task) ---
let task = Task::new_with_json("image:resize", &payload)?
.with_queue("image_processing") // Specify queue
.with_max_retry(5) // Maximum retry attempts
.with_timeout(Duration::from_secs(300)) // Timeout
.with_unique_ttl(Duration::from_secs(3600)); // Uniqueness TTL
// --- Queue weights; NOTE(review): presumably relative shares of worker time — confirm scheduler semantics ---
let mut queues = HashMap::new();
queues.insert("critical".to_string(), 6); // Highest priority
queues.insert("default".to_string(), 3); // Medium priority
queues.insert("low".to_string(), 1); // Low priority
let config = ServerConfig::new()
.queues(queues)
.strict_priority(true); // Strict priority mode
Asynq uses a modular design with main components:
asynq/
├── src/
│ ├── lib.rs # Library entry and public API
│ ├── client.rs # Client implementation
│ ├── server.rs # Server implementation
│ ├── serve_mux.rs # ServeMux routing (compatible with Go serve_mux.go)
│ ├── processor.rs # Processor implementation (compatible with Go processor.go)
│ ├── task.rs # Task data structures
│ ├── error.rs # Error handling
│ ├── config.rs # Configuration management
│ ├── redis.rs # Redis connection management
│ ├── inspector.rs # Queue inspector
│ └── broker/ # Storage backend abstraction
│ ├── mod.rs # Broker trait definition
│ └── redis_broker.rs # Redis implementation
├── proto/
│ └── asynq.proto # Protocol Buffer definitions
└── examples/
├── producer.rs # Producer example
├── consumer.rs # Consumer example
├── servemux_example.rs # ServeMux usage example
└── processor_example.rs # Processor example
The Processor module implements task processing architecture compatible with Go asynq processor.go:
The GroupAggregator module implements task aggregation functionality compatible with Go asynq aggregator.go:
Tasks are assigned to a group with `with_group()`; aggregation behavior is defined by the `GroupAggregator` trait, and `GroupAggregatorFunc` provides a convenient closure-based implementation. Example usage:
use asynq::components::aggregator::GroupAggregatorFunc;
// Define aggregation function
// GroupAggregatorFunc wraps a closure that receives the group name and the
// buffered tasks, and returns the single task that replaces the whole batch.
let aggregator = GroupAggregatorFunc::new(|group, tasks| {
// Merge multiple tasks into a single batch task
// NOTE(review): assumes get_payload() yields byte slices joined with b"\n" — confirm payload type
let combined = tasks.iter()
.map(|t| t.get_payload())
.collect::<Vec<_>>()
.join(&b"\n"[..]);
Task::new("batch:process", &combined)
});
// Set on server
// The server invokes the aggregator when a group's grace period / size limit triggers.
server.set_group_aggregator(aggregator);
See GROUP_AGGREGATOR.md for more details.
use asynq::config::ServerConfig;
use std::time::Duration;
// Full server tuning surface; every interval takes a std::time::Duration.
let config = ServerConfig::new()
.concurrency(8) // Number of concurrent workers
.task_check_interval(Duration::from_secs(1)) // Task check interval
.delayed_task_check_interval(Duration::from_secs(5)) // Delayed task check interval
.shutdown_timeout(Duration::from_secs(10)) // Shutdown timeout
.health_check_interval(Duration::from_secs(15)) // Health check interval
.group_grace_period(Duration::from_secs(60))? // Group aggregation grace period (fallible setter — note the `?`)
.group_max_delay(Duration::from_secs(300)) // Group max delay
.group_max_size(100) // Group max size
.janitor_interval(Duration::from_secs(8)) // Janitor interval
.janitor_batch_size(50); // Janitor batch size
use asynq::redis::{RedisConnectionType};
use std::time::Duration;
// Basic configuration
// Single-node Redis:
let redis_config = RedisConnectionType::single("redis://127.0.0.1:6379")?;
// Cluster mode: pass every seed-node URL.
let nodes = vec!["redis://127.0.0.1:6379/", "redis://127.0.0.1:6378/", "redis://127.0.0.1:6377/"];
let redis_config = RedisConnectionType::cluster(nodes)?;
use asynq::base::keys::TaskState;
use asynq::inspector::Inspector;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// NOTE(review): `broker` is never defined in this snippet — it must be constructed
// beforehand (presumably from a Redis connection); verify against the broker module API.
let inspector = Inspector::new(broker);
// Get queue statistics
let stats = inspector.get_queue_stats("default").await?;
println!("Pending: {}, Active: {}", stats.pending, stats.active);
// List tasks — args look like (queue, state, page, page_size); TODO confirm pagination semantics
let tasks = inspector.list_tasks("default", TaskState::Pending, 1, 10).await?;
// Requeue archived task
inspector.requeue_archived_task("default", "task-id").await?;
// Pause queue
inspector.pause_queue("default").await?;
Ok(())
}
git clone https://github.com/emo-crab/asynq.git
cd asynq
cargo build
docker run -d -p 6379:6379 redis:alpine
# Terminal 1: Start consumer
cargo run --example consumer
# Terminal 2: Run producer
cargo run --example producer
# Unit tests
cargo test
# Integration tests (requires Redis)
cargo test --features integration-tests
We welcome contributions of all kinds! Please read CONTRIBUTING.md for details.
This project is licensed under the MIT License OR GPL License.
If you have any questions or suggestions, please:
⭐ If this project helps you, please give us a star!