| Crates.io | pipe-it |
| lib.rs | pipe-it |
| version | 0.2.5 |
| created_at | 2026-01-23 13:56:39.107202+00 |
| updated_at | 2026-01-25 14:17:49.095584+00 |
| description | A lightweight, type-safe library for building linear and concurrent processing pipelines in Rust. |
| homepage | |
| repository | https://github.com/Jw-23/pipe-it |
| max_upload_size | |
| id | 2064554 |
| size | 82,530 |
Pipe-it is a lightweight, type-safe library for building linear and concurrent processing pipelines in Rust. It draws inspiration from the API style of the Bevy framework, using Res and ResMut to access shared resources, but with a simpler implementation.
Request shared resources (Res, ResMut) or inputs (Input) directly in your function signatures. Shared is a container for managing global or scoped application state.
Cargo dependency:
pipe-it = "0.2"
You can chain handlers (functions) together using .pipe() and .connect().
use pipe_it::{Context, Input, ext::HandlerExt};
// Step 1: Takes an integer, returns an integer
async fn double(num: Input<i32>) -> i32 {
*num * 2
}
// Step 2: Takes an integer, returns a string
async fn to_string(num: Input<i32>) -> String {
format!("Value is: {}", *num)
}
#[tokio::main]
async fn main() {
// Define the pipeline: double -> to_string
// The output of 'double' (i32) becomes the input context for 'to_string'
let pipeline = double.pipe().connect(to_string);
// execute
let ctx = Context::empty(10); // Initial input: 10
let result = pipeline.apply(ctx).await;
println!("{}", result); // Output: "Value is: 20"
}
Map Context: use invoke to apply a handler to the current context and produce a new context.
use pipe_it::{Context, Input, Res, ResMut, Shared};
#[derive(Debug, Clone)]
struct Counter {
c: i32,
}
#[tokio::test]
// Test the context invoke API
async fn test_chain_invoke() {
let ctx = Context::new(3, Shared::new().insert(Counter { c: 1 }));
ctx.invoke(async |x: Input<i32>, mut counter: ResMut<Counter>| {
counter.c += 1;
*x + 1
})
.await
.invoke(async |x: Input<i32>, counter: Res<Counter>| *x + counter.c)
.await
.invoke(async |x: Input<i32>| assert_eq!(*x, 6))
.await;
}
You can inject shared state into any handler using Res (read-only) and ResMut (mutable).
use pipe_it::{Context, Shared, Input, ResMut, ext::HandlerExt};
#[derive(Default)]
struct AccessLog {
count: usize,
}
// This handler updates the shared log AND processes the input
async fn log_and_square(input: Input<i32>, mut log: ResMut<AccessLog>) -> i32 {
log.count += 1;
println!("Log count: {}", log.count);
*input * *input
}
#[tokio::main]
async fn main() {
// 1. Setup shared state
let shared = Shared::new()
.insert(AccessLog::default());
// 2. Create context with input and shared state
let ctx = Context::new(5, shared);
// 3. Run the pipeline
let result = log_and_square.pipe().apply(ctx).await;
assert_eq!(result, 25);
}
You can decouple handler definition from usage by assigning string tags to functions using the #[node] macro, and referencing them with the tag!() macro.
use pipe_it::{Context, Input, ext::HandlerExt, node, tag};
// 1. Define a handler with a unique tag
#[node("greet")]
async fn greet(name: Input<String>) -> String {
format!("Hello, {}!", *name)
}
#[tokio::main]
async fn main() {
// 2. Create a pipeline using the tag instead of the function name
let pipeline = tag!("greet").pipe();
let ctx = Context::empty("World".to_string());
let result = pipeline.apply(ctx).await;
println!("{}", result); // Output: "Hello, World!"
}
You can use tuples (A, B, ...) to compose multiple pipelines that return PResult. The framework attempts execution sequentially until one succeeds (returns Ok).
use pipe_it::{Context, Input, PResult, PipelineError, ext::HandlerExt};
// Try to parse string to int
async fn parse_int(s: Input<String>) -> PResult<i32> {
// Hint: Use try_unwrap() to attempt acquiring ownership of String to avoid cloning
s.parse().map_err(|_| PipelineError::Fatal { msg: "Not an int".into() })
}
// Fallback: return default value 0
async fn default_zero(_: Input<String>) -> PResult<i32> {
Ok(0)
}
#[tokio::main]
async fn main() {
// Create branching pipeline: Try parse_int first, if it fails, run default_zero
let pipeline = (parse_int.pipe(), default_zero.pipe());
let ctx = Context::empty("not number".to_string());
let result = pipeline.apply(ctx).await;
assert_eq!(result.unwrap(), 0);
}
You can define functions that return a constructed Pipeline. This is useful for encapsulating complex logic or creating reusable middleware.
use pipe_it::{Context, Input, Pipeline, ext::HandlerExt};
async fn step_a(n: Input<i32>) -> i32 { *n + 1 }
async fn step_b(n: Input<i32>) -> String { n.to_string() }
// Encapsulate a chain of handlers into a single function
fn common_logic() -> impl Pipeline<i32, String> {
step_a.pipe().connect(step_b)
}
#[tokio::main]
async fn main() {
let pipe = common_logic();
let result = pipe.apply(Context::empty(10)).await;
println!("{}", result); // "11"
}
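Because common_logic() returns an ordinary pipeline, it can be dropped into larger workflows. The following is a minimal sketch assuming .connect can also be called on the pipeline returned by common_logic() (the exact trait bounds may differ; see the crate docs); shout is a hypothetical extra handler added for illustration.
use pipe_it::{Context, Input, Pipeline, ext::HandlerExt};
async fn step_a(n: Input<i32>) -> i32 { *n + 1 }
async fn step_b(n: Input<i32>) -> String { n.to_string() }
fn common_logic() -> impl Pipeline<i32, String> {
    step_a.pipe().connect(step_b)
}
// Hypothetical extra handler appended after the reusable steps
async fn shout(s: Input<String>) -> String {
    format!("{}!", *s)
}
#[tokio::main]
async fn main() {
    // Extend the reusable pipeline with one more step (assumes connect is
    // available on any Pipeline, not only on the value returned by .pipe()).
    let pipeline = common_logic().connect(shout);
    let result = pipeline.apply(Context::empty(41)).await;
    println!("{}", result); // "42!"
}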
Using try_unwrap can be dangerous in certain situations, such as tuple branches: every branch needs to access the input data, so if one branch takes ownership of it, subsequent branches will fail to access it.
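A minimal sketch of this hazard, assuming Input<T>::try_unwrap returns Option<T> (the exact signature may differ; see the crate docs): the first branch takes the String out of the context and then fails, so the fallback branch can no longer obtain it.
use pipe_it::{Context, Input, PResult, PipelineError, ext::HandlerExt};
// Branch 1: takes ownership of the String, then fails to parse it
async fn parse_owned(s: Input<String>) -> PResult<i32> {
    // Assumed signature: try_unwrap() -> Option<String>
    let owned = s.try_unwrap()
        .ok_or(PipelineError::Fatal { msg: "input already taken".into() })?;
    owned.parse().map_err(|_| PipelineError::Fatal { msg: "Not an int".into() })
}
// Branch 2: also wants ownership, but branch 1 may already have consumed the data
async fn fallback_len(s: Input<String>) -> PResult<i32> {
    let owned = s.try_unwrap()
        .ok_or(PipelineError::Fatal { msg: "input taken by an earlier branch".into() })?;
    Ok(owned.len() as i32)
}
#[tokio::main]
async fn main() {
    let pipeline = (parse_owned.pipe(), fallback_len.pipe());
    // parse_owned consumes the String and then fails, so fallback_len cannot
    // access it either; prefer the default (cloning) access in tuple branches.
    let result = pipeline.apply(Context::empty("not a number".to_string())).await;
    assert!(result.is_err());
}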
We compared the performance of a 5-step pipeline processing a Vec<i32> of 10,000 items.
| Implementation | Time (per iter) | Overhead vs Raw | Note |
|---|---|---|---|
| Raw Rust (Move) | 1.02 µs | - | Baseline (Zero Copy) |
| Pipe-it (try_unwrap) | 1.09 µs | +7% | Recommended |
| Pipe-it (Clone) | 5.22 µs | +411% | Default usage |
Conclusion: by using try_unwrap() to acquire ownership, the framework overhead is negligible (~0.07 µs), achieving near-native performance (about 5x faster than cloning).
Recommendation: Acquire ownership via try_unwrap in safe (linear) workflows to avoid cloning costs.
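A minimal sketch of the recommended linear usage, again assuming Input<T>::try_unwrap returns Option<T> (hypothetical signature; consult the crate docs): the handler is the only consumer of the Vec, so it can take ownership instead of cloning it.
use pipe_it::{Context, Input, ext::HandlerExt};
// Linear pipeline: this handler is the sole consumer of the Vec, so taking
// ownership avoids cloning all 10,000 elements.
async fn sum_squares(data: Input<Vec<i32>>) -> i64 {
    // Assumed signature: try_unwrap() -> Option<Vec<i32>>
    let owned = data.try_unwrap().expect("sole owner in a linear pipeline");
    owned.into_iter().map(|x| (x as i64) * (x as i64)).sum()
}
#[tokio::main]
async fn main() {
    let ctx = Context::empty((0..10_000).collect::<Vec<i32>>());
    let result = sum_squares.pipe().apply(ctx).await;
    println!("sum of squares = {}", result);
}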
- Handler: Any async function that takes arguments implementing FromContext.
- Context<I>: Holds the current input I and Shared resources.
- Input<I>: A wrapper to extract the current input from the context.
- Res<T> / ResMut<T>: Wrappers to extract shared resources from the context.
- Extension methods (HandlerExt):
  - .pipe(): Converts a handler into a Pipeline.
  - .connect(next_handler): Chains two pipelines linearly.
  - .map(inner_pipeline): (If available) Processes Vec<I> inputs concurrently.