deltaflow

Crates.io: deltaflow
lib.rs: deltaflow
version: 0.5.0
created_at: 2025-12-05 00:51:26.188993+00
updated_at: 2025-12-18 23:58:06.997875+00
description: The embeddable workflow engine. Type-safe, Elixir-inspired pipelines.
repository: https://github.com/mavdi/deltaflow
id: 1967399
size: 312,205
M Avdi (mavdi)

documentation

https://docs.rs/deltaflow

README

Deltaflow

The embeddable workflow engine.

Type-safe, Elixir-inspired pipelines that run in your process. No infrastructure required.

[Image: pipeline visualization]

Why Deltaflow?

  • Type-safe composition - The compiler enforces that each step's output type matches the next step's input type
  • Elixir-inspired - Declarative pipelines via method chaining, not scattered callbacks
  • Observable by default - Every run and step recorded for debugging
  • Embeddable - A library, not a service. Runs in your process.

Quick Start

use deltaflow::{Pipeline, Step, StepError, RetryPolicy, NoopRecorder};

// Define steps; each must implement the Step trait (impls omitted here)
struct ParseInput;
struct ProcessData;
struct FormatOutput;

// Build a type-safe pipeline
let pipeline = Pipeline::new("my_workflow")
    .start_with(ParseInput)       // String -> ParsedData
    .then(ProcessData)            // ParsedData -> ProcessedData
    .then(FormatOutput)           // ProcessedData -> Output
    .with_retry(RetryPolicy::exponential(3))
    .with_recorder(NoopRecorder)
    .build();

// Run it from an async context; `input` is the first step's input (a String here)
let result = pipeline.run(input).await?;
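
The Quick Start passes RetryPolicy::exponential(3), but this README does not say which delays that policy uses. The following is only a generic sketch of exponential backoff, not Deltaflow's implementation. The names backoff_delay and backoff_schedule, and the base and cap parameters, are assumptions made for illustration.

```rust
use std::time::Duration;

/// Hypothetical helper (not part of Deltaflow): delay before retry
/// `attempt` (0-based), doubling from `base` and never exceeding `cap`.
fn backoff_delay(attempt: u32, base: Duration, cap: Duration) -> Duration {
    // 2^attempt, saturating so large attempt counts cannot overflow.
    let factor = 2u32.saturating_pow(attempt);
    // If the multiplication itself overflows, fall back to the cap.
    base.checked_mul(factor).map_or(cap, |d| d.min(cap))
}

/// Delays for a policy that allows `max_retries` retries.
fn backoff_schedule(max_retries: u32, base: Duration, cap: Duration) -> Vec<Duration> {
    (0..max_retries).map(|n| backoff_delay(n, base, cap)).collect()
}
```

With a 100 ms base and three retries, this sketch waits 100 ms, 200 ms, and 400 ms. The cap keeps a long retry budget from producing unbounded waits.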

Installation

Add to your Cargo.toml:

[dependencies]
deltaflow = "0.5"

For SQLite-backed recording and the task queue, enable the sqlite feature:

[dependencies]
deltaflow = { version = "0.5", features = ["sqlite"] }

Core Concepts

Step

The fundamental building block. Each step transforms a typed input to a typed output:

#[async_trait]
pub trait Step: Send + Sync {
    type Input: Send + Clone;
    type Output: Send;

    fn name(&self) -> &'static str;
    async fn execute(&self, input: Self::Input) -> Result<Self::Output, StepError>;
}

Pipeline

Compose steps with method chaining. The compiler ensures each step's output type matches the next step's input:

let pipeline = Pipeline::new("process_order")
    .start_with(ValidateOrder)    // Order -> ValidatedOrder
    .then(ChargePayment)          // ValidatedOrder -> PaidOrder
    .then(FulfillOrder)           // PaidOrder -> CompletedOrder
    .with_retry(RetryPolicy::exponential(5))
    .with_recorder(SqliteRecorder::new(pool))
    .build();
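
To show how a compiler can enforce that one step's output feeds the next step's input, here is a simplified, synchronous sketch using associated types. It is not Deltaflow's code: SyncStep, Chain, ParseNumber, and Double are hypothetical names, and the real Step trait is async and returns StepError.

```rust
/// Simplified, synchronous stand-in for a step (hypothetical).
trait SyncStep {
    type Input;
    type Output;
    fn execute(&self, input: Self::Input) -> Result<Self::Output, String>;
}

/// Runs `first`, then feeds its output into `second`.
struct Chain<A, B> {
    first: A,
    second: B,
}

// The `B: SyncStep<Input = A::Output>` bound is what turns a mismatched
// chain into a compile error rather than a runtime failure.
impl<A, B> SyncStep for Chain<A, B>
where
    A: SyncStep,
    B: SyncStep<Input = A::Output>,
{
    type Input = A::Input;
    type Output = B::Output;

    fn execute(&self, input: Self::Input) -> Result<Self::Output, String> {
        let mid = self.first.execute(input)?;
        self.second.execute(mid)
    }
}

struct ParseNumber; // String -> i64
struct Double; // i64 -> i64

impl SyncStep for ParseNumber {
    type Input = String;
    type Output = i64;
    fn execute(&self, input: String) -> Result<i64, String> {
        input.trim().parse::<i64>().map_err(|e| e.to_string())
    }
}

impl SyncStep for Double {
    type Input = i64;
    type Output = i64;
    fn execute(&self, input: i64) -> Result<i64, String> {
        Ok(input * 2)
    }
}

/// Builds the two-step chain. With the order swapped,
/// `Chain<Double, ParseNumber>` would not implement `SyncStep`, so
/// calling `execute` on it would fail to compile.
fn parse_then_double() -> Chain<ParseNumber, Double> {
    Chain { first: ParseNumber, second: Double }
}
```

Calling parse_then_double().execute(" 21 ".to_string()) yields Ok(42), while non-numeric input yields an Err from the first step and the second step never runs.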

Runner

Background task processing, available with the sqlite feature. Register pipelines, submit work, and process tasks concurrently:

let runner = Runner::new(SqliteTaskStore::new(pool))
    .pipeline(order_pipeline)
    .pipeline(notification_pipeline)
    .max_concurrent(4)
    .build();

// Submit work
runner.submit("process_order", order).await?;

// Process tasks
runner.run().await;

Forking and Fan-out

Route pipeline output to multiple downstream pipelines:

let pipeline = Pipeline::new("market_data")
    .start_with(ValidateStep)
    .then(NormalizeStep)

    // Conditional fork - only triggers when predicate is true
    .fork_when(|d| d.asset_class == "crypto", "crypto_analysis")
    .fork_when(|d| d.asset_class == "equity", "equity_analysis")

    // Conditional fork with description (for visualization)
    .fork_when(|d| d.priority == "high", "priority_queue").desc("high_priority")

    // Static fan-out - always sends to all targets
    .fan_out(&["ml_pipeline", "stats_pipeline"])

    // Dynamic spawn - generate tasks from output
    .emit("alerts", |d| {
        if d.price_change > 0.05 {
            vec![Alert { symbol: d.symbol.clone() }]
        } else {
            vec![]
        }
    })
    .build();

Note: Multiple forks can match simultaneously - they are not mutually exclusive.
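
The non-exclusive behavior can be pictured as evaluating every predicate and collecting all matches, with no early return. The sketch below models only that idea. MarketData, Fork, matching_targets, and example_forks are hypothetical names, not Deltaflow types.

```rust
/// Hypothetical record type, loosely mirroring the fields used above.
struct MarketData {
    asset_class: String,
    priority: String,
}

/// A conditional fork: a predicate plus the name of the target pipeline.
struct Fork {
    predicate: fn(&MarketData) -> bool,
    target: &'static str,
}

/// Returns every target whose predicate matches. Nothing stops after
/// the first hit, so one output can be routed to several pipelines.
fn matching_targets(forks: &[Fork], data: &MarketData) -> Vec<&'static str> {
    forks
        .iter()
        .filter(|f| (f.predicate)(data))
        .map(|f| f.target)
        .collect()
}

/// Three forks similar to the ones in the example above.
fn example_forks() -> Vec<Fork> {
    vec![
        Fork { predicate: |d| d.asset_class == "crypto", target: "crypto_analysis" },
        Fork { predicate: |d| d.asset_class == "equity", target: "equity_analysis" },
        Fork { predicate: |d| d.priority == "high", target: "priority_queue" },
    ]
}
```

In this model a high-priority crypto record matches both crypto_analysis and priority_queue. If routing must be exclusive, the predicates themselves have to be written so that they do not overlap.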

Pipeline Visualization

Exporting Pipeline Structure

Export pipeline structure for visualization tools:

let graph = pipeline.to_graph();
let json = serde_json::to_string_pretty(&graph)?;
// Returns JSON with steps, forks, fan_outs, dynamic_spawns

Web Visualizer

For interactive pipeline visualization during development:

[dev-dependencies]
deltaflow-harness = "0.1"

use deltaflow_harness::RunnerHarnessExt;

let runner = RunnerBuilder::new(store)
    .pipeline(my_pipeline)
    .with_visualizer(3000)  // Starts web UI at http://localhost:3000
    .build();

The visualizer shows pipeline steps as connected boxes with fork, fan-out, and spawn connections between pipelines.

Periodic Scheduler

For time-based task enqueueing, build a PeriodicScheduler with SchedulerBuilder:

use deltaflow::{SchedulerBuilder, SqliteTaskStore};
use std::time::Duration;

let scheduler = SchedulerBuilder::new(task_store)
    .job("process_video", Duration::from_secs(900), {
        let repo = repo.clone();
        move || {
            let repo = repo.clone();
            async move { repo.get_pending_videos().await.unwrap_or_default() }
        }
    })
    .run_on_start(true)

    .job("validate_signals", Duration::from_secs(3600), {
        let repo = repo.clone();
        move || {
            let repo = repo.clone();
            async move { repo.get_pending_signals().await.unwrap_or_default() }
        }
    })
    .run_on_start(false)

    .build();

// Run alongside your pipeline runner
tokio::select! {
    _ = runner.run() => {}
    _ = scheduler.run() => {}
}
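
This README does not spell out the timing rules behind run_on_start. One plausible reading, sketched below, is that a job with run_on_start(true) is due immediately at startup, while a job with run_on_start(false) first waits one full interval. The Job type and its poll method are hypothetical, and times are modeled as offsets since scheduler start to keep the sketch deterministic.

```rust
use std::time::Duration;

/// Hypothetical model of one periodic job (not Deltaflow's type).
struct Job {
    interval: Duration,
    next_due: Duration,
}

impl Job {
    /// Assumed semantics: with `run_on_start` the first run is due at
    /// startup; otherwise it is due after one full interval.
    fn new(interval: Duration, run_on_start: bool) -> Self {
        let next_due = if run_on_start { Duration::ZERO } else { interval };
        Job { interval, next_due }
    }

    /// Returns true if the job should fire at `now`, and schedules the
    /// following run one interval after `now`.
    fn poll(&mut self, now: Duration) -> bool {
        if now >= self.next_due {
            self.next_due = now + self.interval;
            true
        } else {
            false
        }
    }
}
```

Under these assumptions, the process_video job above would fire at startup and then every 15 minutes, and validate_signals would first fire one hour in.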

Per-Pipeline Concurrency

For pipelines that need rate limiting (e.g., external API calls):

let runner = RunnerBuilder::new(task_store)
    .pipeline(fast_pipeline)
    .pipeline_with_concurrency(rate_limited_pipeline, 1)  // Only 1 concurrent
    .build();

Pipelines with custom concurrency use their own semaphore instead of the global max_concurrent limit.
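
A rough model of that rule: each pipeline with a custom limit draws permits from its own pool, and every other pipeline shares the global pool. The sketch below uses plain counters instead of a real semaphore. Permits, with_custom, and try_acquire are hypothetical names chosen for illustration.

```rust
use std::collections::HashMap;

/// Hypothetical model of permit accounting (not Deltaflow's code).
struct Permits {
    global_free: usize,
    custom_free: HashMap<String, usize>,
}

impl Permits {
    /// `max_concurrent` sizes the shared global pool.
    fn new(max_concurrent: usize) -> Self {
        Permits { global_free: max_concurrent, custom_free: HashMap::new() }
    }

    /// Gives `pipeline` its own pool of `limit` permits.
    fn with_custom(mut self, pipeline: &str, limit: usize) -> Self {
        self.custom_free.insert(pipeline.to_string(), limit);
        self
    }

    /// Tries to take one permit for `pipeline`. Returns false if its
    /// pool (custom if present, otherwise global) is exhausted.
    fn try_acquire(&mut self, pipeline: &str) -> bool {
        let pool = match self.custom_free.get_mut(pipeline) {
            Some(free) => free,
            None => &mut self.global_free,
        };
        if *pool > 0 {
            *pool -= 1;
            true
        } else {
            false
        }
    }
}
```

In this model a saturated rate-limited pipeline does not consume global permits, so it cannot starve the other pipelines, and they cannot starve it.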

Examples

See the examples/ directory for working code:

  • basic.rs - Minimal pipeline setup
  • visualizer_demo.rs - Web visualizer basics
  • visualizer_complex.rs - Complex topologies (diamond patterns, cycles, multi-source)

Run with:

cargo run --example visualizer_demo --features sqlite

Status

Deltaflow is experimental (0.x). The API will evolve based on feedback.

What works:

  • Pipeline composition with type-safe step chaining
  • Retry policies (exponential backoff, fixed delay)
  • SQLite recording for observability
  • Task runner with concurrent execution
  • Follow-up task spawning

What's coming:

  • Per-step retry policies
  • Task priorities
  • More storage backends

Not planned (by design):

  • Distributed execution (single-process by design)
  • DAG dependencies (pipelines are linear)

Feedback welcome: GitHub Issues

License

MIT
