| Crates.io | ironbeam |
| lib.rs | ironbeam |
| version | 1.1.0 |
| created_at | 2025-11-15 18:42:55.523259+00 |
| updated_at | 2025-12-06 14:26:17.595471+00 |
| description | A batch processing clone of Apache Beam in Rust. |
| homepage | |
| repository | https://github.com/nhubbard/ironbeam |
| max_upload_size | |
| id | 1934652 |
| size | 1,032,155 |
A data processing framework for Rust inspired by Apache Beam and Google Cloud Dataflow. Ironbeam provides a declarative API for building batch data pipelines with support for transformations, aggregations, joins, and I/O operations.
Highlights include:

- Element-wise operations: map, filter, flat_map, map_batches
- Keyed operations: group_by_key, combine_values, keyed aggregations

Add to your Cargo.toml:
[dependencies]
ironbeam = "1.0.0"
By default, all features are enabled. To use a minimal configuration:
[dependencies]
ironbeam = { version = "1.1.0", default-features = false }
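Individual features can then be enabled on top of the minimal configuration; for example (picking two of the feature flags listed below):

[dependencies]
ironbeam = { version = "1.1.0", default-features = false, features = ["io-jsonl", "compression-gzip"] }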
Available feature flags:
- io-jsonl - JSON Lines support
- io-csv - CSV support
- io-parquet - Parquet support
- compression-gzip - gzip compression
- compression-zstd - zstd compression
- compression-bzip2 - bzip2 compression
- compression-xz - xz compression
- parallel-io - parallel I/O operations
- metrics - pipeline metrics collection
- checkpointing - checkpoint and recovery support

A quick-start word-count example:

use ironbeam::*;
fn main() -> anyhow::Result<()> {
// Create a pipeline
let p = Pipeline::default();
// Build a word count pipeline
let lines = from_vec(&p, vec![
"hello world".to_string(),
"hello rust".to_string(),
]);
let counts = lines
.flat_map(|line: &String| {
line.split_whitespace()
.map(|w| w.to_string())
.collect::<Vec<_>>()
})
.key_by(|word: &String| word.clone())
.map_values(|_word: &String| 1u64)
.combine_values(Count);
// Execute and collect results
let results = counts.collect_seq()?;
for (word, count) in results {
println!("{}: {}", word, count);
}
Ok(())
}
A Pipeline is the container for your computation graph. Create one with Pipeline::default(), then attach data sources and transformations.
A PCollection<T> represents a distributed collection of elements. Collections are immutable, lazy, and type-safe. Transformations create new collections, and computation happens when you call an execution method like collect_seq() or collect_par().
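Because collections are lazy, building transformations only records the computation graph; nothing runs until an execution method is called. A minimal sketch of this, reusing from_vec and collect_seq from the quick-start example (the closure signatures follow the pattern shown there):

use ironbeam::*;

fn main() -> anyhow::Result<()> {
    let p = Pipeline::default();

    // Nothing is computed here: these calls only describe the graph.
    let doubled = from_vec(&p, vec![1u64, 2, 3])
        .map(|n: &u64| n * 2)
        .filter(|n: &u64| *n > 2);

    // Execution happens only now.
    let results = doubled.collect_seq()?;
    println!("{:?}", results); // [4, 6]
    Ok(())
}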
Stateless operations work on individual elements:
- map - transform each element
- filter - keep elements matching a predicate
- flat_map - transform each element into zero or more outputs
- map_batches - process elements in batches

Stateful operations work on keyed data:
- key_by - convert to a keyed collection
- map_values - transform values while preserving keys
- group_by_key - group values by key
- combine_values - aggregate values per key
- top_k_per_key - select top K values per key

Combiners provide efficient aggregation. Built-in options include:
- Count - count elements
- Sum - sum numeric values
- Min / Max - find minimum/maximum
- AverageF64 - compute averages
- DistinctCount - count unique values
- TopK - select top K elements

Custom combiners can be implemented via the CombineFn trait.
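Putting these together, here is a minimal sketch that filters, keys, and aggregates with a built-in combiner (the order data is made up, and Sum is assumed here to aggregate u64 values):

use ironbeam::*;

fn main() -> anyhow::Result<()> {
    let p = Pipeline::default();

    // (customer, quantity) pairs.
    let orders = from_vec(&p, vec![
        ("alice".to_string(), 2u64),
        ("alice".to_string(), 5),
        ("bob".to_string(), 1),
    ]);

    // Drop empty orders, key by customer, keep the quantity as the value,
    // and sum quantities per customer.
    let totals = orders
        .filter(|o: &(String, u64)| o.1 > 0)
        .key_by(|o: &(String, u64)| o.0.clone())
        .map_values(|o: &(String, u64)| o.1)
        .combine_values(Sum);

    for (customer, total) in totals.collect_seq()? {
        println!("{customer}: {total}");
    }
    Ok(())
}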
Keyed collections support all standard join operations:
let joined = left_collection.join_inner(&right_collection)?;
Available methods: join_inner, join_left, join_right, join_full.
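A slightly fuller sketch showing where the two keyed sides come from; the element types are illustrative, and since the exact shape of the joined output is not documented above, the result is left unused:

use ironbeam::*;

fn main() -> anyhow::Result<()> {
    let p = Pipeline::default();

    // Key both collections by a shared user id.
    let names = from_vec(&p, vec![(1u32, "alice".to_string()), (2, "bob".to_string())])
        .key_by(|r: &(u32, String)| r.0);
    let scores = from_vec(&p, vec![(1u32, 95u64), (3, 80)])
        .key_by(|r: &(u32, u64)| r.0);

    // Inner join: only keys present on both sides (id 1 here) survive.
    let _joined = names.join_inner(&scores)?;
    Ok(())
}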
Enrich pipelines with auxiliary data:
let lookup = side_hashmap(&p, my_map);
data.map_with_side(&lookup, |elem, map| {
// Use map to enrich elem
})
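A more complete sketch of the same pattern, assuming the closure receives each element plus a reference to the backing HashMap as in the snippet above (the id-to-name data is illustrative):

use ironbeam::*;
use std::collections::HashMap;

fn main() -> anyhow::Result<()> {
    let p = Pipeline::default();

    // Side input: user id -> display name.
    let mut names: HashMap<u32, String> = HashMap::new();
    names.insert(1, "alice".to_string());
    names.insert(2, "bob".to_string());
    let lookup = side_hashmap(&p, names);

    // Main input: a stream of user ids.
    let ids = from_vec(&p, vec![1u32, 2, 3]);

    // Enrich each id with its name, falling back to "unknown".
    let labeled = ids.map_with_side(&lookup, |id: &u32, map| {
        format!("{id}: {}", map.get(id).map(String::as_str).unwrap_or("unknown"))
    });

    for line in labeled.collect_seq()? {
        println!("{line}");
    }
    Ok(())
}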
Execute pipelines in sequential or parallel mode:
let results = collection.collect_seq()?; // Single-threaded
let results = collection.collect_par()?; // Multi-threaded via Rayon
use ironbeam::*;
use serde::{Deserialize, Serialize};
#[derive(Clone, Serialize, Deserialize)]
struct Record {
id: u32,
name: String,
}
fn main() -> anyhow::Result<()> {
let p = Pipeline::default();
// Read file
let data = read_jsonl::<Record>(&p, "data.jsonl")?;
// Process
let filtered = data.filter(|r: &Record| r.id > 100);
// Write results
filtered.write_jsonl("output.jsonl")?;
Ok(())
}
let data = read_csv::<Record>(&p, "data.csv")?;
data.write_csv("output.csv")?;
let data = read_parquet::<Record>(&p, "data.parquet")?;
data.write_parquet("output.parquet")?;
Compression is automatically detected by file extension:
// Read compressed file
let data = read_jsonl::<Record>(&p, "data.jsonl.gz")?;
// Write compressed file
data.write_jsonl("output.jsonl.zst")?;
Supported extensions: .gz, .zst, .bz2, .xz
Group time-series data into fixed or sliding windows:
use std::time::Duration;

let windowed = data
.window_fixed(Duration::from_secs(60))
.group_by_key()
.combine_values(Sum);
Save and restore the pipeline's state:
data.checkpoint("checkpoints/step1")?;
// Later, recover from checkpoint
let recovered = recover_checkpoint::<MyType>(&p, "checkpoints/step1")?;
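A round-trip sketch of the two calls above, assuming checkpoint persists the collection when invoked and that the checkpointing feature is enabled (the data and paths are illustrative):

use ironbeam::*;

fn main() -> anyhow::Result<()> {
    let p = Pipeline::default();

    // First run: compute an intermediate result and persist it.
    let scaled = from_vec(&p, vec![1u64, 2, 3]).map(|n: &u64| n * 10);
    scaled.checkpoint("checkpoints/step1")?;

    // Later run: resume from the saved state instead of recomputing upstream steps.
    let recovered = recover_checkpoint::<u64>(&p, "checkpoints/step1")?;
    println!("{:?}", recovered.collect_seq()?);
    Ok(())
}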
Collect pipeline execution metrics:
let result = collection.collect_seq()?;
let metrics = p.get_metrics();
println!("Elements processed: {}", metrics.elements_processed);
The examples/ directory contains complete demonstrations:
- etl_pipeline.rs - Extract, transform, load workflow
- advanced_joins.rs - Join operations
- windowing_aggregations.rs - Time-based windowing
- combiners_showcase.rs - Using built-in combiners
- compressed_io.rs - Working with compressed files
- checkpointing_demo.rs - Checkpoint and recovery
- metrics_example.rs - Collecting metrics
- testing_pipeline.rs - Testing patterns

Run examples with:
cargo run --example etl_pipeline --features io-jsonl,io-csv
Run tests with:
cargo test
For coverage:
cargo tarpaulin --out Html
This project uses the Rust 2024 edition and requires a recent Rust toolchain.