| Crates.io | joerl |
| lib.rs | joerl |
| version | 0.7.1 |
| created_at | 2025-12-05 15:01:51.064138+00 |
| updated_at | 2025-12-25 04:49:40.80147+00 |
| description | An Erlang-inspired actor model library for Rust |
| homepage | https://github.com/am-kantox/joerl |
| repository | https://github.com/am-kantox/joerl |
| max_upload_size | |
| id | 1968345 |
| size | 752,137 |
An Erlang-inspired actor model library for Rust, named in tribute to Joe Armstrong, the creator of Erlang.
Add this to your Cargo.toml:
[dependencies]
joerl = "0.7"
tokio = { version = "1", features = ["full"] }
async-trait = "0.1"
use joerl::{Actor, ActorContext, ActorSystem, Message};
use async_trait::async_trait;
// Define your actor
struct Counter {
count: i32,
}
#[async_trait]
impl Actor for Counter {
async fn handle_message(&mut self, msg: Message, _ctx: &mut ActorContext) {
if let Some(cmd) = msg.downcast_ref::<&str>() {
match *cmd {
"increment" => {
self.count += 1;
println!("Count: {}", self.count);
},
"get" => println!("Current count: {}", self.count),
_ => {}
}
}
}
}
#[tokio::main]
async fn main() {
let system = ActorSystem::new();
let counter = system.spawn(Counter { count: 0 });
counter.send(Box::new("increment")).await.unwrap();
counter.send(Box::new("increment")).await.unwrap();
counter.send(Box::new("get")).await.unwrap();
// Give actors time to process
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
Actors are the fundamental unit of computation. Each actor is identified by a unique Pid (Process ID).
Actors communicate by sending messages. Messages are type-erased using Box<dyn Any>:
actor_ref.send(Box::new("hello")).await?;
actor_ref.send(Box::new(42i32)).await?;
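To illustrate what type erasure means for the receiver, here is a minimal sketch using only `std::any::Any`. The `Message` alias (including its `Send` bound) and the `describe` helper are assumptions made for this example, not joerl's definitions.

```rust
use std::any::Any;

/// A type-erased message, modeled on the `Box<dyn Any>` description above.
/// The `Send` bound is an assumption of this sketch.
type Message = Box<dyn Any + Send>;

/// Describe a message by trying each concrete type the receiver understands.
fn describe(msg: &Message) -> String {
    if let Some(text) = msg.downcast_ref::<&str>() {
        format!("text: {text}")
    } else if let Some(n) = msg.downcast_ref::<i32>() {
        format!("number: {n}")
    } else {
        // The payload has a type this receiver does not know about.
        "unknown message type".to_string()
    }
}
```

Sender and receiver have to agree on the exact type: a value sent as `42i64` would not match the `i32` branch and would fall through to the last arm.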
Links create bidirectional relationships between actors. If one fails, both fail:
system.link(actor1.pid(), actor2.pid())?;
Monitors create unidirectional observation. The monitoring actor receives a DOWN signal when the monitored actor terminates:
let monitor_ref = actor_ref.monitor(my_pid)?;
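The difference between the two relationships can be modeled in a few lines of plain Rust. This is a sketch, not joerl's implementation: `Pid` is a bare integer, `Relations` is a hypothetical bookkeeping struct, and exit trapping and normal exits are ignored for simplicity.

```rust
use std::collections::{BTreeSet, HashMap};

/// Hypothetical stand-in for a process identifier.
type Pid = u32;

#[derive(Default)]
struct Relations {
    /// Links are stored in both directions.
    links: HashMap<Pid, BTreeSet<Pid>>,
    /// Monitors are stored as: monitored actor -> watchers.
    monitors: HashMap<Pid, BTreeSet<Pid>>,
}

impl Relations {
    fn link(&mut self, a: Pid, b: Pid) {
        self.links.entry(a).or_default().insert(b);
        self.links.entry(b).or_default().insert(a);
    }

    fn monitor(&mut self, watcher: Pid, target: Pid) {
        self.monitors.entry(target).or_default().insert(watcher);
    }

    /// Returns (actors that go down, surviving watchers that get a DOWN signal).
    fn fail(&self, failed: Pid) -> (BTreeSet<Pid>, BTreeSet<Pid>) {
        // Failure spreads along links in both directions.
        let mut down = BTreeSet::new();
        let mut stack = vec![failed];
        while let Some(pid) = stack.pop() {
            if down.insert(pid) {
                if let Some(peers) = self.links.get(&pid) {
                    stack.extend(peers.iter().copied());
                }
            }
        }
        // Watchers are only notified; monitoring never takes them down.
        let mut notified = BTreeSet::new();
        for pid in &down {
            if let Some(watchers) = self.monitors.get(pid) {
                notified.extend(watchers.iter().copied().filter(|w| !down.contains(w)));
            }
        }
        (down, notified)
    }
}
```

With actors 1 and 2 linked and actor 3 monitoring actor 2, a failure of actor 1 takes down 1 and 2, while 3 stays alive and is merely notified.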
Supervisors monitor child actors and restart them according to strategies:
use joerl::{SupervisorSpec, RestartStrategy, ChildSpec};
let spec = SupervisorSpec::new(RestartStrategy::OneForOne)
.child(ChildSpec::new("worker1", || Box::new(Worker::new())))
.child(ChildSpec::new("worker2", || Box::new(Worker::new())));
let supervisor = spawn_supervisor(&system, spec);
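Which children a supervisor restarts depends on the strategy passed to SupervisorSpec::new. As a rough model of the three strategies (OneForOne, OneForAll, RestForOne), the sketch below maps a failed child's position to the positions that get restarted. It is plain Rust with a local `Strategy` enum and a hypothetical `children_to_restart` helper, not joerl's implementation, and it assumes children are indexed in start order.

```rust
/// Mirrors the three strategy names used in this README; the enum itself is a
/// local stand-in, not joerl's `RestartStrategy` type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Strategy {
    OneForOne,
    OneForAll,
    RestForOne,
}

/// Given `child_count` children in start order, return the indices to restart
/// when the child at index `failed` terminates abnormally.
fn children_to_restart(strategy: Strategy, child_count: usize, failed: usize) -> Vec<usize> {
    assert!(failed < child_count, "failed index out of range");
    match strategy {
        // Only the failed child.
        Strategy::OneForOne => vec![failed],
        // Every child, regardless of which one failed.
        Strategy::OneForAll => (0..child_count).collect(),
        // The failed child and everything started after it.
        Strategy::RestForOne => (failed..child_count).collect(),
    }
}
```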
Restart Strategies:
- OneForOne: Restart only the failed child
- OneForAll: Restart all children when one fails
- RestForOne: Restart the failed child and all children started after it
For structured stateful actors with synchronous call/reply and asynchronous cast semantics, implement the GenServer trait:
use async_trait::async_trait;
use joerl::gen_server::{self, GenServer, GenServerContext};
struct Counter;
#[derive(Debug)]
enum CounterCall {
Get,
Add(i32),
}
#[derive(Debug)]
enum CounterCast {
Increment,
}
#[async_trait]
impl GenServer for Counter {
type State = i32;
type Call = CounterCall;
type Cast = CounterCast;
type CallReply = i32;
async fn init(&mut self, _ctx: &mut GenServerContext<'_, Self>) -> Self::State {
0 // Initial state
}
async fn handle_call(
&mut self,
call: Self::Call,
state: &mut Self::State,
_ctx: &mut GenServerContext<'_, Self>,
) -> Self::CallReply {
match call {
CounterCall::Get => *state,
CounterCall::Add(n) => {
*state += n;
*state
}
}
}
async fn handle_cast(
&mut self,
cast: Self::Cast,
state: &mut Self::State,
_ctx: &mut GenServerContext<'_, Self>,
) {
match cast {
CounterCast::Increment => *state += 1,
}
}
}
// Usage
let counter = gen_server::spawn(&system, Counter);
let value = counter.call(CounterCall::Get).await?; // Synchronous
counter.cast(CounterCast::Increment).await?; // Asynchronous
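The call/cast distinction itself does not depend on joerl. As a sketch under stated assumptions (a plain OS thread and `std::sync::mpsc` channels instead of joerl's async runtime; `Request`, `spawn_counter`, `call_get`, `call_add`, and `cast_increment` are hypothetical names), a call carries a reply channel and waits on it, while a cast is fire-and-forget:

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// Requests understood by the counter server in this sketch.
enum Request {
    /// Calls carry a reply channel so the caller can wait for the answer.
    Get(Sender<i32>),
    Add(i32, Sender<i32>),
    /// Casts carry no reply channel.
    Increment,
}

/// Spawn a thread that owns the state and handles requests one at a time.
fn spawn_counter() -> Sender<Request> {
    let (tx, rx) = channel::<Request>();
    thread::spawn(move || {
        let mut state = 0i32;
        // The loop ends once every sender has been dropped.
        for request in rx {
            match request {
                Request::Get(reply) => {
                    let _ = reply.send(state);
                }
                Request::Add(n, reply) => {
                    state += n;
                    let _ = reply.send(state);
                }
                Request::Increment => state += 1,
            }
        }
    });
    tx
}

/// Call: send the request, then block until the server replies.
fn call_get(server: &Sender<Request>) -> Option<i32> {
    let (reply_tx, reply_rx) = channel();
    server.send(Request::Get(reply_tx)).ok()?;
    reply_rx.recv().ok()
}

/// Call: add `n` and wait for the new value.
fn call_add(server: &Sender<Request>, n: i32) -> Option<i32> {
    let (reply_tx, reply_rx) = channel();
    server.send(Request::Add(n, reply_tx)).ok()?;
    reply_rx.recv().ok()
}

/// Cast: enqueue the request and return immediately.
fn cast_increment(server: &Sender<Request>) -> bool {
    server.send(Request::Increment).is_ok()
}
```

Because the server handles its mailbox in order, a call issued after a cast from the same sender observes the cast's effect.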
For finite state machines, joerl provides a powerful DSL using Mermaid state diagrams with compile-time validation:
use joerl::{gen_statem, ActorSystem, ExitReason};
use std::sync::Arc;
#[gen_statem(fsm = r#"
[*] --> locked
locked --> |coin| unlocked
locked --> |push| locked
unlocked --> |push| locked
unlocked --> |coin| unlocked
unlocked --> |off| [*]
"#)]
#[derive(Debug, Clone)]
struct Turnstile {
donations: u32,
pushes: u32,
}
impl Turnstile {
/// Called on every state transition
fn on_transition(
&mut self,
event: TurnstileEvent,
state: TurnstileState,
) -> TurnstileTransitionResult {
match (state.clone(), event.clone()) {
(TurnstileState::Locked, TurnstileEvent::Coin) => {
self.donations += 1;
TurnstileTransitionResult::Next(TurnstileState::Unlocked, self.clone())
}
(TurnstileState::Unlocked, TurnstileEvent::Push) => {
self.pushes += 1;
TurnstileTransitionResult::Next(TurnstileState::Locked, self.clone())
}
(TurnstileState::Unlocked, TurnstileEvent::Off) => {
// FSM will auto-terminate on this transition
TurnstileTransitionResult::Keep(self.clone())
}
_ => TurnstileTransitionResult::Keep(self.clone()),
}
}
/// Called when entering a new state
fn on_enter(
&self,
old_state: &TurnstileState,
new_state: &TurnstileState,
_data: &TurnstileData,
) {
println!("Transition: {:?} -> {:?}", old_state, new_state);
}
/// Called on termination
fn on_terminate(
&self,
reason: &ExitReason,
state: &TurnstileState,
data: &TurnstileData,
) {
println!("Terminated in {:?}: {:?}", state, reason);
}
}
// Usage
let system = Arc::new(ActorSystem::new());
let initial_data = Turnstile { donations: 0, pushes: 0 };
let turnstile = Turnstile(&system, initial_data);
turnstile.send(Box::new(TurnstileEvent::Coin)).await.unwrap();
turnstile.send(Box::new(TurnstileEvent::Push)).await.unwrap();
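For comparison, the transition table encoded by the diagram above can be written by hand as a pure function. This sketch uses local `State`, `Event`, and `Step` types rather than the macro-generated ones. How the macro treats an event that the diagram does not allow is not stated here, so the `Invalid` variant is purely this sketch's choice.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    Locked,
    Unlocked,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Event {
    Coin,
    Push,
    Off,
}

/// Result of feeding one event to the machine.
#[derive(Debug, PartialEq, Eq)]
enum Step {
    /// Move to (or stay in) the given state.
    Next(State),
    /// The transition leads to the `[*]` end state.
    Stop,
    /// The diagram has no edge for this state/event pair.
    Invalid,
}

/// One arm per edge of the diagram, plus the single missing pair.
fn step(state: State, event: Event) -> Step {
    match (state, event) {
        (State::Locked, Event::Coin) => Step::Next(State::Unlocked),
        (State::Locked, Event::Push) => Step::Next(State::Locked),
        (State::Unlocked, Event::Push) => Step::Next(State::Locked),
        (State::Unlocked, Event::Coin) => Step::Next(State::Unlocked),
        (State::Unlocked, Event::Off) => Step::Stop,
        (State::Locked, Event::Off) => Step::Invalid,
    }
}
```

The match is exhaustive over all six state/event pairs, so the compiler rejects the function if an edge is forgotten.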
Features:
- Automatic termination on a transition into the [*] end state
- on_transition, on_enter, and on_terminate hooks
The macro generates:
- {Name}State enum with all states
- {Name}Event enum with all events
- {Name}TransitionResult enum for transition outcomes
Like Erlang, joerl supports registering actors with names for easy lookup:
let system = ActorSystem::new();
let worker = system.spawn(Worker);
// Register with a name
system.register("my_worker", worker.pid()).unwrap();
// Look up by name
if let Some(pid) = system.whereis("my_worker") {
system.send(pid, Box::new("Hello")).await?;
}
// List all registered names
let names = system.registered();
// Unregister
system.unregister("my_worker").unwrap();
Names are automatically cleaned up when actors terminate. Actors can look up names from within their context:
#[async_trait]
impl Actor for MyActor {
async fn handle_message(&mut self, msg: Message, ctx: &mut ActorContext) {
if let Some(manager_pid) = ctx.whereis("manager") {
ctx.send(manager_pid, Box::new("status")).await.ok();
}
}
}
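The registry behavior described above can be pictured as a map from names to pids. The following is a minimal sketch, not joerl's implementation: `Registry` and its methods are hypothetical, `Pid` is a bare integer, and rejecting a duplicate name follows Erlang's `register/2` (that joerl does the same is an assumption).

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a process identifier.
type Pid = u64;

#[derive(Default)]
struct Registry {
    names: HashMap<String, Pid>,
}

impl Registry {
    /// Register `pid` under `name`; a name can be held by one actor at a time.
    fn register(&mut self, name: &str, pid: Pid) -> Result<(), String> {
        if self.names.contains_key(name) {
            return Err(format!("name already registered: {name}"));
        }
        self.names.insert(name.to_string(), pid);
        Ok(())
    }

    /// Look up a pid by name.
    fn whereis(&self, name: &str) -> Option<Pid> {
        self.names.get(name).copied()
    }

    /// Remove a name; fails if the name is not registered.
    fn unregister(&mut self, name: &str) -> Result<(), String> {
        self.names
            .remove(name)
            .map(|_| ())
            .ok_or_else(|| format!("name not registered: {name}"))
    }

    /// List all registered names in sorted order.
    fn registered(&self) -> Vec<String> {
        let mut names: Vec<String> = self.names.keys().cloned().collect();
        names.sort();
        names
    }

    /// Cleanup on termination: drop every name that points at `pid`.
    fn on_terminated(&mut self, pid: Pid) {
        self.names.retain(|_, registered| *registered != pid);
    }
}
```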
joerl supports Erlang-style send_after for delayed message delivery:
use joerl::scheduler::Destination;
use std::time::Duration;
let system = ActorSystem::new();
let actor = system.spawn(Worker);
// Schedule a message to be sent after 5 seconds
let timer_ref = system.send_after(
Destination::Pid(actor.pid()),
Box::new("delayed message"),
Duration::from_secs(5)
).await;
// Cancel the timer if needed
system.cancel_timer(timer_ref)?;
// Schedule to a named process
system.send_after(
Destination::Name("worker".to_string()),
Box::new("task"),
Duration::from_millis(100)
).await;
Actors can schedule messages from within their context:
#[async_trait]
impl Actor for MyActor {
async fn started(&mut self, ctx: &mut ActorContext) {
// Schedule a reminder to ourselves
ctx.send_after(
Destination::Pid(ctx.pid()),
Box::new("timeout"),
Duration::from_secs(10)
);
}
}
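A cancellable delayed send can be sketched with the standard library alone. The version below is an illustration under stated assumptions, not how joerl schedules messages: it uses one OS thread per timer and an `std::sync::mpsc` channel as the destination, and `TimerRef`, `send_after`, and `cancel` are local, hypothetical definitions that only borrow the names used above.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Sender;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Handle for a pending timer (a hypothetical stand-in for a timer reference).
struct TimerRef {
    /// Set to true by whichever happens first: firing or cancelling.
    settled: Arc<AtomicBool>,
}

impl TimerRef {
    /// Returns true if the timer was still pending and is now cancelled,
    /// false if it had already fired or been cancelled.
    fn cancel(&self) -> bool {
        !self.settled.swap(true, Ordering::SeqCst)
    }
}

/// Deliver `msg` to `dest` after `delay`, unless the timer is cancelled first.
fn send_after<T: Send + 'static>(dest: Sender<T>, msg: T, delay: Duration) -> TimerRef {
    let settled = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&settled);
    thread::spawn(move || {
        thread::sleep(delay);
        // Only the side that flips the flag from false to true may act.
        if !flag.swap(true, Ordering::SeqCst) {
            let _ = dest.send(msg);
        }
    });
    TimerRef { settled }
}
```

The atomic swap makes firing and cancelling mutually exclusive, so a message is never delivered after `cancel` has returned true.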
Actors can trap exit signals to handle failures gracefully:
#[async_trait]
impl Actor for MyActor {
async fn started(&mut self, ctx: &mut ActorContext) {
ctx.trap_exit(true);
}
async fn handle_signal(&mut self, signal: Signal, _ctx: &mut ActorContext) {
if let Signal::Exit { from, reason } = signal {
println!("Actor {} exited: {}", from, reason);
}
}
}
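The effect of the trap_exit flag can be summarized as a small decision function. The sketch below follows Erlang's rules for exit signals arriving over a link (the untrappable `kill` reason is left out). That joerl matches these rules in every detail is an assumption, and `ExitReason`, `Outcome`, and `on_exit_signal` are local, hypothetical definitions.

```rust
/// Local stand-in for an exit reason.
#[derive(Debug, Clone, PartialEq, Eq)]
enum ExitReason {
    Normal,
    Error(String),
}

/// What happens to the actor that receives an exit signal.
#[derive(Debug, PartialEq, Eq)]
enum Outcome {
    /// The signal is dropped and the receiver keeps running.
    Ignored,
    /// The signal is handed to the receiver's signal handler.
    Delivered(ExitReason),
    /// The receiver terminates with the same reason.
    Terminated(ExitReason),
}

fn on_exit_signal(trap_exit: bool, reason: ExitReason) -> Outcome {
    match (trap_exit, reason) {
        // Trapping turns every exit signal into something the actor can handle.
        (true, reason) => Outcome::Delivered(reason),
        // Without trapping, a normal exit of a linked actor is harmless.
        (false, ExitReason::Normal) => Outcome::Ignored,
        // Without trapping, an abnormal exit takes the receiver down too.
        (false, reason) => Outcome::Terminated(reason),
    }
}
```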
| Erlang | joerl | Description |
|--------|-------|-------------|
| spawn/1 | system.spawn(actor) | Spawn a new actor |
| register(Name, Pid) | system.register(name, pid) | Register a process with a name |
| unregister(Name) | system.unregister(name) | Unregister a name |
| whereis(Name) | system.whereis(name) | Look up a process by name |
| registered() | system.registered() | List all registered names |
| erlang:send_after(Time, Dest, Msg) | system.send_after(dest, msg, duration) | Schedule delayed message |
| erlang:cancel_timer(TRef) | system.cancel_timer(tref) | Cancel a scheduled timer |
| gen_server:start_link/3 | gen_server::spawn(&system, server) | Spawn a gen_server |
| gen_server:call/2 | server_ref.call(request) | Synchronous call |
| gen_server:cast/2 | server_ref.cast(message) | Asynchronous cast |
| gen_statem:start_link/3 | #[gen_statem(fsm = "...")] | Define state machine with DSL |
| Pid | Pid | Process identifier |
| ! (send) | actor_ref.send(msg) | Send a message |
| link/1 | system.link(pid1, pid2) | Link two actors |
| monitor/2 | actor_ref.monitor(from) | Monitor an actor |
| process_flag(trap_exit, true) | ctx.trap_exit(true) | Trap exit signals |
| {'EXIT', Pid, Reason} | Signal::Exit { from, reason } | Exit signal |
| {'DOWN', Ref, process, Pid, Reason} | Signal::Down { reference, pid, reason } | Down signal |
See the examples/ directory for more examples:
- counter.rs - Simple counter actor
- gen_server_counter.rs - GenServer (gen_server behavior) example
- turnstile.rs - GenStatem DSL with Mermaid state diagram
- gen_statem_turnstile.rs - Alternative GenStatem example
- document_workflow.rs - Complex FSM with approval workflow and revision cycle
- ping_pong.rs - Two actors communicating
- supervision_tree.rs - Supervision tree example
- link_monitor.rs - Links and monitors demonstration
- named_processes.rs - Named process registry demonstration
- scheduled_messaging.rs - Delayed message delivery with timers
- panic_handling.rs - Comprehensive panic handling demonstration (Erlang/OTP-style)
- telemetry_demo.rs - Telemetry and metrics with Prometheus export
- serialization_example.rs - Trait-based message serialization
- remote_actors.rs - Distributed actors conceptual foundation
- remote_ping_pong.rs - Remote messaging between nodes
- distributed_chat.rs - Multi-node chat system over TCP
- distributed_cluster.rs - Multi-node cluster with EPMD discovery
- distributed_system_example.rs - Distributed system example (uses deprecated API, see remote_ping_pong.rs for current API)
- epmd_server.rs - Standalone EPMD server
Run examples with:
cargo run --example counter
joerl now features a unified ActorSystem with true location transparency! The same API works for both local and distributed scenarios - just like Erlang/OTP.
Quick Start with EPMD:
# Terminal 1: Start EPMD server
cargo run --example epmd_server
# Terminal 2: Start first node
cargo run --example distributed_system_example -- node_a 5001
# Terminal 3: Start second node
cargo run --example distributed_system_example -- node_b 5002
# Nodes automatically discover and connect!
Remote Ping-Pong Example:
# Terminal 1: Start server node
cargo run --example remote_ping_pong -- server
# Terminal 2: Start client node
cargo run --example remote_ping_pong -- client
This demonstrates:
- ActorSystem::new_distributed() - same API as local!
- ctx.send() works for both local and remote actors
- nodes(), node(), is_process_alive(), connect_to_node()
Conceptual Examples:
The remote_actors example demonstrates distributed concepts using multiple local systems:
cargo run --example remote_actors
The distributed_chat example shows a real TCP-based distributed chat:
# Terminal 1
cargo run --example distributed_chat -- --node alice --port 8001
# Terminal 2
cargo run --example distributed_chat -- --node bob --port 8002 --connect 127.0.0.1:8001
For detailed documentation on building distributed systems with joerl, see DISTRIBUTED.md.
joerl provides comprehensive telemetry support for production monitoring:
[dependencies]
joerl = { version = "0.7", features = ["telemetry"] }
use joerl::{telemetry, ActorSystem};
use metrics_exporter_prometheus::PrometheusBuilder;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Set up Prometheus exporter
PrometheusBuilder::new()
.with_http_listener(([127, 0, 0, 1], 9090))
.install()?;
telemetry::init();
// Metrics automatically tracked:
// - Actor spawns, stops, panics
// - Message throughput and latency
// - Mailbox depth and backpressure
// - Supervisor restarts
let system = ActorSystem::new();
// ... your code
Ok(())
}
Available Metrics:
- joerl_actors_spawned_total - Total actors spawned
- joerl_actors_active - Current active actors
- joerl_messages_sent_total - Message throughput
- joerl_message_processing_duration_seconds - Processing latency
- joerl_supervisor_restarts_total - Supervisor restart events
See TELEMETRY.md for comprehensive documentation and integration examples with Prometheus, Grafana, OpenTelemetry, and Datadog.
The library is organized into several modules:
- actor - Core actor trait and context
- system - Actor system runtime and registry
- message - Message types and signals
- mailbox - Bounded mailbox implementation
- supervisor - Supervision trees and restart strategies
- telemetry - Metrics and observability (optional)
- error - Error types and results
- pid - Process identifier
Run the test suite:
cargo test
Check code coverage:
cargo tarpaulin --out Html
Contributions are welcome! Please feel free to submit a Pull Request.
Licensed under either of:
at your option.
This library is dedicated to the memory of Joe Armstrong (1950-2019), whose work on Erlang has inspired generations of developers to build robust, concurrent systems.