| Field | Value |
| --- | --- |
| Crates.io | feedme |
| lib.rs | feedme |
| version | 0.2.0 |
| created_at | 2025-12-20 09:58:21.30948+00 |
| updated_at | 2025-12-20 17:28:14.678477+00 |
| description | Deterministic streaming data pipeline with mechanical guarantees for Rust |
| homepage | https://github.com/micha/feedme |
| repository | https://github.com/micha/feedme |
| max_upload_size | |
| id | 1996292 |
| size | 2,998,217 |
FeedMe is a deterministic, linear, streaming ingest pipeline with mechanical guarantees around memory, ordering, and failure.
FeedMe is a high-performance, streaming data pipeline engine for Rust applications. It provides a linear, deterministic processing model with bounded resource usage, explicit error handling, and comprehensive observability.
Perfect for ETL, log processing, data cleaning, and real-time ingestion pipelines.
FeedMe provides mechanical guarantees around memory, ordering, and failure.
FeedMe's scope is deliberately narrow:

- It runs in a single process, with no networking or cluster management.
- Stages are deterministic: same input + config → same output, with no hidden state or concurrency.
- Processing is synchronous by default; async is an implementation detail of I/O stages.
- There are no embedded languages and no required config files. Configuration is code-first, with optional YAML support.
- There are no long-running services and no auto-restart: FeedMe is a library you embed in your application.
This focus enables FeedMe's core guarantees while keeping the codebase small and maintainable.
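The linear, synchronous model above can be illustrated with a small std-only sketch. It is not FeedMe's API: `Event` (modeled as a string map instead of JSON), `StageFn`, `run`, and the two stage functions are hypothetical stand-ins that show how each event passes through an ordered list of stages on a single thread, stopping early when a stage drops or rejects it.

```rust
use std::collections::BTreeMap;

// Hypothetical stand-in for FeedMe's `Event`: a flat map of string fields.
type Event = BTreeMap<String, String>;

// A stage is modeled as a plain function. Ok(Some) passes the event on,
// Ok(None) drops it, and Err rejects it with a message.
type StageFn = fn(Event) -> Result<Option<Event>, String>;

fn require_level(event: Event) -> Result<Option<Event>, String> {
    if event.contains_key("level") {
        Ok(Some(event))
    } else {
        Err("Required field 'level' is missing".to_string())
    }
}

fn drop_debug(event: Event) -> Result<Option<Event>, String> {
    if event.get("level").map(String::as_str) == Some("debug") {
        Ok(None)
    } else {
        Ok(Some(event))
    }
}

/// Runs each event through every stage in order, one event at a time.
/// Returns the events that passed all stages and the collected errors.
fn run(stages: &[StageFn], events: Vec<Event>) -> (Vec<Event>, Vec<String>) {
    let mut out = Vec::new();
    let mut errors = Vec::new();
    'events: for mut event in events {
        for &stage in stages {
            match stage(event) {
                Ok(Some(next)) => event = next,
                Ok(None) => continue 'events, // filtered out
                Err(e) => {
                    errors.push(e);
                    continue 'events; // rejected; later stages never see it
                }
            }
        }
        out.push(event);
    }
    (out, errors)
}

/// Builds an event from string pairs.
fn event(pairs: &[(&str, &str)]) -> Event {
    pairs.iter().map(|&(k, v)| (k.to_string(), v.to_string())).collect()
}

fn main() {
    let stages: [StageFn; 2] = [require_level, drop_debug];
    let input = vec![
        event(&[("level", "info"), ("message", "User logged in")]),
        event(&[("level", "debug"), ("message", "Debug info")]),
        event(&[("message", "Missing level")]),
    ];
    let (out, errors) = run(&stages, input);
    println!("passed: {:?}", out);
    println!("errors: {:?}", errors);
}
```

Since the driver uses no threads, clocks, or randomness, its output depends only on the input events and the stage order.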
FeedMe enforces mechanical behavioral guarantees that are tested via runtime assertions and contract tests. These invariants ensure reliability and prevent regressions.
## Installation

Add this to your `Cargo.toml`:

```toml
[dependencies]
feedme = "0.2"
serde_json = "1.0"
regex = "1.0"
anyhow = "1.0"
```

Or add the dependencies with cargo:

```bash
cargo add feedme serde_json regex anyhow
```
## Quick Start

Here's a minimal pipeline that selects fields, redacts PII, requires a `level` field, filters out debug events, and writes the rest to stdout, with failures routed to a deadletter file:

```rust
use feedme::{
    Pipeline, FieldSelect, RequiredFields, StdoutOutput, Deadletter,
    PIIRedaction, Filter, InputSource, Stage
};
use std::path::PathBuf;

fn main() -> anyhow::Result<()> {
    // Create pipeline: select fields → redact PII → require fields → filter → output
    let mut pipeline = Pipeline::new();
    pipeline.add_stage(Box::new(FieldSelect::new(vec![
        "timestamp".into(), "level".into(), "message".into(), "email".into()
    ])));
    pipeline.add_stage(Box::new(PIIRedaction::new(vec!["email".into()])));
    pipeline.add_stage(Box::new(RequiredFields::new(vec!["level".into()])));
    // Drop debug-level events
    pipeline.add_stage(Box::new(Filter::new(Box::new(|event| {
        event.get("level").and_then(|v| v.as_str()) != Some("debug")
    }))));
    pipeline.add_stage(Box::new(StdoutOutput::new()));

    // Deadletter for errors
    let mut deadletter = Deadletter::new(PathBuf::from("errors.ndjson"));

    // Process input file
    let mut input = InputSource::File(PathBuf::from("input.ndjson"));
    input.process_input(&mut pipeline, &mut Some(&mut deadletter))?;

    // Export final metrics
    println!("Pipeline complete. Metrics:");
    for metric in pipeline.export_json_logs() {
        println!("{}", serde_json::to_string(&metric)?);
    }
    Ok(())
}
```
Input (`input.ndjson`):

```json
{"timestamp":"2023-10-01T10:00:00Z","level":"info","message":"User logged in","email":"user@example.com"}
{"level":"debug","message":"Debug info"}
{"message":"Missing level"}
```

Output (stdout):

```json
{"timestamp":"2023-10-01T10:00:00Z","level":"info","message":"User logged in","email":"[REDACTED]"}
```

Deadletter (`errors.ndjson`):

```json
{"error":{"stage":"RequiredFields","code":"MISSING_FIELD","message":"Required field 'level' is missing"},"raw":"{\"message\":\"Missing level\"}"}
```

Metrics (JSON logs):

```json
{"metric":"events_processed","value":2}
{"metric":"events_dropped","value":1}
{"metric":"errors","value":1}
{"metric":"stage_latencies","stage":"FieldSelect","count":3,"sum":0.05,"min":0.01,"max":0.02}
...
```
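Each `stage_latencies` line reports a count, sum, min, and max for one stage, which can be maintained in constant memory as events flow through. The accumulator below is a hypothetical sketch, not FeedMe's implementation: the `LatencySummary` name is invented here, and treating the values as seconds is an assumption, since the units are not stated above.

```rust
/// Hypothetical per-stage latency summary with the same fields as the
/// `stage_latencies` metric. Values are assumed to be in seconds.
#[derive(Debug, Clone, PartialEq)]
struct LatencySummary {
    stage: String,
    count: u64,
    sum: f64,
    min: f64,
    max: f64,
}

impl LatencySummary {
    fn new(stage: &str) -> Self {
        Self {
            stage: stage.to_string(),
            count: 0,
            sum: 0.0,
            min: f64::INFINITY,
            max: f64::NEG_INFINITY,
        }
    }

    /// Folds one observed latency into the summary in O(1) time and memory.
    fn record(&mut self, seconds: f64) {
        self.count += 1;
        self.sum += seconds;
        self.min = self.min.min(seconds);
        self.max = self.max.max(seconds);
    }

    /// Mean latency, or None if nothing was recorded.
    fn mean(&self) -> Option<f64> {
        if self.count == 0 {
            None
        } else {
            Some(self.sum / self.count as f64)
        }
    }

    /// Renders one JSON log line in the shape shown above. Returns None when
    /// nothing was recorded, because min/max would still be infinite.
    fn to_json_line(&self) -> Option<String> {
        if self.count == 0 {
            return None;
        }
        Some(format!(
            r#"{{"metric":"stage_latencies","stage":"{}","count":{},"sum":{},"min":{},"max":{}}}"#,
            self.stage, self.count, self.sum, self.min, self.max
        ))
    }
}

fn main() {
    let mut summary = LatencySummary::new("FieldSelect");
    for latency in [0.5, 0.25, 0.25] {
        summary.record(latency);
    }
    if let Some(line) = summary.to_json_line() {
        println!("{}", line);
    }
}
```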
## Determinism Verification
> **Determinism is a core guarantee:** identical runs produce identical outputs.

FeedMe guarantees deterministic output for identical inputs. Verify this with:

```bash
cargo run --example 09_complex_pipeline > run1.out
cargo run --example 09_complex_pipeline > run2.out
# On Unix: sha256sum run1.out run2.out
# On Windows: certutil -hashfile run1.out SHA256 && certutil -hashfile run2.out SHA256
```

The hashes should match, demonstrating deterministic behavior.
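When the hashes differ, it helps to know where the runs diverge. The following std-only program is a hypothetical helper (not part of FeedMe) that compares `run1.out` and `run2.out` line by line and reports the first mismatch; it assumes both files were produced by the commands above in the current directory.

```rust
use std::fs;

/// Returns None if both outputs are byte-identical. Otherwise returns the
/// 1-based line number of the first difference and the two differing lines
/// (an empty string stands in for a missing line). Splitting on '\n' rather
/// than using `lines()` means a trailing-newline difference is also caught.
fn first_difference(a: &str, b: &str) -> Option<(usize, String, String)> {
    let mut left = a.split('\n');
    let mut right = b.split('\n');
    let mut line_no = 0;
    loop {
        line_no += 1;
        match (left.next(), right.next()) {
            (None, None) => return None,
            (l, r) if l == r => continue,
            (l, r) => {
                return Some((
                    line_no,
                    l.unwrap_or("").to_string(),
                    r.unwrap_or("").to_string(),
                ));
            }
        }
    }
}

fn main() {
    match (fs::read_to_string("run1.out"), fs::read_to_string("run2.out")) {
        (Ok(a), Ok(b)) => match first_difference(&a, &b) {
            None => println!("run1.out and run2.out are identical"),
            Some((line, l, r)) => {
                println!("first difference at line {line}: {l:?} vs {r:?}")
            }
        },
        _ => eprintln!("could not read run1.out and run2.out; run the two cargo commands first"),
    }
}
```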
## Example: Cleaning Messy Data

Given `messy.ndjson`:

```json
{"timestamp":"2023-10-01T10:00:00Z","level":"info","message":"User logged in","email":"user@example.com"}
{"level":"info","message":"Missing timestamp"}
{invalid json}
```

Run:

```bash
cargo run --example 01_redact_validate_deadletter
```

Processed output (`samples/processed.ndjson`):

```json
{"timestamp":"2023-10-01T10:00:00Z","level":"info","message":"User logged in","email":"[REDACTED]"}
```

Deadletter (errors logged with context):

```json
{"error":{"stage":"Input_File","code":"PARSE_ERROR","message":"expected value at line 1 column 1"},"raw":"{invalid json}"}
```

Metrics:

```json
{"metric":"events_processed","value":1}
{"metric":"events_dropped","value":1}
{"metric":"errors","value":1}
{"metric":"stage_latencies","stage":"PIIRedaction","count":1,"sum":0.1,"min":0.1,"max":0.1}
```
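Each deadletter line pairs the error's attribution (stage, code, message) with the original input, stored as an escaped JSON string so that even unparseable input such as `{invalid json}` is preserved verbatim. The sketch below reproduces that line shape with only the standard library; `DeadletterRecord` and `json_escape` are hypothetical names, and a real implementation would more likely serialize with `serde_json`.

```rust
/// Escapes a string for embedding inside a JSON string literal.
fn json_escape(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            // Remaining control characters use the \u00XX form.
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}

/// Hypothetical deadletter record matching the shape shown above:
/// {"error":{"stage":..,"code":..,"message":..},"raw":..}
struct DeadletterRecord<'a> {
    stage: &'a str,
    code: &'a str,
    message: &'a str,
    raw: &'a str,
}

impl DeadletterRecord<'_> {
    /// Renders the record as a single NDJSON line.
    fn to_ndjson_line(&self) -> String {
        format!(
            r#"{{"error":{{"stage":"{}","code":"{}","message":"{}"}},"raw":"{}"}}"#,
            json_escape(self.stage),
            json_escape(self.code),
            json_escape(self.message),
            json_escape(self.raw),
        )
    }
}

fn main() {
    let record = DeadletterRecord {
        stage: "Input_File",
        code: "PARSE_ERROR",
        message: "expected value at line 1 column 1",
        raw: "{invalid json}",
    };
    println!("{}", record.to_ndjson_line());
}
```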
## Examples

FeedMe includes 12 examples. Run any of them with:

```bash
cargo run --example <number>_<description>
```

See `examples/` for the full list.
Full documentation: docs.rs/feedme
FeedMe is designed for high-throughput, low-latency data processing.

The mechanical guarantees described above are backed by runtime assertions and contract tests; see `src/ppt_invariant_contracts.rs` for the complete contract test suite.
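As an illustration of what a contract check can look like, the sketch below phrases an ordering invariant as a property: every output event appears in the input, in the same relative order, so stages may drop events but never reorder or invent them. This reading of the ordering guarantee is an assumption, and the code is not taken from `src/ppt_invariant_contracts.rs`; `is_order_preserving` and `drop_even` are hypothetical.

```rust
/// Returns true if `output` is a subsequence of `input`: every output item
/// appears in the input, in the same relative order.
fn is_order_preserving<T: PartialEq>(input: &[T], output: &[T]) -> bool {
    let mut remaining = input.iter();
    // `any` advances the shared iterator, so each match must come after the last.
    output.iter().all(|o| remaining.any(|i| i == o))
}

/// Hypothetical stage under test: drops events with even sequence numbers.
fn drop_even(events: &[u64]) -> Vec<u64> {
    events.iter().copied().filter(|n| n % 2 == 1).collect()
}

fn main() {
    // Tag events with sequence numbers so order can be checked after the stage.
    let input: Vec<u64> = (0..10).collect();
    let output = drop_even(&input);
    // Runtime assertion in the spirit of a contract test.
    assert!(is_order_preserving(&input, &output), "stage reordered events");
    println!(
        "ordering invariant holds: {} of {} events passed",
        output.len(),
        input.len()
    );
}
```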
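## Core API

Every stage implements the `Stage` trait:

```rust
pub trait Stage {
    fn execute(&mut self, event: Event) -> Result<Option<Event>, PipelineError>;
    fn name(&self) -> &str;
    // Returns true if the stage consumes events (an output sink)
    fn is_output(&self) -> bool { false }
}
```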
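A custom stage might implement this trait as sketched below. To keep the sketch self-contained it mirrors the trait locally and replaces `Event` and `PipelineError` with simplified stand-ins (a string map and a single-variant enum); `LevelGate` is a hypothetical stage, not one that ships with FeedMe. It returns `Ok(Some(..))` to pass an event on, `Ok(None)` to drop it, and `Err(..)` to reject it with the stage's name attached.

```rust
use std::collections::BTreeMap;

// Hypothetical stand-ins so this sketch compiles without FeedMe. The real
// `Event` carries JSON data plus optional metadata, and the real
// `PipelineError` has several categories.
type Event = BTreeMap<String, String>;

#[derive(Debug, PartialEq)]
enum PipelineError {
    Validation { stage: String, message: String },
}

// Local mirror of the `Stage` trait above.
trait Stage {
    fn execute(&mut self, event: Event) -> Result<Option<Event>, PipelineError>;
    fn name(&self) -> &str;
    fn is_output(&self) -> bool {
        false
    }
}

/// Hypothetical stage: rejects events without `level`, drops events whose
/// level is on a deny list, and counts how many it dropped.
struct LevelGate {
    deny: Vec<String>,
    dropped: u64,
}

impl Stage for LevelGate {
    fn execute(&mut self, event: Event) -> Result<Option<Event>, PipelineError> {
        let level = match event.get("level") {
            Some(level) => level.clone(),
            None => {
                return Err(PipelineError::Validation {
                    stage: self.name().to_string(),
                    message: "Required field 'level' is missing".to_string(),
                })
            }
        };
        if self.deny.contains(&level) {
            self.dropped += 1; // per-stage counter, mutated through &mut self
            Ok(None)
        } else {
            Ok(Some(event))
        }
    }

    fn name(&self) -> &str {
        "LevelGate"
    }
}

/// Builds an event from string pairs.
fn event(pairs: &[(&str, &str)]) -> Event {
    pairs.iter().map(|&(k, v)| (k.to_string(), v.to_string())).collect()
}

fn main() {
    let mut gate = LevelGate { deny: vec!["debug".to_string()], dropped: 0 };
    let inputs = [
        event(&[("level", "info"), ("message", "User logged in")]),
        event(&[("level", "debug"), ("message", "Debug info")]),
        event(&[("message", "Missing level")]),
    ];
    for input in inputs {
        match gate.execute(input) {
            Ok(Some(e)) => println!("pass: {:?}", e),
            Ok(None) => println!("dropped"),
            Err(err) => println!("error: {:?}", err),
        }
    }
    println!("{} dropped {} event(s)", gate.name(), gate.dropped);
}
```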
`execute` returns:

- `Ok(Some(event))`: pass the event to the next stage
- `Ok(None)`: drop the event (filtered); if `is_output()` is true, the event was consumed
- `Err(..)`: stop the pipeline, with the error attributed to the stage

Key types:

- `Pipeline`: add stages, process events, export metrics
- `Event`: JSON data plus optional metadata
- `InputSource`: stream events from stdin, a file, or a directory
- `PipelineError`: categorized errors (Parse/Transform/Validation/Output/System)

## Contributing

We welcome contributions! Please see our Contributing Guide for details.
To build from source and run the tests:

```bash
git clone https://github.com/Michael-A-Kuykendall/feedme.git
cd feedme
cargo build
cargo test
```
This project follows a code of conduct to ensure a welcoming environment for all contributors. See CODE_OF_CONDUCT.md for details.
FeedMe is supported by our amazing sponsors. See SPONSORS.md for details.
Licensed under the MIT License. See LICENSE for details.
Made with ❤️ by Michael Kuykendall