| Crates.io | deltaflow |
| lib.rs | deltaflow |
| version | 0.5.0 |
| created_at | 2025-12-05 00:51:26.188993+00 |
| updated_at | 2025-12-18 23:58:06.997875+00 |
| description | The embeddable workflow engine. Type-safe, Elixir-inspired pipelines. |
| homepage | |
| repository | https://github.com/mavdi/deltaflow |
| max_upload_size | |
| id | 1967399 |
| size | 312,205 |
The embeddable workflow engine.
Type-safe, Elixir-inspired pipelines that run in your process. No infrastructure required.

use deltaflow::{Pipeline, Step, StepError, RetryPolicy, NoopRecorder};

// Define steps implementing the Step trait (implementations elided)
struct ParseInput;
struct ProcessData;
struct FormatOutput;

// Build a type-safe pipeline
let pipeline = Pipeline::new("my_workflow")
    .start_with(ParseInput)   // String -> ParsedData
    .then(ProcessData)        // ParsedData -> ProcessedData
    .then(FormatOutput)       // ProcessedData -> Output
    .with_retry(RetryPolicy::exponential(3))
    .with_recorder(NoopRecorder)
    .build();

// Run it
let input = String::from("raw input");
let result = pipeline.run(input).await?;
Add to your Cargo.toml:
[dependencies]
deltaflow = "0.5"
For SQLite-backed recording and task queue:
[dependencies]
deltaflow = { version = "0.5", features = ["sqlite"] }
The Step trait is the fundamental building block. Each step transforms a typed input into a typed output:
#[async_trait]
pub trait Step: Send + Sync {
    type Input: Send + Clone;
    type Output: Send;

    fn name(&self) -> &'static str;
    async fn execute(&self, input: Self::Input) -> Result<Self::Output, StepError>;
}
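A minimal sketch of an implementation, assuming the async_trait attribute shown above and hypothetical Order/ValidatedOrder types; since this README doesn't show how StepError is constructed, the error path is left as a comment:

use async_trait::async_trait;
use deltaflow::{Step, StepError};

// Hypothetical types for illustration; Input must be Send + Clone.
#[derive(Clone)]
struct Order { total_cents: u64 }
struct ValidatedOrder { total_cents: u64 }

struct ValidateOrder;

#[async_trait]
impl Step for ValidateOrder {
    type Input = Order;
    type Output = ValidatedOrder;

    fn name(&self) -> &'static str {
        "validate_order"
    }

    async fn execute(&self, input: Self::Input) -> Result<Self::Output, StepError> {
        // Real validation would return Err(StepError) on bad input;
        // consult the crate docs for the error type's constructors.
        Ok(ValidatedOrder { total_cents: input.total_cents })
    }
}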
Compose steps with method chaining. The compiler ensures each step's output type matches the next step's input; a mismatched pair fails to compile rather than erroring at runtime:
let pipeline = Pipeline::new("process_order")
.start_with(ValidateOrder) // Order -> ValidatedOrder
.then(ChargePayment) // ValidatedOrder -> PaidOrder
.then(FulfillOrder) // PaidOrder -> CompletedOrder
.with_retry(RetryPolicy::exponential(5))
.with_recorder(SqliteRecorder::new(pool))
.build();
Background task processing with the sqlite feature. Register pipelines, submit work, process concurrently:
let runner = Runner::new(SqliteTaskStore::new(pool))
    .pipeline(order_pipeline)
    .pipeline(notification_pipeline)
    .max_concurrent(4)
    .build();

// Submit work
runner.submit("process_order", order).await?;

// Process tasks
runner.run().await;
Route pipeline output to multiple downstream pipelines:
let pipeline = Pipeline::new("market_data")
.start_with(ValidateStep)
.then(NormalizeStep)
// Conditional fork - only triggers when predicate is true
.fork_when(|d| d.asset_class == "crypto", "crypto_analysis")
.fork_when(|d| d.asset_class == "equity", "equity_analysis")
// Conditional fork with description (for visualization)
.fork_when(|d| d.priority == "high", "priority_queue").desc("high_priority")
// Static fan-out - always sends to all targets
.fan_out(&["ml_pipeline", "stats_pipeline"])
// Dynamic spawn - generate tasks from output
.emit("alerts", |d| {
if d.price_change > 0.05 {
vec![Alert { symbol: d.symbol.clone() }]
} else {
vec![]
}
})
.build();
Note: multiple forks can match simultaneously; they are not mutually exclusive. A high-priority crypto update, for example, is routed to both crypto_analysis and priority_queue.
Export pipeline structure for visualization tools:
let graph = pipeline.to_graph();
let json = serde_json::to_string_pretty(&graph)?;
// Returns JSON with steps, forks, fan_outs, dynamic_spawns
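To hand the export to an external tool, the JSON can simply be written to disk. A minimal sketch; the file name is arbitrary:

// Persist the exported structure for a visualization tool to pick up.
let graph = pipeline.to_graph();
let json = serde_json::to_string_pretty(&graph)?;
std::fs::write("pipeline_graph.json", json)?;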
For interactive pipeline visualization during development:
[dev-dependencies]
deltaflow-harness = "0.1"
use deltaflow_harness::RunnerHarnessExt;
let runner = RunnerBuilder::new(store)
    .pipeline(my_pipeline)
    .with_visualizer(3000) // Starts a web UI at http://localhost:3000
    .build();
The visualizer shows pipeline steps as connected boxes with fork, fan-out, and spawn connections between pipelines.
For time-based task enqueueing, use PeriodicScheduler:
use deltaflow::{SchedulerBuilder, SqliteTaskStore};
use std::time::Duration;
let scheduler = SchedulerBuilder::new(task_store)
    // Every 15 minutes, fetch pending videos and enqueue them
    // for the "process_video" pipeline.
    .job("process_video", Duration::from_secs(900), {
        let repo = repo.clone();
        move || {
            // Clone again so each invocation of the closure owns a handle.
            let repo = repo.clone();
            async move { repo.get_pending_videos().await.unwrap_or_default() }
        }
    })
    .run_on_start(true) // applies to the job defined just above
    // Hourly signal validation.
    .job("validate_signals", Duration::from_secs(3600), {
        let repo = repo.clone();
        move || {
            let repo = repo.clone();
            async move { repo.get_pending_signals().await.unwrap_or_default() }
        }
    })
    .run_on_start(false)
    .build();
// Run alongside your pipeline runner
tokio::select! {
    _ = runner.run() => {}
    _ = scheduler.run() => {}
}
For pipelines that need rate limiting (e.g., external API calls):
let runner = RunnerBuilder::new(task_store)
    .pipeline(fast_pipeline)
    .pipeline_with_concurrency(rate_limited_pipeline, 1) // at most 1 concurrent task
    .build();
Pipelines with custom concurrency use their own semaphore instead of the global max_concurrent limit.
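The mechanism is the familiar counted-semaphore pattern. As a standalone sketch of the idea using tokio::sync::Semaphore (this illustrates the concept, not deltaflow's internals):

use std::sync::Arc;
use tokio::sync::Semaphore;

// A semaphore with one permit serializes access to a
// rate-limited resource, even across many spawned tasks.
async fn call_external_api(id: u32, gate: Arc<Semaphore>) {
    let _permit = gate.acquire().await.expect("semaphore closed");
    // Only one task holds the permit at a time.
    println!("calling API for task {id}");
}

#[tokio::main]
async fn main() {
    let gate = Arc::new(Semaphore::new(1));
    let handles: Vec<_> = (0..4)
        .map(|id| tokio::spawn(call_external_api(id, Arc::clone(&gate))))
        .collect();
    for h in handles {
        h.await.unwrap();
    }
}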
See the examples/ directory for working code. Run the visualizer demo with:
cargo run --example visualizer_demo --features sqlite
Deltaflow is experimental (0.x). The API will evolve based on feedback.
What works:
What's coming:
Not planned (by design):
Feedback welcome: GitHub Issues
MIT