Crates.io | data-proc |
lib.rs | data-proc |
version | 0.1.0 |
created_at | 2025-09-25 15:42:32.354268+00 |
updated_at | 2025-09-25 15:42:32.354268+00 |
description | A data processing pipeline framework with Source, Transformer, and Sink traits |
homepage | |
repository | https://github.com/max-zhang/data-proc |
max_upload_size | |
id | 1854783 |
size | 75,932 |
A lightweight, composable data processing pipeline framework for Rust with async support.
data-proc provides a simple yet powerful abstraction for building data processing pipelines using Rust's async streams. The library is built around three core traits: Source<T>, which produces a stream of items; Transformer<T, U>, which turns a stream of input items into a stream of output items; and Sink<T>, which consumes a stream of items. This design allows for composable, reusable components that can be combined to build complex data processing workflows, and everything is built on futures::Stream for efficient async processing.
To install, add data-proc to your Cargo.toml:
[dependencies]
data-proc = "0.1.0"
use data_proc::{Source, Transformer, Sink};
use futures::{stream, Stream, StreamExt};
// Define a simple source that produces numbers
struct NumberSource(Vec<i32>);

impl Source<i32> for NumberSource {
    fn into_stream(self) -> impl Stream<Item = i32> {
        stream::iter(self.0)
    }
}

// Define a transformer that doubles each number
struct Doubler;

impl Transformer<i32, i32> for Doubler {
    fn proc<S>(&self, stream: S) -> impl std::future::Future<Output = impl Stream<Item = i32>> + Send
    where
        S: Stream<Item = i32> + Send,
    {
        async move {
            stream.map(|n| n * 2)
        }
    }
}

// Define a sink that prints each number
struct PrintSink;

impl Sink<i32> for PrintSink {
    fn sink<S>(&self, stream: S) -> impl std::future::Future<Output = Result<(), std::io::Error>> + Send
    where
        S: Stream<Item = i32> + Send,
    {
        async move {
            futures::pin_mut!(stream);
            while let Some(item) = stream.next().await {
                println!("Got: {}", item);
            }
            Ok(())
        }
    }
}

// Use them together in a pipeline
#[tokio::main]
async fn main() -> std::io::Result<()> {
    let source = NumberSource(vec![1, 2, 3, 4, 5]);
    let transformer = Doubler;
    let sink = PrintSink;

    // Connect the components: source -> transformer -> sink
    sink.sink(transformer.proc(source.into_stream()).await).await
}
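Running this example prints Got: 2, Got: 4, Got: 6, Got: 8 and Got: 10: each number from the source is doubled by the transformer before the sink prints it.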
Check out the parquet-elasticsearch example for a more complex, real-world use case.
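The same wiring pattern extends to any number of stages: each transformer's output stream simply becomes the next stage's input. Here is a minimal sketch that reuses the quick-start types (run_pipeline is just a hypothetical helper, not part of the crate):

async fn run_pipeline() -> std::io::Result<()> {
    let numbers = NumberSource(vec![1, 2, 3]).into_stream();
    let doubled = Doubler.proc(numbers).await;        // first stage: 2, 4, 6
    let doubled_twice = Doubler.proc(doubled).await;  // second stage: 4, 8, 12
    PrintSink.sink(doubled_twice).await               // terminal stage prints the results
}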
pub trait Source<T> {
    fn into_stream(self) -> impl Stream<Item = T>;
}
Implement this trait for types that produce data. The into_stream method consumes the source and returns a stream of items.
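A source does not have to wrap a ready-made collection; it can also generate items lazily. Below is a minimal sketch using futures::stream::unfold (CounterSource and its limit field are hypothetical, not part of the crate):

use data_proc::Source;
use futures::{stream, Stream};

// Hypothetical source that lazily yields the numbers 0..limit.
struct CounterSource {
    limit: u64,
}

impl Source<u64> for CounterSource {
    fn into_stream(self) -> impl Stream<Item = u64> {
        let limit = self.limit;
        // unfold threads a running state (the next number) through the stream
        // and stops as soon as the closure returns None.
        stream::unfold(0u64, move |n| async move {
            if n < limit { Some((n, n + 1)) } else { None }
        })
    }
}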
pub trait Transformer<T, U> {
    fn proc<S>(&self, stream: S) -> impl std::future::Future<Output = impl Stream<Item = U>> + Send
    where
        S: Stream<Item = T> + Send;
}
Implement this trait for types that transform data. The proc method takes a stream of input items and returns a future that resolves to a stream of output items.
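A transformer may also change the item type or drop items entirely. As a sketch, this hypothetical ParseInts stage turns lines of text into integers and silently skips anything that fails to parse:

use data_proc::Transformer;
use futures::{Stream, StreamExt};

// Hypothetical stage: String lines in, parsed i64 values out.
struct ParseInts;

impl Transformer<String, i64> for ParseInts {
    fn proc<S>(&self, stream: S) -> impl std::future::Future<Output = impl Stream<Item = i64>> + Send
    where
        S: Stream<Item = String> + Send,
    {
        async move {
            // filter_map keeps only the items whose future resolves to Some(..).
            stream.filter_map(|line| async move { line.trim().parse::<i64>().ok() })
        }
    }
}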
pub trait Sink<T> {
    fn sink<S>(&self, stream: S) -> impl std::future::Future<Output = Result<(), Error>> + Send
    where
        S: Stream<Item = T> + Send;
}
Implement this trait for types that consume data. The sink method takes a stream of items and returns a future that resolves to a result indicating success or failure.
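A sink will often perform I/O for each item. The hypothetical FileSink below appends every item as a line to a file; this is only a sketch, assuming tokio is available with its fs and io-util features enabled, and is not an API provided by the crate:

use data_proc::Sink;
use futures::{Stream, StreamExt};
use tokio::io::AsyncWriteExt;

// Hypothetical sink that writes each item on its own line.
struct FileSink {
    path: std::path::PathBuf,
}

impl Sink<String> for FileSink {
    fn sink<S>(&self, stream: S) -> impl std::future::Future<Output = Result<(), std::io::Error>> + Send
    where
        S: Stream<Item = String> + Send,
    {
        async move {
            let mut file = tokio::fs::File::create(&self.path).await?;
            futures::pin_mut!(stream);
            while let Some(item) = stream.next().await {
                file.write_all(item.as_bytes()).await?;
                file.write_all(b"\n").await?;
            }
            // Flush so buffered bytes reach the OS before reporting success.
            file.flush().await?;
            Ok(())
        }
    }
}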
You can easily parallelize processing using the buffer_unordered and for_each_concurrent methods from the futures crate:
impl Transformer<Input, Output> for MyTransformer {
    fn proc<S>(&self, stream: S) -> impl std::future::Future<Output = impl Stream<Item = Output>> + Send
    where
        S: Stream<Item = Input> + Send,
    {
        async move {
            stream
                .map(|item| async move {
                    // Process each item asynchronously
                    process_item(item).await
                })
                .buffer_unordered(10) // Process up to 10 items concurrently
        }
    }
}
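for_each_concurrent plays the same role on the consuming side: it lets a sink drive several items at once. A hypothetical sketch, where the println! stands in for real per-item async work such as an HTTP request:

use data_proc::Sink;
use futures::{Stream, StreamExt};

// Hypothetical sink that handles up to 8 items concurrently.
struct ConcurrentSink;

impl Sink<String> for ConcurrentSink {
    fn sink<S>(&self, stream: S) -> impl std::future::Future<Output = Result<(), std::io::Error>> + Send
    where
        S: Stream<Item = String> + Send,
    {
        async move {
            stream
                .for_each_concurrent(8, |item| async move {
                    // Placeholder for real async work on each item.
                    println!("handled {}", item);
                })
                .await;
            Ok(())
        }
    }
}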
For robust error handling, you can use the Result type in your stream items:
impl Source<Result<MyData, MyError>> for MySource {
    fn into_stream(self) -> impl Stream<Item = Result<MyData, MyError>> {
        // Implementation that can return errors
    }
}
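Downstream stages can then decide what to do with those errors. One option, sketched below with a hypothetical FailFastSink, is to propagate the first failure so the whole pipeline returns Err; here the items are assumed to carry std::io::Error so they can flow straight into the sink's result:

use data_proc::Sink;
use futures::{Stream, StreamExt};

// Hypothetical sink that stops at the first failed item.
struct FailFastSink;

impl Sink<Result<String, std::io::Error>> for FailFastSink {
    fn sink<S>(&self, stream: S) -> impl std::future::Future<Output = Result<(), std::io::Error>> + Send
    where
        S: Stream<Item = Result<String, std::io::Error>> + Send,
    {
        async move {
            futures::pin_mut!(stream);
            while let Some(item) = stream.next().await {
                // `?` propagates the first error as the pipeline's result.
                let line = item?;
                println!("{}", line);
            }
            Ok(())
        }
    }
}

Alternatively, an intermediate transformer can log and drop Err items (for example with filter_map) so that only Ok values ever reach the sink.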
This project is licensed under the MIT License - see the LICENSE file for details.