| Crates.io | ash-flare |
| lib.rs | ash-flare |
| version | 1.1.0 |
| created_at | 2025-11-17 22:16:08.805311+00 |
| updated_at | 2025-11-23 12:40:27.312587+00 |
| description | Fault-tolerant supervision trees for Rust with distributed capabilities inspired by Erlang/OTP |
| homepage | https://github.com/gntem/ash-flare |
| repository | https://github.com/gntem/ash-flare |
| max_upload_size | |
| id | 1937613 |
| size | 399,732 |
Fault-tolerant supervision trees for Rust with distributed capabilities inspired by Erlang/OTP. Build resilient systems that automatically recover from failures with supervisor trees, restart strategies, and distributed supervision.
Features: OneForOne, OneForAll, and RestForOne restart strategies; Permanent, Temporary, and Transient restart behaviors; shared worker state via StatefulSupervisorSpec; slog structured logging. Add to your Cargo.toml:
cargo add ash-flare
use ash_flare::{SupervisorSpec, RestartPolicy, Worker};
use async_trait::async_trait;
// Define your worker
struct Counter {
id: u32,
max: u32,
}
#[async_trait]
impl Worker for Counter {
type Error = std::io::Error;
async fn run(&mut self) -> Result<(), Self::Error> {
for i in 0..self.max {
println!("Counter {}: {}", self.id, i);
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
Ok(())
}
}
#[tokio::main]
async fn main() {
// Build supervisor tree
let spec = SupervisorSpec::new("root")
.with_worker("counter-1", || Counter { id: 1, max: 5 }, RestartPolicy::Permanent)
.with_worker("counter-2", || Counter { id: 2, max: 5 }, RestartPolicy::Permanent);
// Start supervision tree
let handle = SupervisorHandle::start(spec);
// Query children
let children = handle.which_children().await.unwrap();
println!("Running children: {}", children.len());
// Graceful shutdown
handle.shutdown().await.unwrap();
}
Restarts only the failed child (default):
use ash_flare::{SupervisorSpec, RestartStrategy};
let spec = SupervisorSpec::new("supervisor")
.with_restart_strategy(RestartStrategy::OneForOne);
Restarts all children if any child fails:
let spec = SupervisorSpec::new("supervisor")
.with_restart_strategy(RestartStrategy::OneForAll);
Restarts the failed child and all children started after it:
let spec = SupervisorSpec::new("supervisor")
.with_restart_strategy(RestartStrategy::RestForOne);
Control when a child should be restarted:
use ash_flare::RestartPolicy;
// Always restart (default)
RestartPolicy::Permanent
// Never restart
RestartPolicy::Temporary
// Restart only on abnormal termination
RestartPolicy::Transient
Build hierarchical supervision trees:
let database_supervisor = SupervisorSpec::new("database")
.with_worker("db-pool", || DbPool::new(), RestartPolicy::Permanent)
.with_worker("db-cache", || DbCache::new(), RestartPolicy::Transient);
let app_supervisor = SupervisorSpec::new("app")
.with_supervisor(database_supervisor)
.with_worker("http-server", || HttpServer::new(), RestartPolicy::Permanent);
let handle = SupervisorHandle::start(app_supervisor);
Configure maximum restart attempts within a time window:
use ash_flare::RestartIntensity;
let spec = SupervisorSpec::new("supervisor")
.with_restart_intensity(RestartIntensity {
max_restarts: 5, // Maximum restarts
within_seconds: 10, // Within time window
});
Use StatefulSupervisorSpec for workers that need to share state via an in-memory KV store:
use ash_flare::{StatefulSupervisorSpec, StatefulSupervisorHandle, WorkerContext};
use std::sync::Arc;
struct AuctionWorker {
id: u32,
ctx: Arc<WorkerContext>,
}
#[async_trait]
impl Worker for AuctionWorker {
type Error = std::io::Error;
async fn run(&mut self) -> Result<(), Self::Error> {
// Read from shared store
let current_bid = self.ctx.get("highest_bid")
.and_then(|v| v.as_u64())
.unwrap_or(0);
// Update shared store
self.ctx.set("highest_bid", serde_json::json!(current_bid + 100));
// Atomic update
self.ctx.update("bid_count", |v| {
let count = v.and_then(|v| v.as_u64()).unwrap_or(0);
Some(serde_json::json!(count + 1))
});
Ok(())
}
}
// Create stateful supervisor (WorkerContext auto-initialized)
let spec = StatefulSupervisorSpec::new("auction-supervisor")
.with_worker(
"auction-worker",
|ctx: Arc<WorkerContext>| AuctionWorker { id: 1, ctx },
RestartPolicy::Permanent,
);
let handle = StatefulSupervisorHandle::start(spec);
Or use the stateful_supervision_tree! macro for a more declarative approach:
use ash_flare::stateful_supervision_tree;
let spec = stateful_supervision_tree! {
name: "auction-supervisor",
strategy: OneForOne,
intensity: (5, 10),
workers: [
("bidder-1", |ctx| AuctionWorker::new(1, ctx), Permanent),
("bidder-2", |ctx| AuctionWorker::new(2, ctx), Permanent),
],
supervisors: []
};
WorkerContext API:
get(key) - Retrieve a value; set(key, value) - Store a value; delete(key) - Remove a key; update(key, fn) - Atomic update with a function. The store is process-local, concurrent-safe (backed by DashMap), and persists across worker restarts.
Add and remove children at runtime:
// Dynamically add a worker
let child_id = handle
.start_child("dynamic-worker", || MyWorker::new(), RestartPolicy::Temporary)
.await
.unwrap();
// Terminate a specific child
handle.terminate_child(&child_id).await.unwrap();
// List all running children
let children = handle.which_children().await.unwrap();
Run supervisors across processes or machines:
use ash_flare::distributed::{SupervisorServer, RemoteSupervisorHandle};
// Start supervisor server
let handle = SupervisorHandle::start(spec);
let server = SupervisorServer::new(handle);
tokio::spawn(async move {
server.listen_tcp("127.0.0.1:8080").await.unwrap();
});
// Connect from another process/machine
let remote = RemoteSupervisorHandle::connect_tcp("127.0.0.1:8080").await.unwrap();
let children = remote.which_children().await.unwrap();
remote.shutdown().await.unwrap();
Implement the Worker trait with optional lifecycle hooks:
use ash_flare::Worker;
use async_trait::async_trait;
struct MyWorker;
#[async_trait]
impl Worker for MyWorker {
type Error = std::io::Error;
async fn initialize(&mut self) -> Result<(), Self::Error> {
// Called once before run()
println!("Worker initializing...");
Ok(())
}
async fn run(&mut self) -> Result<(), Self::Error> {
// Main worker loop
loop {
// Do work...
}
}
async fn shutdown(&mut self) -> Result<(), Self::Error> {
// Called during graceful shutdown
println!("Worker shutting down...");
Ok(())
}
}
Workers return errors that trigger restart policies:
#[async_trait]
impl Worker for MyWorker {
type Error = MyError;
async fn run(&mut self) -> Result<(), Self::Error> {
match self.do_work().await {
Ok(_) => Ok(()), // Normal termination
Err(e) => Err(e), // Triggers restart based on policy
}
}
}
Ash Flare uses slog for structured logging. To see logs, set up a global logger:
use slog::{Drain, Logger, o};
use slog_async::Async;
use slog_term::{FullFormat, TermDecorator};
fn main() {
// Set up logger
let decorator = TermDecorator::new().build();
let drain = FullFormat::new(decorator).build().fuse();
let drain = Async::new(drain).build().fuse();
let logger = Logger::root(drain, o!());
// Set as global logger
let _guard = slog_scope::set_global_logger(logger);
// Your supervision tree code here...
}
Logs include structured data for easy filtering:
INFO server listening on tcp; address: "127.0.0.1:8080"
DEBUG child terminated; supervisor: "root", child: "worker-1", reason: Normal
ERROR restart intensity exceeded, shutting down; supervisor: "root"
Check the examples/ directory for more:
counter.rs - Basic supervisor with multiple workers; distributed.rs - Network-distributed supervisors; super_tree.rs - Complex nested supervision trees; interactive_demo.rs - Interactive supervisor management. Run an example:
cargo run --example counter
MIT License - see LICENSE file for details.
Inspired by Erlang/OTP's supervision model.
Some code generated with the help of AI tools.