| Crates.io | sinktools |
| lib.rs | sinktools |
| version | 0.0.1 |
| created_at | 2025-11-25 22:48:43.610315+00 |
| updated_at | 2025-11-25 22:48:43.610315+00 |
| description | Extra sink adaptors and helpers. |
| homepage | |
| repository | https://github.com/hydro-project/hydro |
| max_upload_size | |
| id | 1950588 |
| size | 99,299 |
Extra [Sink] adaptors and functions.
For an intuitive API that matches the data flow direction, use [SinkBuilder] and the [SinkBuild] trait to chain
adaptors in forward order:
use sinktools::{SinkBuilder, SinkBuild};
use sinktools::sink::SinkExt; // `futures::sink::SinkExt` for `.send(_).await`
# #[tokio::main(flavor = "current_thread")]
# async fn main() {
// Forward chain: flatten -> filter_map -> map -> filter -> for_each
let mut pipeline = SinkBuilder::<Vec<i32>>::new()
    .flatten::<Vec<i32>>()   // Flatten input vectors
    .filter_map(|x: i32| {   // Double evens, filter odds
        if x % 2 == 0 {
            Some(x * 2)
        } else {
            None
        }
    })
    .map(|x| x + 1)          // Add 1
    .filter(|x| *x < 100)    // Only values < 100
    .for_each(|x: i32| {     // Terminal operation
        println!("Received: {}", x);
    });
// Send nested data
pipeline.send(vec![1, 2, 3, 4]).await.unwrap();
pipeline.send(vec![5, 6]).await.unwrap();
pipeline.send(vec![]).await.unwrap();
pipeline.send(vec![7, 8, 9]).await.unwrap();
# }
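To see which values reach the terminal stage, it can help to trace the same stages with standard-library iterators. The sketch below assumes the sink adaptors behave like their `Iterator` namesakes, which the text above suggests but does not state outright. The helper name `expected_received` is hypothetical and not part of sinktools.

```rust
/// Hypothetical helper (not part of sinktools): pulls batches through the
/// same stages with std iterators, assuming the sink adaptors mirror them.
fn expected_received(batches: Vec<Vec<i32>>) -> Vec<i32> {
    batches
        .into_iter()
        .flatten() // Flatten input vectors
        .filter_map(|x| if x % 2 == 0 { Some(x * 2) } else { None }) // Double evens, drop odds
        .map(|x| x + 1) // Add 1
        .filter(|x| *x < 100) // Only values < 100
        .collect()
}
```

For the four batches sent above, this helper returns `[5, 9, 13, 17]`: the evens 2, 4, 6, 8 are doubled and incremented, and every result is below 100.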
Alternatively, you can construct sink adaptors directly using their `new` methods:
use sinktools::{for_each, filter, map, filter_map, flatten};
use sinktools::sink::SinkExt; // for `.send(_).await`
# #[tokio::main(flavor = "current_thread")]
# async fn main() {
// Build the same chain from inside out: sink <- filter <- map <- filter_map <- flatten
let sink = for_each(|x: i32| {
    println!("Received: {}", x);
});
let filter_sink = filter(|x: &i32| *x < 100, sink);
let map_sink = map(|x: i32| x + 1, filter_sink);
let filter_map_sink = filter_map(|x: i32| {
    if x % 2 == 0 {
        Some(x * 2)
    } else {
        None
    }
}, map_sink);
let mut complex_sink = flatten::<Vec<i32>, _>(filter_map_sink);
// Send nested data
complex_sink.send(vec![1, 2, 3, 4]).await.unwrap();
complex_sink.send(vec![5, 6]).await.unwrap();
complex_sink.send(vec![]).await.unwrap();
complex_sink.send(vec![7, 8, 9]).await.unwrap();
# }
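The inside-out order follows from ownership: each adaptor wraps the stage that comes after it, so the terminal stage has to exist first. The sketch below is a minimal synchronous analogue of that idea using only the standard library. The `PushSink` trait and the tuple structs are hypothetical stand-ins, not the sinktools types, and they ignore the async and backpressure aspects of a real `Sink`.

```rust
/// Hypothetical synchronous stand-in for a sink (not the sinktools API).
trait PushSink<T> {
    fn push_item(&mut self, item: T);
}

/// Terminal stage: runs a closure on every item.
struct ForEach<F>(F);
impl<T, F: FnMut(T)> PushSink<T> for ForEach<F> {
    fn push_item(&mut self, item: T) {
        (self.0)(item)
    }
}

/// Forwards only the items that satisfy the predicate.
struct Filter<P, S>(P, S);
impl<T, P: FnMut(&T) -> bool, S: PushSink<T>> PushSink<T> for Filter<P, S> {
    fn push_item(&mut self, item: T) {
        if (self.0)(&item) {
            self.1.push_item(item)
        }
    }
}

/// Transforms each item before forwarding it.
struct Map<F, S>(F, S);
impl<T, U, F: FnMut(T) -> U, S: PushSink<U>> PushSink<T> for Map<F, S> {
    fn push_item(&mut self, item: T) {
        let mapped = (self.0)(item);
        self.1.push_item(mapped)
    }
}

/// Transforms each item and forwards it only when the closure returns `Some`.
struct FilterMap<F, S>(F, S);
impl<T, U, F: FnMut(T) -> Option<U>, S: PushSink<U>> PushSink<T> for FilterMap<F, S> {
    fn push_item(&mut self, item: T) {
        if let Some(mapped) = (self.0)(item) {
            self.1.push_item(mapped)
        }
    }
}

/// Forwards every element of an incoming collection individually.
struct Flatten<S>(S);
impl<C: IntoIterator, S: PushSink<C::Item>> PushSink<C> for Flatten<S> {
    fn push_item(&mut self, batch: C) {
        for item in batch {
            self.0.push_item(item)
        }
    }
}

/// Builds the chain from the terminal stage outward and records what it receives.
fn run_inside_out(batches: Vec<Vec<i32>>) -> Vec<i32> {
    let mut received = Vec::new();
    {
        let sink = ForEach(|x: i32| received.push(x));
        let filter_sink = Filter(|x: &i32| *x < 100, sink);
        let map_sink = Map(|x: i32| x + 1, filter_sink);
        let filter_map_sink =
            FilterMap(|x: i32| if x % 2 == 0 { Some(x * 2) } else { None }, map_sink);
        let mut chain = Flatten(filter_map_sink);
        for batch in batches {
            chain.push_item(batch);
        }
    }
    received
}
```

Because every wrapper is generic over the stage it owns, the downstream type is already known when each adaptor is constructed, which is one plausible reason this order is friendlier to type inference.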
Each adaptor also provides a `new_sink` method, which ensures the construction is correct for `Sink` to be implemented
but may require additional type arguments to aid inference.
The forward `SinkBuilder` API is more intuitive for direct users because it matches the data flow direction. Direct construction is
better suited to generated code because it aids compiler type inference.