pipeflow

Version: 0.0.4
Description: A lightweight, configuration-driven data pipeline framework
Repository: https://github.com/telepair/pipeflow
Documentation: https://docs.rs/pipeflow
Owner: Liys (liysx)

README

pipeflow

Chinese documentation (中文文档)

A lightweight, configuration-driven data pipeline framework for Rust.

Source → Transform → Sink

Features

  • YAML Configuration: Declarative pipeline definition with DAG validation (duplicate IDs, missing inputs/outputs, cycles)
  • Fan-out: One source can broadcast to multiple sinks
  • Configurable Broadcast Capacity: Tune output_buffer_size (source/transform broadcast)
  • Built-in Nodes:
    • Sources: http_client, http_server, file, redis, sql
    • Sinks: console, file, blackhole, http_client, redis, sql, notify
  • CLI: run, config validate, config show, config graph

Feature Flags

Pipeflow uses Cargo features to keep optional dependencies behind flags.

  • http-client (default): Enables the http_client source and sink.
  • http-server: Enables the http_server source.
  • database: Enables sql source and sink.
  • redis: Enables the redis source and sink.
  • file (default): Enables the file source and sink.
  • notify (default): Enables the notify sink.

Core-only build (no optional sources/sinks):

cargo build --no-default-features

If a pipeline config references a node behind a disabled feature, Engine::build() returns a configuration error explaining which feature is required.
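
To add the dependency with only the features you need, cargo add can disable the defaults and opt back in; the feature list below is illustrative:

# Core plus only the HTTP client and file nodes
cargo add pipeflow --no-default-features --features http-client,file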

Quick Start

Requirements

  • Rust 1.92 or later (uses Rust 2024 edition)

Installation

cargo add pipeflow

Configuration

Create a pipeline configuration file pipeline.yaml:

system:
  # output_buffer_size: broadcast channel capacity for sources/transforms (default: 1024)
  output_buffer_size: 1024

pipeline:
  sources:
    - id: api_poller
      type: http_client
      config:
        url: "https://httpbin.org/json"
        interval: "10s"
        # schedule: "0 0 * * *" # Run daily at 00:00 (local time, 5 fields; seconds default to 0)

  transforms:
    - id: pass_through
      inputs: [api_poller]
      outputs: [console]

  sinks:
    - id: console
      type: console
      config:
        format: pretty
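
With pipeline.yaml in place, the quickest way to try it is the CLI (covered in detail under CLI Commands below):

pipeflow run pipeline.yaml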

Wiring Nodes

Pipeflow wiring is source -> transform -> sink:

  • Transforms declare inputs (one or more sources or transforms).
  • Transforms declare outputs (one or more sinks or transforms).
  • Transform-to-transform wiring can be declared on either side; the engine infers the missing side (see the example after this list).
  • Transforms may omit steps to act as pass-through nodes.
  • Sources do not declare inputs or outputs.
  • Sinks do not declare inputs; their target is defined by sink type/config (e.g. file path).
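
For example, reusing the api_poller source and console sink from the Quick Start config, two chained transforms could be wired as below (normalize and enrich are illustrative IDs; both omit steps and act as pass-through nodes):

  transforms:
    - id: normalize
      inputs: [api_poller]
      outputs: [enrich]   # transform-to-transform edge declared on this side only
    - id: enrich
      outputs: [console]  # inputs: [normalize] is omitted and inferred by the engine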

Loading from a directory

All commands that accept CONFIG also accept a directory. When a directory is provided, pipeflow loads all *.yaml / *.yml files in lexical order and merges them into a single configuration before normalization and validation.

This is useful for larger pipelines:

# Directory-based config
pipeflow run ./configs/
pipeflow config validate ./configs/
pipeflow config show ./configs/ --format yaml
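
One possible layout, assuming the pipeline sections are split across files (illustrative names; the numeric prefixes only pin the lexical merge order):

configs/
├── 00-system.yaml      # system settings, e.g. output_buffer_size
├── 10-sources.yaml     # pipeline.sources entries
├── 20-transforms.yaml  # pipeline.transforms entries
└── 30-sinks.yaml       # pipeline.sinks entries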

Run (Programmatic)

use pipeflow::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Load and parse the YAML configuration.
    let config = Config::from_file("pipeline.yaml")?;
    // Build the engine; node-specific configs are validated during build().
    let mut engine = Engine::from_config(config)?;
    engine.build().await?;
    // Run the pipeline.
    engine.run().await
}

Node Types

Sources

Type         Description         Status
http_client  HTTP polling        Implemented
http_server  HTTP push/webhook   Implemented
redis        Redis GET polling   Implemented
sql          SQL polling         Implemented
file         File watching       Implemented

System Sources (implicit, fixed IDs):

  • source::system::dlq - Dead Letter Queue
  • source::system::event - System events
  • source::system::notify - Notifications

System sources are always available and emit into their corresponding system sinks by default. You can consume a system source in your pipeline by referencing it in a transform's inputs, as shown below.
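
For instance, a pass-through transform can tap the DLQ into a dedicated file sink. The IDs are illustrative, and the file sink's path key is assumed here; see docs/CONFIGURATION.md for the exact parameters:

pipeline:
  transforms:
    - id: dlq_tap
      inputs: [source::system::dlq]
      outputs: [dlq_copy]

  sinks:
    - id: dlq_copy
      type: file
      config:
        path: ./data/dlq_copy.jsonl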

Transforms

Type     Description             I/O    Status
remap    Field mapping (step)    1:1    Implemented
filter   Conditional filtering   1:0/1  Implemented
window   Time/count aggregation  n:1    Implemented
compute  Math expression eval    1:1    Implemented
hash     Generate hash IDs       1:1    Implemented

Sinks

Type         Description              Status
blackhole    Discard messages         Implemented
console      Print to stdout          Implemented
file         Write to file            Implemented
sql          SQL database insert      Implemented
redis        Redis operations         Implemented
notify       Email/Telegram/Webhook   Implemented
http_client  HTTP API calls           Implemented

System Sinks (implicit, fixed IDs):

  • sink::system::dlq - Default DLQ output (data/system_dlq.jsonl)
  • sink::system::event - Default event output (data/system_event.jsonl)
  • sink::system::notify - Default notify output (data/system_notify.jsonl)

You can override the base directory with PIPEFLOW_SYSTEM_SINK_DIR or configure paths in system.sinks.

system:
  sinks:
    dir: ./data
    dlq: ./data/custom_dlq.jsonl
    event: ./data/custom_event.jsonl
    notify: ./data/custom_notify.jsonl
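
The same base-directory override via the environment variable, for example:

PIPEFLOW_SYSTEM_SINK_DIR=/var/lib/pipeflow pipeflow run pipeline.yaml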

Configuration Reference

See docs/CONFIGURATION.md for detailed configuration parameters for all supported sources and sinks.

Broadcast Buffer Configuration

Pipeflow uses tokio::sync::broadcast channels to connect nodes that can emit messages (sources/transforms). You can tune the broadcast capacity via output_buffer_size.

system:
  output_buffer_size: 1024 # broadcast channel capacity for sources/transforms

Notes:

  • Each source can override output_buffer_size individually.
  • If a sink/transform falls behind the broadcast buffer, it misses the oldest messages and logs a Lagged warning (see the sketch after this list).
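
The Lagged behaviour comes from tokio's broadcast channel itself. Below is a minimal standalone sketch (plain tokio, not pipeflow's API) of what happens when a receiver falls behind a small buffer:

use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    // Capacity 2 plays the role of a very small output_buffer_size.
    let (tx, mut rx) = broadcast::channel::<u64>(2);

    // Publish more messages than the buffer can hold before the receiver reads.
    for i in 0..5u64 {
        let _ = tx.send(i);
    }

    // The slow receiver is first told how many messages it missed...
    match rx.recv().await {
        Err(broadcast::error::RecvError::Lagged(missed)) => {
            println!("receiver lagged, missed {missed} messages");
        }
        other => println!("unexpected: {other:?}"),
    }

    // ...and then resumes from the oldest message still buffered (3 here).
    assert_eq!(rx.recv().await.unwrap(), 3);
}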

Dead Letter Queue

The source::system::dlq source is implemented and can be wired to any sink. Chain-depth protection prevents infinite loops when messages are routed through system sources (max depth: 8).

Current status:

  • source::system::dlq source: Implemented
  • Chain-depth protection: Implemented
  • Automatic DLQ routing on transform/sink errors: Implemented

See docs/DESIGN.md for the full design.

CLI Commands

# Run pipeline
pipeflow run config.yaml

# Validate configuration
pipeflow config validate config.yaml

# Show pipeline graph (ASCII)
pipeflow config graph config.yaml

# Show merged + normalized configuration
pipeflow config show config.yaml --format yaml

Notes:

  • pipeflow config validate checks YAML structure and pipeline wiring (IDs, references, cycles, system routing). It does not validate node-specific config contents (e.g. required http_client.url); those are validated during Engine::build() (and therefore pipeflow run).
  • If you use directory-based configs, config show displays the merged + normalized result.

Distributed & High Availability

Pipeflow is stand-alone by design.

To keep the architecture simple and robust (KISS principle), Pipeflow does not implement complex distributed coordination protocols (like Raft or Paxos).

  • Persistence: State (such as silence records) is stored on the local filesystem (./data by default). Redis-backed silence storage was removed in favor of simplicity and filesystem atomicity.
  • Scaling: We recommend manual sharding: deploy multiple independent instances, each handling a different subset of configuration files (see the sketch after this list).
  • High Availability: Use detailed health checks (e.g., Kubernetes liveness probes) to restart failed instances. If you need shared state across instances (e.g., shared silence), mount a shared volume (NFS/EFS) at the data_dir.
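
A sketch of manual sharding with the existing CLI, where each instance only sees its own subset of configuration files (the shard directories are illustrative):

# Two independent instances, each with its own config directory
pipeflow run ./configs/shard-a/ &
pipeflow run ./configs/shard-b/ &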

Documentation

See docs/DESIGN.md for detailed design documentation.

Testing

# Unit + integration tests
cargo test --all-features

# Lint (clippy)
cargo clippy --all-targets --all-features -- -D warnings

# Format check
cargo fmt --all -- --check

License

MIT
