| Crates.io | pipeflow |
| lib.rs | pipeflow |
| version | 0.0.4 |
| created_at | 2026-01-06 05:35:44.440447+00 |
| updated_at | 2026-01-15 01:26:03.994046+00 |
| description | A lightweight, configuration-driven data pipeline framework |
| homepage | |
| repository | https://github.com/telepair/pipeflow |
| max_upload_size | |
| id | 2025202 |
| size | 969,430 |
A lightweight, configuration-driven data pipeline framework for Rust.
Source → Transform → Sink
- Buffering: `output_buffer_size` (source/transform broadcast)
- Sources: `http_client`, `http_server`, `file`, `redis`, `sql`
- Sinks: `console`, `file`, `blackhole`, `http_client`, `redis`, `sql`, `notify`
- CLI: `run`, `config validate`, `config show`, `config graph`

Pipeflow uses Cargo features to keep optional dependencies behind flags.
- `http-client` (default): Enables the `http_client` source and sink.
- `http-server`: Enables the `http_server` source.
- `database`: Enables the `sql` source and sink.
- `redis`: Enables the `redis` source and sink.
- `file` (default): Enables the `file` source and sink.
- `notify` (default): Enables the `notify` sink.

Core-only build (no optional sources/sinks):
cargo build --no-default-features
If a pipeline config references a node behind a disabled feature, Engine::build() returns a
configuration error explaining which feature is required.
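For example, a config that declares a `redis` source only builds when the crate is compiled with the `redis` feature. A minimal sketch (the node id is arbitrary and the redis-specific parameters are omitted):

# Requires the `redis` Cargo feature; without it, Engine::build() reports
# a configuration error naming the missing feature.
pipeline:
  sources:
    - id: cache_poller
      type: redis
      config: {} # redis-specific parameters omitted; see docs/CONFIGURATION.md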
cargo add pipeflow
Create a pipeline configuration file pipeline.yaml:
system:
  # output_buffer_size: broadcast channel capacity for sources/transforms (default: 1024)
  output_buffer_size: 1024

pipeline:
  sources:
    - id: api_poller
      type: http_client
      config:
        url: "https://httpbin.org/json"
        interval: "10s"
        # schedule: "0 0 * * *" # Run daily at 00:00 (local time, 5 fields; seconds default to 0)

  transforms:
    - id: pass_through
      inputs: [api_poller]
      outputs: [console]

  sinks:
    - id: console
      type: console
      config:
        format: pretty
Pipeflow wiring is source -> transform -> sink:
- Transforms declare `inputs` (one or more sources or transforms).
- Transforms declare `outputs` (one or more sinks or transforms).
- Transforms may omit `steps` to act as pass-through nodes.
- Sources declare no `inputs` or `outputs`.
- Sinks declare no `inputs`; their target is defined by sink type/config (e.g. file path).

All commands that accept CONFIG also accept a directory. When a directory is provided,
pipeflow loads all *.yaml / *.yml files in lexical order and merges them into a single
configuration before normalization and validation.
This is useful for larger pipelines:
# Directory-based config
pipeflow run ./configs/
pipeflow config validate ./configs/
pipeflow config show ./configs/ --format yaml
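One possible layout, assuming the lexical merge described above (file names and values are illustrative):

# configs/00-system.yaml
system:
  output_buffer_size: 2048

# configs/10-pipeline.yaml
pipeline:
  sources:
    - id: api_poller
      type: http_client
      config:
        url: "https://httpbin.org/json"
        interval: "10s"
  transforms:
    - id: pass_through
      inputs: [api_poller]
      outputs: [console]
  sinks:
    - id: console
      type: console
      config:
        format: pretty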
use pipeflow::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let config = Config::from_file("pipeline.yaml")?;
    let mut engine = Engine::from_config(config)?;
    engine.build().await?;
    engine.run().await
}
| Type | Description | Status |
|---|---|---|
| `http_client` | HTTP polling | Implemented |
| `http_server` | HTTP push/webhook | Implemented |
| `redis` | Redis GET polling | Implemented |
| `sql` | SQL polling | Implemented |
| `file` | File watching | Implemented |
System Sources (implicit, fixed IDs):
- `source::system::dlq` - Dead Letter Queue
- `source::system::event` - System events
- `source::system::notify` - Notifications

System sources are always available and emit into their corresponding system sinks by default.
You can consume a system source in your pipeline by referencing it in a transform's `inputs`.
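For example, to fan dead-lettered messages out to a notify sink (a sketch; the ids are arbitrary and the notify-specific parameters are omitted):

pipeline:
  transforms:
    - id: dlq_fanout # no steps: acts as a pass-through
      inputs: [source::system::dlq]
      outputs: [alerts]
  sinks:
    - id: alerts
      type: notify
      config: {} # notify-specific parameters omitted; see docs/CONFIGURATION.md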
| Type | Description | I/O | Status |
|---|---|---|---|
| `remap` | Field mapping (step) | 1:1 | Implemented |
| `filter` | Conditional filtering | 1:0/1 | Implemented |
| `window` | Time/count aggregation | n:1 | Implemented |
| `compute` | Math expression eval | 1:1 | Implemented |
| `hash` | Generate hash IDs | 1:1 | Implemented |
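A transform with explicit steps might look like the following sketch (the per-step `type`/`config` shape is assumed here, not taken from the docs, and the step parameters are left empty; see docs/CONFIGURATION.md for the actual schema):

transforms:
  - id: clean_payload
    inputs: [api_poller]
    outputs: [console]
    steps:
      - type: filter # 1:0/1 - drop messages that do not match
        config: {} # condition parameters omitted; see docs/CONFIGURATION.md
      - type: remap # 1:1 - rename/reshape fields
        config: {}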
| Type | Description | Status |
|---|---|---|
| `blackhole` | Discard messages | Implemented |
| `console` | Print to stdout | Implemented |
| `file` | Write to file | Implemented |
| `sql` | SQL database insert | Implemented |
| `redis` | Redis operations | Implemented |
| `notify` | Email/Telegram/Webhook | Implemented |
| `http_client` | HTTP API calls | Implemented |
System Sinks (implicit, fixed IDs):
- `sink::system::dlq` - Default DLQ output (`data/system_dlq.jsonl`)
- `sink::system::event` - Default event output (`data/system_event.jsonl`)
- `sink::system::notify` - Default notify output (`data/system_notify.jsonl`)

You can override the base directory with `PIPEFLOW_SYSTEM_SINK_DIR` or configure paths in `system.sinks`:
system:
  sinks:
    dir: ./data
    dlq: ./data/custom_dlq.jsonl
    event: ./data/custom_event.jsonl
    notify: ./data/custom_notify.jsonl
See docs/CONFIGURATION.md for detailed configuration parameters for all supported sources and sinks.
Pipeflow uses tokio::sync::broadcast channels to connect nodes that can emit messages
(sources/transforms). You can tune the broadcast capacity via output_buffer_size.
system:
  output_buffer_size: 1024 # broadcast channel capacity for sources/transforms
Notes:
- Each emitting node gets its own broadcast channel, so the capacity is `output_buffer_size` per source/transform.
- Receivers that fall too far behind are notified with `Lagged` and skip the oldest missed messages (standard `tokio::sync::broadcast` behavior).

The `source::system::dlq` source is implemented and can be wired to any sink. Chain-depth protection
prevents infinite loops when messages are routed through system sources (max depth: 8).
Current status:
- `source::system::dlq` source: Implemented

See docs/DESIGN.md for the full design.
# Run pipeline
pipeflow run config.yaml
# Validate configuration
pipeflow config validate config.yaml
# Show pipeline graph (ASCII)
pipeflow config graph config.yaml
# Show merged + normalized configuration
pipeflow config show config.yaml --format yaml
Notes:
- `pipeflow config validate` checks YAML structure and pipeline wiring (IDs, references, cycles,
  system routing). It does not validate node-specific config contents (e.g. the required
  `http_client.url`); those are validated during `Engine::build()` (and therefore by `pipeflow run`).
- `config show` displays the merged + normalized result.

Pipeflow is stand-alone by design.
To keep the architecture simple and robust (KISS principle), Pipeflow does not implement complex distributed coordination protocols (like Raft or Paxos).
- Local data (such as the default system sink output) lives on the filesystem (`./data` by default).
- Complex distributed backends such as Redis were removed in favor of simplicity and filesystem atomicity.
- The data directory is configurable via `data_dir`.

See docs/DESIGN.md for detailed design documentation.
# Unit + integration tests
cargo test --all-features
# Lint (clippy)
cargo clippy --all-targets --all-features -- -D warnings
# Format check
cargo fmt --all -- --check
MIT