| Crates.io | apalis-core |
| lib.rs | apalis-core |
| version | 1.0.0-rc.2 |
| created_at | 2021-05-14 14:30:57.756704+00 |
| updated_at | 2026-01-07 05:40:43.065696+00 |
| description | Core for apalis: simple, extensible multithreaded background processing for Rust |
| homepage | |
| repository | https://github.com/apalis-dev/apalis |
| max_upload_size | |
| id | 397389 |
| size | 368,999 |
A high-performance, type-safe task processing framework for Rust.
apalis-core provides the fundamental abstractions and runtime components for building
scalable background task systems with middleware support, graceful shutdown, and monitoring capabilities.
This is advanced documentation; guide-level documentation is available on the website.
apalis-core is built around four primary abstractions that provide a flexible and extensible task processing system:
- Tasks: Type-safe task data structures with processing metadata
- Backends: Pluggable task storage and streaming implementations
- Workers: Task processing engines with lifecycle management
- Monitor: Multi-worker coordination and observability

The framework leverages the tower service abstraction to provide a rich middleware ecosystem, including error handling, timeouts, rate limiting, and observability.
The task struct provides type-safe components for task data and metadata:
- Args - The primary structure for the task
- Parts - Wrapper type for task execution information, including context, status, attempts, task_id and metadata
- Context - Contextual information for the task, provided by the backend
- Status - Represents the current state of a task
- TaskId - Unique identifier for task tracking
- Attempt - Retry tracking and attempt information
- Extensions - Type-safe storage for additional task data
- Metadata - Metadata associated with the task

TaskBuilder

let task: Task<String, ()> = TaskBuilder::new("my-task".to_string())
    .id("task-123".into())
    .attempts(3)
    .timeout(Duration::from_secs(30))
    .run_in_minutes(10)
    .build();
Specific documentation for tasks can be found in the [task] and [task::builder] modules.
The Backend trait serves as the core abstraction for all task sources.
It defines task polling mechanisms, streaming interfaces, and middleware integration points.
Associated types:
- Stream - Defines the task stream type for polling operations
- Layer - Specifies the middleware layer stack for the backend
- Codec - Determines the serialization format for task data persistence
- Beat - Heartbeat stream for worker liveness checks
- IdType - Type used for unique task identifiers
- Ctx - Context associated with tasks
- Error - Error type for backend operations

Built-in implementations:
- MemoryStorage: In-memory storage based on channels
- Pipe: Pipe-based backend for a stream-to-backend pipeline
- CustomBackend: Flexible backend composition allowing custom functions for task management

Backends handle task persistence, distribution, and reliability concerns while providing a uniform interface for worker consumption.
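To make the idea of a channel-based in-memory task source more concrete, here is a minimal sketch that uses only the standard library. The names `InMemoryQueue`, `push`, and `poll_next` are hypothetical and chosen for illustration; this is not the MemoryStorage implementation and it does not implement the Backend trait.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical illustration only: a tiny channel-backed task queue.
/// It is NOT apalis-core's `MemoryStorage` and does not implement `Backend`.
pub struct InMemoryQueue<T> {
    tx: Sender<T>,
    rx: Receiver<T>,
}

impl<T> InMemoryQueue<T> {
    /// Create an empty queue that owns both halves of the channel.
    pub fn new() -> Self {
        let (tx, rx) = channel();
        Self { tx, rx }
    }

    /// Enqueue a task. Sending only fails if the receiver was dropped,
    /// which cannot happen here because the queue owns it.
    pub fn push(&self, task: T) {
        self.tx.send(task).expect("receiver is owned by the queue");
    }

    /// Non-blocking poll: returns the next task in FIFO order, if any.
    pub fn poll_next(&self) -> Option<T> {
        self.rx.try_recv().ok()
    }
}
```

A worker-like loop would call `poll_next` repeatedly and hand each returned task to a handler. A real backend additionally has to deal with persistence, heartbeats, and acknowledgment, which this sketch leaves out.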
The Worker is the core runtime component responsible for task polling, execution, and lifecycle management.
The following are the main components of the worker module:
- WorkerBuilder - Fluent builder for configuring and constructing workers
- Worker - Actual worker implementation that processes tasks
- WorkerContext - Runtime state including task counts and execution status
- Event - Worker event enumeration (Start, Engage, Idle, Error, Stop)
- Ext - Extension traits and middleware for adding functionality to workers

#[tokio::main]
async fn main() {
    let mut in_memory = MemoryStorage::new();
    in_memory.push(1u32).await.unwrap();

    async fn task(
        task: u32,
        worker: WorkerContext,
    ) -> Result<(), BoxDynError> {
        // Do some work
        tokio::time::sleep(Duration::from_secs(1)).await;
        // Stop the worker once this task is done
        worker.stop().unwrap();
        Ok(())
    }

    let worker = WorkerBuilder::new("rango-tango")
        .backend(in_memory)
        .on_event(|ctx, ev| {
            println!("On Event = {:?}, {:?}", ev, ctx.name());
        })
        .build(task);
    worker.run().await.unwrap();
}
Learn more about workers in the worker and worker::builder modules.
- TaskFn trait
- TestWorker - Specialized worker implementation for unit and integration testing

The Monitor helps manage and coordinate multiple workers:
Main Features:
Monitor with a Worker

#[tokio::main]
async fn main() {
    let mut storage = JsonStorage::new_temp().unwrap();
    storage.push(1u32).await.unwrap();

    let monitor = Monitor::new()
        .on_event(|ctx, event| println!("{}: {:?}", ctx.name(), event))
        .register(move |_| {
            WorkerBuilder::new("demo-worker")
                .backend(storage.clone())
                .build(|req: u32, ctx: WorkerContext| async move {
                    println!("Processing task: {:?}", req);
                    Ok::<_, std::io::Error>(req)
                })
        });

    // Start monitor and run all registered workers
    monitor.run().await.unwrap();
}
Learn more about the monitor in the [monitor module](https://docs.rs/apalis-core/1.0.0-rc.2/apalis_core/monitor/index.html).
Built on the tower ecosystem, apalis-core provides extensive middleware support like error handling, timeouts, rate limiting, and observability.
The following middleware layers are included with their worker extensions:
- AcknowledgmentLayer - Task acknowledgment after processing
- EventListenerLayer - Worker event emission and handling
- CircuitBreakerLayer - Circuit breaker pattern for failure handling
- LongRunningLayer - Support for tracking long-running tasks

You can write your own middleware to run code before or after a task is processed.
Here's a simple example of a logging middleware layer:
use apalis_core::task::Task;
use tower::{Layer, Service};
use std::task::{Context, Poll};

// Define a logging service that wraps another service
pub struct LoggingService<S> {
    inner: S,
}

impl<S, Req, Res, Err, IdType> Service<Task<Req, (), IdType>> for LoggingService<S>
where
    S: Service<Task<Req, (), IdType>, Response = Res, Error = Err>,
    Req: std::fmt::Debug,
{
    type Response = Res;
    type Error = Err;
    type Future = S::Future;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    fn call(&mut self, req: Task<Req, (), IdType>) -> Self::Future {
        println!("Processing task: {:?}", req.args);
        self.inner.call(req)
    }
}

// Define a layer that wraps services with LoggingService
pub struct LoggingLayer;

impl<S> Layer<S> for LoggingLayer {
    type Service = LoggingService<S>;

    fn layer(&self, service: S) -> Self::Service {
        LoggingService { inner: service }
    }
}
If you want your middleware to do more than just intercept requests and responses, you can use extension traits. See the worker::ext module for examples.
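The "before or after" wrapping idea can also be shown without tower. The following is a simplified, synchronous sketch using only the standard library; `Handler`, `Double`, and `Traced` are hypothetical names that stand in for a tower service, an inner task handler, and a middleware wrapper. They are not part of apalis-core or tower.

```rust
use std::fmt::Debug;

/// Hypothetical, synchronous stand-in for a tower `Service`; illustration only.
pub trait Handler<Req> {
    type Output;
    fn handle(&mut self, req: Req) -> Self::Output;
}

/// A trivial inner handler that doubles its input.
pub struct Double;

impl Handler<u32> for Double {
    type Output = u32;
    fn handle(&mut self, req: u32) -> u32 {
        req * 2
    }
}

/// Middleware-style wrapper: records one line before and one line after
/// the inner handler runs, then passes the result through unchanged.
pub struct Traced<H> {
    inner: H,
    pub log: Vec<String>,
}

impl<H> Traced<H> {
    pub fn new(inner: H) -> Self {
        Self { inner, log: Vec::new() }
    }
}

impl<Req, H> Handler<Req> for Traced<H>
where
    Req: Debug,
    H: Handler<Req>,
    H::Output: Debug,
{
    type Output = H::Output;

    fn handle(&mut self, req: Req) -> Self::Output {
        self.log.push(format!("before: {:?}", req));
        let out = self.inner.handle(req);
        self.log.push(format!("after: {:?}", out));
        out
    }
}
```

With tower, running code after the task completes would instead mean wrapping the future returned by the inner service's `call`, since the result is only available once that future resolves.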
apalis-core defines a comprehensive error taxonomy for robust error handling:
- AbortError - Non-retryable fatal errors requiring immediate termination
- RetryAfterError - Retryable execution errors triggering retry mechanisms after a delay
- DeferredError - Retryable execution errors triggering immediate retry

This error classification enables precise error handling strategies and appropriate retry behavior for different failure scenarios.
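As a rough illustration of how such a classification can drive retry behavior, here is a standard-library-only sketch. `TaskFailure`, `NextStep`, `next_step`, and the `max_attempts` limit are all hypothetical; they model the three classes listed above and are not the apalis-core error types or its retry logic.

```rust
use std::time::Duration;

/// Hypothetical stand-ins for the three error classes described above.
/// These are NOT the apalis-core types; they only model the classification.
#[derive(Debug)]
pub enum TaskFailure {
    /// Fatal: never retry.
    Abort(String),
    /// Retry, but only after waiting for the given delay.
    RetryAfter(String, Duration),
    /// Retry immediately.
    Deferred(String),
}

/// What a scheduler could do next with a failed task.
#[derive(Debug, PartialEq)]
pub enum NextStep {
    GiveUp,
    RetryIn(Duration),
}

/// Decide the next step from the failure class and the attempt count.
/// `max_attempts` is an assumed policy knob, not something the crate defines here.
pub fn next_step(failure: &TaskFailure, attempt: u32, max_attempts: u32) -> NextStep {
    if attempt >= max_attempts {
        return NextStep::GiveUp;
    }
    match failure {
        TaskFailure::Abort(_) => NextStep::GiveUp,
        TaskFailure::RetryAfter(_, delay) => NextStep::RetryIn(*delay),
        TaskFailure::Deferred(_) => NextStep::RetryIn(Duration::ZERO),
    }
}
```

Keeping the decision in one function makes the policy easy to test in isolation: an abort always gives up, while the two retryable classes differ only in the delay they request.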
apalis-core has a reliable graceful shutdown system that makes sure
workers stop safely and all tasks finish before shutting down:
Key Features:
- Shutdown token helps all workers stop together.
- with_terminator.

Learn more about the graceful shutdown process in the monitor module.
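The general pattern of a shared token that stops all workers together while letting in-flight tasks finish can be sketched with threads and an atomic flag. `ShutdownToken` and `run_workers` below are hypothetical and use only the standard library; they do not reflect how the crate's Shutdown type or Monitor are implemented.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical shared shutdown flag; illustration only.
#[derive(Clone, Default)]
pub struct ShutdownToken(Arc<AtomicBool>);

impl ShutdownToken {
    pub fn trigger(&self) {
        self.0.store(true, Ordering::SeqCst);
    }
    pub fn is_triggered(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

/// Runs `worker_count` threads until at least `min_tasks` tasks have finished,
/// then triggers shutdown and waits for every worker to exit.
/// Returns `(tasks_started, tasks_finished)`.
pub fn run_workers(worker_count: usize, min_tasks: usize) -> (usize, usize) {
    assert!(worker_count > 0, "at least one worker is required");
    let token = ShutdownToken::default();
    let started = Arc::new(AtomicUsize::new(0));
    let finished = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..worker_count)
        .map(|_| {
            let (token, started, finished) = (token.clone(), started.clone(), finished.clone());
            thread::spawn(move || {
                // The token is checked only between tasks, never mid-task,
                // so a task that has started always runs to completion.
                while !token.is_triggered() {
                    started.fetch_add(1, Ordering::SeqCst);
                    thread::yield_now(); // stand-in for real work
                    finished.fetch_add(1, Ordering::SeqCst);
                }
            })
        })
        .collect();

    // Let some work happen, then ask every worker to stop.
    while finished.load(Ordering::SeqCst) < min_tasks {
        thread::yield_now();
    }
    token.trigger();

    // Joining waits for any in-flight task to complete.
    for handle in handles {
        handle.join().expect("worker thread panicked");
    }
    (started.load(Ordering::SeqCst), finished.load(Ordering::SeqCst))
}
```

After `run_workers` returns, the two counters are equal, which is the property a graceful shutdown aims for: no task is abandoned halfway. A production system would typically add a timeout so that a stuck task cannot block shutdown forever.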
License: MIT