| Field | Value |
| --- | --- |
| Crates.io | data-pipeline-rs |
| lib.rs | data-pipeline-rs |
| version | 0.1.0 |
| source | src |
| created_at | 2024-10-11 23:05:11.620818 |
| updated_at | 2024-10-11 23:05:11.620818 |
| description | Data processing pipelines |
| homepage | |
| repository | |
| max_upload_size | |
| id | 1405955 |
| size | 26,596 |
This crate provides utilities for connecting different types of pipeline nodes, each of which can perform an operation on some data. It includes facilities for building pipelines of nodes and for enforcing different kinds of data access for different kinds of handlers.
The core element is a `Node`. A node has a name (for user-friendliness when gathering stats, etc.), a data handler, an optional next `Node`, and an optional previous `Node`. It also gathers statistics about the data that has passed through it. The `Node` type is responsible for managing the flow of data, but does not operate on the data in any way.
By default, nodes track a number of stats about the data passing through them.
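For example (using the `DataLogger` observer handler defined later in this README), a standalone node handling `u32` values can be created with `Node::new` from a name and a handler:

```rust
// `DataLogger` is the example observer handler shown further down. `Node::new`
// takes the user-friendly name and the data handler, as in the pipeline
// builder example at the end of this README.
let logger_node: Node<u32> = Node::new("standalone logger", DataLogger::default());
```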
Inside every `Node` is a data handler, which operates on the data in some way. There are different categories of data handlers (for example, observers, which only inspect the data, and transformers, which change it); each category has a corresponding trait that can be implemented by your types to perform some operation.
Data handlers can define their own stats that can be tracked in addition to the ones tracked by `Node`.
To write a data handler:

1. Implement the trait for the handler category you want (for example, `DataObserver` or `DataTransformer`).
2. Implement `Into<SomeDataHandler>` for your type (or use the macro).
```rust
use std::fmt::Debug;

use serde_json::json;

// The handler type
#[derive(Default)]
struct DataLogger {
    num_logs: u32,
}

// Implement the `DataObserver` trait: we just need to observe the data,
// not change it
impl<T> DataObserver<T> for DataLogger
where
    T: Debug,
{
    fn observe(&mut self, data: &T) {
        self.num_logs += 1;
        println!("{data:?}")
    }

    // Only need to implement this if your handler has its own stats
    fn get_stats(&self) -> Option<serde_json::Value> {
        Some(json!({
            "num_logs": self.num_logs,
        }))
    }
}

// Enable conversion of your type into the `SomeDataHandler` enum
impl<T> Into<SomeDataHandler<T>> for DataLogger
where
    T: Debug,
{
    fn into(self) -> SomeDataHandler<T> {
        SomeDataHandler::Observer(Box::new(self))
    }
}
```
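The same pattern works for a transformer, which changes the data rather than merely observing it. The `DataDoubler` below operates only on `u32` values: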
```rust
// The handler type
struct DataDoubler;

// Implement the `DataTransformer` trait since we want to change the value.
// Here we're just going to operate on u32s
impl DataTransformer<u32> for DataDoubler {
    fn transform(&mut self, data: u32) -> Result<u32> {
        Ok(data * 2)
    }
}

// Enable conversion of your type into the `SomeDataHandler` enum
impl Into<SomeDataHandler<u32>> for DataDoubler {
    fn into(self) -> SomeDataHandler<u32> {
        SomeDataHandler::Transformer(Box::new(self))
    }
}
```
A pipeline is a set of `Node`s that are connected together. Pipelines do not exist as a "persistent" type, but the `PipelineBuilder` does make it easy to connect `Node`s together. The output of the `PipelineBuilder::build` method is the first `Node` of the pipeline.
```rust
let pipeline = PipelineBuilder::new()
    .demux(
        "odd/even demuxer",
        StaticDemuxer::new(vec![
            ConditionalPath {
                predicate: Box::new(|num: &u32| num % 2 == 0),
                next: PipelineBuilder::new()
                    .attach(Node::new("1a", DataLogger::default()))
                    .attach(Node::new("2a", DataLogger::default()))
                    .attach(Node::new("3a", DataLogger::default()))
                    .build(),
            },
            ConditionalPath {
                predicate: Box::new(|num: &u32| num % 2 == 1),
                next: PipelineBuilder::new()
                    .attach(Node::new("1b", DataLogger::default()))
                    .attach(Node::new("2b", DataLogger::default()))
                    .attach(Node::new("3b", DataLogger::default()))
                    .build(),
            },
        ]),
    )
    .build();
```
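This builds a pipeline whose first node demuxes incoming `u32` values onto one of two three-node logging branches, depending on whether each value is even or odd.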
Nodes support the concept of "visitors" to allow walking a pipeline and collecting information, like stats. The `StatsNodeVisitor` type is already defined, but custom visitors could be written as well.
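As a rough illustration of the visitor idea only, a custom visitor might look like the sketch below. The `NodeVisitor` trait, its `visit` signature, and how the visitor is driven over the pipeline are assumptions made for this sketch, not the crate's confirmed API.

```rust
// Hypothetical sketch: the trait name and method signature here are assumed,
// not taken from the crate's actual API. The idea is that the visitor is
// invoked once per node while the pipeline is walked, and accumulates
// whatever information it cares about (node names here; stats in the case of
// the built-in `StatsNodeVisitor`).
#[derive(Default)]
struct NameCollector {
    names: Vec<String>,
}

impl NodeVisitor for NameCollector {
    fn visit(&mut self, node_name: &str, node_stats: Option<&serde_json::Value>) {
        // Ignore the stats and just record the node's name.
        let _ = node_stats;
        self.names.push(node_name.to_string());
    }
}
```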