| Crates.io | raftoral |
| lib.rs | raftoral |
| version | 0.2.0 |
| created_at | 2025-10-11 05:59:14.00226+00 |
| updated_at | 2025-11-15 04:02:58.861574+00 |
| description | Fault-tolerant workflow orchestration library using embedded Raft consensus for Rust applications |
| homepage | https://github.com/orishu/raftoral |
| repository | https://github.com/orishu/raftoral |
| max_upload_size | |
| id | 1877893 |
| size | 1,004,166 |
A Rust library for building fault-tolerant, distributed workflows using the Raft consensus protocol.
Building long-running, fault-tolerant workflows typically requires deploying and managing separate orchestration infrastructure:
External Orchestrators (Temporal, AWS Step Functions, etc.):
Example Setup (Temporal):
Your Services (Workers) → Temporal Server Cluster → Database (Cassandra/Postgres)
       3+ nodes                 3-5+ nodes                    3+ nodes
You end up managing 10+ nodes across multiple systems just to orchestrate workflows.
Raftoral eliminates separate orchestration infrastructure by embedding the orchestrator directly into your long-running services using Raft consensus:
The Architecture Difference:
Requirements:
See our detailed comparison: Raftoral vs. Temporal vs. DBOS
This guide helps you choose the right workflow orchestration system by comparing architecture, scalability, complexity, use cases, and trade-offs across all three platforms.
Raftoral uses Raft consensus to coordinate workflow execution across a cluster of nodes without requiring external infrastructure. The owner/wait pattern ensures efficient operation in multi-node clusters:
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Node 1    │────▶│   Node 2    │◀────│   Node 3    │
│  (Leader)   │     │ (Follower)  │     │ (Follower)  │
│    OWNER    │     │   WAITER    │     │   WAITER    │
└─────────────┘     └─────────────┘     └─────────────┘
       ▲                   ▲                   ▲
       │                   │                   │
       └───────────────────┴───────────────────┘
                Raft Consensus Protocol
             (No external database needed)
All nodes execute workflows in parallel, but only the owner proposes state changes:
The WorkflowStart command is proposed through Raft by the workflow's owner (see the sketch below).
Key Benefits:
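A minimal conceptual sketch of the owner/wait split, using hypothetical types rather than Raftoral's actual API: every node runs the same workflow body, but only the owner writes each checkpoint through consensus, and the other nodes adopt the committed value instead of proposing their own.

use std::collections::HashMap;

// Conceptual model only (hypothetical types, not Raftoral's API).
struct Node {
    is_owner: bool,
    committed: HashMap<String, String>, // stand-in for Raft-replicated state
}

impl Node {
    fn checkpoint(&mut self, key: &str, value: &str) -> String {
        if self.is_owner {
            // Owner: propose the value through Raft (modeled here as a local write).
            self.committed.insert(key.to_string(), value.to_string());
            value.to_string()
        } else {
            // Waiter: adopt whatever value the owner committed instead of proposing.
            self.committed.get(key).cloned().unwrap_or_default()
        }
    }
}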
For large deployments (20+ nodes), Raftoral uses a two-tier architecture to prevent checkpoint replication from overwhelming the cluster:
┌─────────────────────────────────────────────────────────┐
│           Management Cluster (cluster_id = 0)           │
│  Tracks topology & coordinates multiple exec clusters   │
│          Voters: 3-5 nodes | Learners: N nodes          │
└─────────────────────────────────────────────────────────┘
                             │
            ┌────────────────┼────────────────┐
            │                │                │
     ┌─────────────┐  ┌─────────────┐  ┌─────────────┐
     │ Exec Cluster│  │ Exec Cluster│  │ Exec Cluster│
     │   (ID: 1)   │  │   (ID: 2)   │  │   (ID: 3)   │
     │   5 nodes   │  │   5 nodes   │  │   5 nodes   │
     │ ┌─────────┐ │  │ ┌─────────┐ │  │ ┌─────────┐ │
     │ │Workflows│ │  │ │Workflows│ │  │ │Workflows│ │
     │ │ + Chkpts│ │  │ │ + Chkpts│ │  │ │ + Chkpts│ │
     │ └─────────┘ │  │ └─────────┘ │  │ └─────────┘ │
     └─────────────┘  └─────────────┘  └─────────────┘
How It Works:
Scalability Benefits:
Single 50-node cluster:
- Checkpoint replication: 50x per checkpoint
- State: O(W) workflows tracked globally
Multi-cluster (10 exec clusters × 5 nodes):
- Checkpoint replication: 5x per checkpoint (10x reduction!)
- State: O(C×N) clusters×nodes (massive reduction for high workflow count)
- Each node in ~2-3 execution clusters
See docs/SCALABILITY_ARCHITECTURE.md for detailed architecture.
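As a rough illustration (an assumption about wiring, not the full NodeManager setup), the RaftNodeConfig used in the quick start below identifies a cluster by cluster_id, with 0 reserved for the management cluster per the diagram above:

use raftoral::raft::generic::RaftNodeConfig;

// Hypothetical illustration: one physical node participating in the
// management cluster (cluster_id = 0) and in execution cluster 2.
let management_cfg = RaftNodeConfig {
    node_id: 7,
    cluster_id: 0, // management cluster: topology tracking only
    ..Default::default()
};
let execution_cfg = RaftNodeConfig {
    node_id: 7,
    cluster_id: 2, // execution cluster: workflows and checkpoints replicate here
    ..Default::default()
};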
If you're familiar with Temporal, Raftoral's replicated variables serve a similar purpose to Activities, but with a different philosophy:
// External service call with retries
const result = await workflow.executeActivity('chargeCard', {
amount: 100,
retries: 3
});
use raftoral::{checkpoint, checkpoint_compute};
// Deterministic computation with consensus-backed checkpoints (using macros)
let mut amount = checkpoint!(ctx, "charge_amount", 100);
let result = checkpoint_compute!(ctx, "payment_result", || async {
charge_card(*amount).await // External call executed once (owner only)
});
Key Differences:
| Aspect | Temporal Activities | Raftoral Replicated Variables |
|---|---|---|
| Execution Model | Separate worker pools | Same process, all nodes execute |
| State Storage | External database | Raft consensus (in-memory + snapshots) |
| Side Effects | Activity-specific retry logic | with_computation() for one-time execution |
| Network Overhead | Every activity call | Only during checkpoint creation (owner-only) |
| Determinism | Activities can be non-deterministic | Workflow code must be deterministic |
When to use checkpoint! vs checkpoint_compute!:
- checkpoint!(ctx, "key", value): For deterministic state (counters, status, computed values)
- checkpoint_compute!(ctx, "key", || async { ... }): For side effects (API calls, external services)
Example - Payment Processing:
use raftoral::{checkpoint, checkpoint_compute};
runtime.register_workflow_closure("process_payment", 1,
|input: PaymentInput, ctx: WorkflowContext| async move {
// Deterministic state (using checkpoint! macro)
let order_id = checkpoint!(ctx, "order_id", input.order_id);
let amount = checkpoint!(ctx, "amount", input.amount);
// Side effect: charge card once (owner-only execution)
let charge_result = checkpoint_compute!(ctx, "charge", || async {
stripe::charge_card(*order_id, *amount).await
});
// Update based on result
let mut status = checkpoint!(ctx, "status",
if charge_result.is_ok() { "completed" } else { "failed" }
);
Ok(PaymentOutput { status: status.get() })
}
).await?;
Why This Matters:
use std::sync::Arc;
use raftoral::workflow::{WorkflowRuntime, WorkflowContext};
use raftoral::{checkpoint, checkpoint_compute};
use raftoral::raft::generic::{RaftNode, RaftNodeConfig, TransportLayer, GrpcTransport};
use tokio::signal;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
// 1. Create transport and runtime
let config = RaftNodeConfig {
node_id: 1,
cluster_id: 1,
..Default::default()
};
let transport = Arc::new(TransportLayer::new(Arc::new(
GrpcTransport::new("127.0.0.1:7001".to_string())
)));
let (tx, rx) = tokio::sync::mpsc::channel(100);
let logger = create_logger(); // logger construction (helper not shown in this snippet)
let (runtime, node) = WorkflowRuntime::new(config, transport, rx, logger)?;
let runtime = Arc::new(runtime);
// 2. Register workflow with checkpoints (using macros for clean syntax)
runtime.register_workflow_closure(
"process_order", 1,
|input: OrderInput, ctx: WorkflowContext| async move {
// Regular checkpoint for deterministic state
let mut status = checkpoint!(ctx, "status", "processing");
// Computed checkpoint for side effects (API calls)
let inventory_check = checkpoint_compute!(
ctx,
"inventory",
|| async {
check_inventory_service(input.item_id).await
}
);
if *inventory_check {
status.set("confirmed").await?;
} else {
status.set("out_of_stock").await?;
}
Ok(OrderOutput { status: status.get() })
}
).await?;
// 3. Run node and wait for shutdown
tokio::spawn(async move {
let _ = RaftNode::run_from_arc(node).await;
});
signal::ctrl_c().await?;
Ok(())
}
// From ANY node in the cluster
let input = OrderInput {
order_id: "ORD-123".to_string(),
item_id: "ITEM-456".to_string(),
};
let workflow = runtime
.start_workflow::<OrderInput, OrderOutput>(
"my-workflow-1".to_string(),
"process_order".to_string(),
1,
input
)
.await?;
let output = workflow.wait_for_completion().await?;
println!("Order status: {}", output.status);
One of Raftoral's key strengths is dynamic cluster membership - you can start with any cluster size and safely add or remove nodes at runtime.
# Start with a single node (development)
./raftoral --listen 127.0.0.1:7001 --bootstrap
# Or start with 3 nodes (production)
./raftoral --listen 127.0.0.1:7001 --bootstrap
./raftoral --listen 127.0.0.1:7002 --peers 127.0.0.1:7001
./raftoral --listen 127.0.0.1:7003 --peers 127.0.0.1:7001
New nodes can join a running cluster and automatically catch up on in-flight workflows:
// New node joins cluster
let (runtime, node) = WorkflowRuntime::new_joining_node(
config,
transport,
rx,
vec![1, 2], // Initial voter IDs
logger
)?;
// Node discovers cluster configuration, gets assigned node ID,
// and receives Raft snapshot to catch up on running workflows
What Happens During Join:
Raft's Native Snapshot Mechanism:
Challenge: What if a node joins while workflows are running with lots of checkpoints?
Solution: Checkpoint Queues + Owner/Wait Pattern
// Workflow running on nodes 1, 2, 3:
for i in 0..1000 {
counter.set(i).await?; // Creates 1000 checkpoints
}
// Node 4 joins after 500 iterations:
// - Receives snapshot with checkpoint queues containing values 0-500
// - Starts executing at iteration 0
// - Pops from queue instead of waiting for owner: instant catch-up!
// - Joins live execution at iteration 500+
Technical Details:
Result: New nodes can join a cluster with running workflows and seamlessly catch up without blocking the cluster or missing state.
Workflows evolve over time - you add features, fix bugs, change behavior. Raftoral handles this through explicit versioning with a migration path for long-running workflows.
// Version 1 (deployed in production with running workflows)
runtime.register_workflow_closure("process_order", 1, |input, ctx| async {
let status = ReplicatedVar::with_value("status", &ctx, "processing").await?;
// ...original logic...
    Ok(OrderOutput { status: status.get() })
}).await?;
// Later: You want to add fraud detection
// But some workflows started with v1 and are still running!
Best Practice: Register both old and new versions during rollout:
// Version 1 - Keep running for in-flight workflows
runtime.register_workflow_closure("process_order", 1, |input, ctx| async {
let status = ReplicatedVar::with_value("status", &ctx, "processing").await?;
// ...original logic...
Ok(OrderOutput { status: status.get() })
}).await?;
// Version 2 - New workflows use this
runtime.register_workflow_closure("process_order", 2, |input, ctx| async {
let status = ReplicatedVar::with_value("status", &ctx, "processing").await?;
// NEW: Fraud detection
let fraud_check = ReplicatedVar::with_computation("fraud_check", &ctx, || async {
fraud_service::check(input.order_id).await
}).await?;
if !*fraud_check {
status.set("fraud_detected").await?;
return Ok(OrderOutput { status: status.get() });
}
// ...rest of logic...
Ok(OrderOutput { status: status.get() })
}).await?;
Deployment Strategy:
Phase 1 - Deploy with Both Versions:
# All nodes run with v1 and v2 registered
# New workflows use v2, old workflows continue with v1
Phase 2 - Wait for v1 Workflows to Complete:
# Monitor running workflows
# Wait for all v1 instances to finish naturally
Phase 3 - Remove v1:
// Only register v2 in new deployments
runtime.register_workflow_closure("process_order", 2, /* ... */).await?;
Why Explicit Versioning:
# Simple workflow example
cargo run --example typed_workflow_example
# Run tests
cargo test
# Two-node cluster test
./scripts/test_two_node_cluster.sh
use std::sync::Arc;
use tokio::sync::mpsc;
use raftoral::raft::generic::{InProcessNetwork, InProcessNetworkSender, RaftNodeConfig, TransportLayer};
use raftoral::workflow::WorkflowRuntime;
// Create shared network
let network = Arc::new(InProcessNetwork::new());
// Create transport for node 1
let (tx1, rx1) = mpsc::channel(100);
network.register_node(1, tx1.clone()).await;
let transport1 = Arc::new(TransportLayer::new(Arc::new(InProcessNetworkSender::new(
network.clone(),
))));
let config1 = RaftNodeConfig {
node_id: 1,
cluster_id: 1,
..Default::default()
};
let (runtime1, node1) = WorkflowRuntime::new(config1, transport1, rx1, logger)?;
// Execute workflows in-memory (no network)
let workflow = runtime1.start_workflow("wf-1", "my_workflow", 1, input).await?;
let result = workflow.wait_for_completion().await?;
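A second in-process node can be wired up the same way; the sketch below mirrors the node-1 setup (how the two runtimes are then formed into one cluster depends on the bootstrap/join API, e.g. new_joining_node shown earlier):

// Transport and config for a second node on the same shared network.
let (tx2, rx2) = mpsc::channel(100);
network.register_node(2, tx2.clone()).await;
let transport2 = Arc::new(TransportLayer::new(Arc::new(InProcessNetworkSender::new(
    network.clone(),
))));
let config2 = RaftNodeConfig {
    node_id: 2,
    cluster_id: 1, // same execution cluster as node 1
    ..Default::default()
};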
Workflow input and output types must implement Serialize + Deserialize.
src/
├── raft/generic/
│ ├── node.rs # RaftNode with raft-rs integration
│ ├── proposal_router.rs # ProposalRouter for command submission
│ ├── transport.rs # Transport abstraction (Layer 2-3)
│ ├── server/
│ │ ├── in_process.rs # InProcessNetwork for testing
│ │ └── grpc.rs # gRPC transport implementation
│ ├── message.rs # Message types & CommandExecutor trait
│ ├── errors.rs # Error types
│ ├── cluster_router.rs # Multi-cluster message routing
│ └── integration_tests.rs # Two-node KV cluster tests
├── workflow/
│ ├── mod.rs # Public API exports
│ ├── runtime.rs # WorkflowRuntime with owner/wait pattern
│ ├── state_machine.rs # WorkflowStateMachine & commands
│ ├── context.rs # WorkflowContext & WorkflowRun
│ ├── registry.rs # Type-safe workflow storage
│ ├── replicated_var.rs # ReplicatedVar with with_value/with_computation
│ ├── event.rs # WorkflowEvent definitions
│ ├── error.rs # Error types
│ └── ownership.rs # Workflow ownership tracking
├── nodemanager/
│ ├── mod.rs # NodeManager (dual-cluster coordination)
│ ├── node_manager.rs # Owns management + execution clusters
│ ├── management_command.rs # Management cluster commands
│ └── management_executor.rs # Management state & execution
├── grpc/
│ └── server.rs # gRPC service implementation
└── lib.rs # Public API exports
examples/
├── typed_workflow_example.rs # Complete workflow example
└── ...
docs/
├── SCALABILITY_ARCHITECTURE.md # Multi-cluster architecture details
└── COMPARISON.md # Raftoral vs Temporal vs DBOS
Contributions welcome! Areas of interest:
Ori Shalev - ori.shalev@gmail.com
MIT License. See LICENSE for details.