| Crates.io | fluxion-stream |
| lib.rs | fluxion-stream |
| version | 0.8.0 |
| created_at | 2025-11-16 07:59:42.434874+00 |
| updated_at | 2026-01-13 17:45:21.140642+00 |
| description | Stream combinators with ordering guarantees for async Rust |
| homepage | |
| repository | https://github.com/umbgtt10/fluxion |
| max_upload_size | |
| id | 1935319 |
| size | 1,196,453 |
Part of Fluxion - A reactive stream processing library for Rust
Stream combinators for async Rust with strong temporal-ordering guarantees. This crate provides composable operators and lightweight sequencing utilities designed for correctness and performance in event-driven systems.
fluxion-stream is a collection of reactive stream operators that maintain temporal ordering across asynchronous operations. Unlike standard stream combinators, all operators in this crate respect the intrinsic ordering of items (via timestamps, sequence numbers, or other ordering mechanisms), ensuring correct temporal sequencing even when events arrive out of order.
Use this crate when:
Fluxion uses two traits for temporal ordering:
HasTimestamp provides read-only access to timestamp values:
pub trait HasTimestamp {
type Timestamp: Ord + Copy + Send + Sync + core::fmt::Debug;
fn timestamp(&self) -> Self::Timestamp; // Get the timestamp for ordering
}
Timestamped extends HasTimestamp with an Inner type and construction/deconstruction capabilities:
pub trait Timestamped: HasTimestamp {
type Inner: Clone;
fn with_timestamp(value: Self::Inner, timestamp: Self::Timestamp) -> Self;
fn into_inner(self) -> Self::Inner;
}
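To illustrate how the two traits fit together, the sketch below re-declares them locally (copied from the definitions above so the example compiles without the crate) and implements both for a hypothetical `Stamped<T>` wrapper that uses a `u64` counter as its timestamp. `Stamped` and its field names are assumptions made for this example, not types provided by Fluxion.

```rust
use core::fmt::Debug;

// Local copies of the trait definitions shown above, so this sketch
// compiles on its own. In real code, import them from the crate instead.
pub trait HasTimestamp {
    type Timestamp: Ord + Copy + Send + Sync + Debug;
    fn timestamp(&self) -> Self::Timestamp;
}

pub trait Timestamped: HasTimestamp {
    type Inner: Clone;
    fn with_timestamp(value: Self::Inner, timestamp: Self::Timestamp) -> Self;
    fn into_inner(self) -> Self::Inner;
}

// Hypothetical wrapper: a value plus a monotonically growing counter.
#[derive(Debug, Clone, PartialEq)]
pub struct Stamped<T> {
    value: T,
    seq: u64,
}

// Read-only access: enough for anything that only needs to order items.
impl<T> HasTimestamp for Stamped<T> {
    type Timestamp = u64;
    fn timestamp(&self) -> u64 {
        self.seq
    }
}

// Construction and deconstruction: needed when new wrapped values are built.
impl<T: Clone> Timestamped for Stamped<T> {
    type Inner = T;
    fn with_timestamp(value: T, timestamp: u64) -> Self {
        Stamped { value, seq: timestamp }
    }
    fn into_inner(self) -> T {
        self.value
    }
}

fn main() {
    let item: Stamped<&str> = Stamped::with_timestamp("login", 7);
    assert_eq!(item.timestamp(), 7);
    assert_eq!(item.into_inner(), "login");
}
```

A domain event that already carries its own timestamp would typically need only the `HasTimestamp` half of this.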
When to use each:
- HasTimestamp for types that only need to expose a timestamp (read-only)
- Timestamped for wrapper types that construct timestamped values
- HasTimestamp for ordering
Implementations:
- Sequenced<T> - Test utility from fluxion-test-utils using monotonically growing sequence numbers
- HasTimestamp for your types (e.g., events with built-in timestamps)
Temporal ordering means items are processed based on their intrinsic timestamp, not arrival time:
// Stream 1 sends: [timestamp=2, value=B]
// Stream 2 sends: [timestamp=1, value=A]
// Merged output: [timestamp=1, value=A], [timestamp=2, value=B] ✓ Correct temporal order
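The ordering rule in the comment above can be sketched without any async machinery. The function below merges two timestamp-sorted batches of `(timestamp, value)` pairs; `merge_by_timestamp` is a hypothetical helper written for this illustration and says nothing about how the crate's operators are implemented internally.

```rust
/// Merge two timestamp-sorted sequences into one sequence ordered by timestamp.
/// Each item is a `(timestamp, value)` pair; on ties the left input goes first.
fn merge_by_timestamp<T>(left: Vec<(u64, T)>, right: Vec<(u64, T)>) -> Vec<(u64, T)> {
    let mut out = Vec::with_capacity(left.len() + right.len());
    let mut left = left.into_iter().peekable();
    let mut right = right.into_iter().peekable();
    loop {
        // Decide which input holds the item with the smaller timestamp.
        let take_left = match (left.peek(), right.peek()) {
            (Some(l), Some(r)) => l.0 <= r.0,
            (Some(_), None) => true,
            (None, Some(_)) => false,
            (None, None) => break,
        };
        let next = if take_left { left.next() } else { right.next() };
        out.extend(next);
    }
    out
}

fn main() {
    // Stream 1 holds timestamp 2, stream 2 holds timestamp 1.
    let stream1 = vec![(2, "B")];
    let stream2 = vec![(1, "A")];
    let merged = merge_by_timestamp(stream1, stream2);
    assert_eq!(merged, vec![(1, "A"), (2, "B")]);
}
```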
How it works:
- timestamp() value (std::time::Instant, u64 counter, etc.)
- timestamp() value
When to use:
All operators use StreamItem<T> for structured error handling:
pub enum StreamItem<T> {
Value(T), // Successful value
Error(FluxionError), // Error (lock failures, processing errors, etc.)
}
Error handling patterns:
// Pattern 1: Unwrap (panic on error)
let value = stream.next().await.unwrap().unwrap();
// Pattern 2: Filter errors
let values: Vec<_> = stream
    .filter_map(|item| async move { item.ok() })
    .collect()
    .await;
// Pattern 3: Handle explicitly
while let Some(item) = stream.next().await {
    match item {
        StreamItem::Value(v) => process(v),
        StreamItem::Error(e) => log_error(e),
    }
}
See Error Handling Guide for comprehensive patterns.
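As a dependency-free illustration of the explicit-handling pattern, the sketch below declares a local stand-in for `StreamItem<T>` and separates values from errors in a batch. The stand-in uses `String` where the real enum carries a `FluxionError`, and `split_items` is a hypothetical helper, not part of the crate.

```rust
// Local stand-in for the crate's enum; the real `Error` variant carries a
// `FluxionError`, replaced here by `String` so the sketch has no dependencies.
#[derive(Debug, PartialEq)]
enum StreamItem<T> {
    Value(T),
    Error(String),
}

/// Split a batch of items into successful values and error messages.
fn split_items<T>(items: Vec<StreamItem<T>>) -> (Vec<T>, Vec<String>) {
    let mut values = Vec::new();
    let mut errors = Vec::new();
    for item in items {
        match item {
            StreamItem::Value(v) => values.push(v),
            StreamItem::Error(e) => errors.push(e),
        }
    }
    (values, errors)
}

fn main() {
    let items = vec![
        StreamItem::Value(1),
        StreamItem::Error("lock failure".to_string()),
        StreamItem::Value(2),
    ];
    let (values, errors) = split_items(items);
    assert_eq!(values, vec![1, 2]);
    assert_eq!(errors, vec!["lock failure".to_string()]);
}
```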
combine_latest
Combines multiple streams, emitting when any stream emits (after all have emitted once).
Use case: Dashboard combining data from multiple sources
use fluxion_stream::CombineLatestExt;
let combined = stream1.combine_latest(
vec![stream2, stream3],
|state| state.values().len() == 3 // Emit when all present
);
Behavior:
Full documentation | Tests | Benchmarks
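The "emit on any update, but only after every input has emitted once" rule can be modelled with a small synchronous state machine. The `CombineLatest` struct below is a hypothetical model written for this illustration; it is not the crate's operator and ignores timestamps and errors.

```rust
/// Tracks the latest value per input and reports the combined state
/// once every input has produced at least one value.
struct CombineLatest<T> {
    latest: Vec<Option<T>>,
}

impl<T: Clone> CombineLatest<T> {
    fn new(inputs: usize) -> Self {
        Self { latest: vec![None; inputs] }
    }

    /// Record `value` from input `index` (panics if `index` is out of range).
    /// Returns the combined state if every input has a value, else `None`.
    fn push(&mut self, index: usize, value: T) -> Option<Vec<T>> {
        self.latest[index] = Some(value);
        // Collecting `Option<T>` items into `Option<Vec<T>>` yields `None`
        // as soon as one slot is still empty.
        self.latest.iter().cloned().collect()
    }
}

fn main() {
    let mut dashboard = CombineLatest::new(2);
    assert_eq!(dashboard.push(0, 45), None); // second input has not emitted yet
    assert_eq!(dashboard.push(1, 78), Some(vec![45, 78]));
    assert_eq!(dashboard.push(0, 50), Some(vec![50, 78])); // any update re-emits
}
```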
with_latest_from
Samples secondary streams only when primary stream emits.
Use case: User actions enriched with latest configuration/state
use fluxion_stream::WithLatestFromExt;
let enriched = user_clicks.with_latest_from(
vec![config_stream, state_stream],
|combined| combined.is_complete(),
|_primary, secondary| secondary.clone()
);
Behavior:
Full documentation | Tests | Benchmarks
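A minimal model of the sampling rule: secondary updates are only stored, and output is produced solely when the primary emits. The `WithLatestFrom` struct is a hypothetical sketch with a single secondary input; dropping primary items that arrive before any secondary value is an assumption of this model, not a documented guarantee of the crate.

```rust
/// Remembers the latest secondary value and pairs it with primary items.
struct WithLatestFrom<S> {
    latest: Option<S>,
}

impl<S: Clone> WithLatestFrom<S> {
    fn new() -> Self {
        Self { latest: None }
    }

    /// Secondary updates never produce output; they only refresh the state.
    fn on_secondary(&mut self, value: S) {
        self.latest = Some(value);
    }

    /// A primary item is paired with the latest secondary value, if any.
    fn on_primary<P>(&self, value: P) -> Option<(P, S)> {
        self.latest.clone().map(|secondary| (value, secondary))
    }
}

fn main() {
    let mut enriched: WithLatestFrom<&str> = WithLatestFrom::new();
    assert_eq!(enriched.on_primary("click-1"), None); // no config seen yet
    enriched.on_secondary("dark-theme");
    assert_eq!(enriched.on_primary("click-2"), Some(("click-2", "dark-theme")));
}
```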
ordered_merge
Merges multiple streams preserving temporal order.
Use case: Event log from multiple services
use fluxion_stream::OrderedStreamExt;
let merged = stream1.ordered_merge(vec![stream2, stream3]);
Behavior:
- timestamp() value
Full documentation | Tests | Benchmarks
merge_with
Stateful merging of multiple streams with shared state.
Use case: Repository pattern, event sourcing, aggregating events into domain state
use fluxion_stream::MergedStream;
use fluxion_test_utils::Sequenced;
struct Repository {
users: HashMap<UserId, User>,
orders: HashMap<OrderId, Order>,
}
let merged = MergedStream::seed::<Sequenced<Event>>(Repository::new())
.merge_with(user_stream, |event, repo| {
repo.users.insert(event.user_id, event.user);
Event::UserAdded(event.user_id)
})
.merge_with(order_stream, |event, repo| {
repo.orders.insert(event.order_id, event.order);
Event::OrderCreated(event.order_id)
})
.into_fluxion_stream();
Behavior:
- ordered_merge internally)
- merge_with call adds a new stream to the merge
- into_fluxion_stream()
Key Features:
- merge_with calls
- seed()
Full documentation | Tests | Benchmarks
start_with
Prepend initial values to a stream.
Use case: Provide default/placeholder values, seed initial state
use fluxion_stream::{IntoFluxionStream, StartWithExt};
use fluxion_core::StreamItem;
use fluxion_test_utils::Sequenced;
let (tx, rx) = futures::channel::mpsc::unbounded();
let with_defaults = rx.into_fluxion_stream()
.start_with(vec![
StreamItem::Value(Sequenced::new(0)),
StreamItem::Value(Sequenced::new(1)),
]);
// futures' UnboundedSender exposes a synchronous `unbounded_send`
tx.unbounded_send(Sequenced::new(2)).unwrap();
tx.unbounded_send(Sequenced::new(3)).unwrap();
// Output: 0, 1, 2, 3
Behavior:
- StreamItem::Error for testing error handling
Full documentation | Tests | Benchmarks
emit_when
Gates source emissions based on filter stream conditions.
Use case: Only emit sensor data when system is active
use fluxion_stream::EmitWhenExt;
let gated = source.emit_when(
filter_stream,
|filter_value| *filter_value > 0 // Predicate for gating
);
Behavior:
Full documentation | Tests | Benchmarks
take_latest_when
Samples source when filter condition is met.
Use case: Capture latest sensor reading on user request
use fluxion_stream::TakeLatestWhenExt;
let sampled = source.take_latest_when(
trigger_stream,
|trigger| *trigger
);
Behavior:
Full documentation | Tests | Benchmarks
sample_ratio
Probabilistic downsampling with configurable ratio.
Use case: Load reduction, logging sampling, monitoring downsampling
use fluxion_stream::SampleRatioExt;
// Sample approximately 10% of items
let sampled = stream.sample_ratio(0.1, fastrand::u64(..));
// For testing with deterministic seed
let sampled = stream.sample_ratio(0.5, 42);
Behavior:
- 0.0 (emit nothing) to 1.0 (emit all)
Full documentation | Tests | Benchmarks
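To show why a fixed seed makes sampling reproducible in tests, here is a dependency-free sketch of seeded ratio sampling over a batch. The `XorShift64` generator and the `sample_ratio` function below are stand-ins written for this example; the crate's actual random source and its handling of out-of-range ratios are not specified here.

```rust
/// Tiny xorshift64 generator. NOT the crate's RNG, only a
/// dependency-free stand-in for this illustration.
struct XorShift64(u64);

impl XorShift64 {
    fn new(seed: u64) -> Self {
        Self(seed.max(1)) // xorshift state must be non-zero
    }

    /// Next pseudo-random float in the half-open range [0, 1).
    fn next_f64(&mut self) -> f64 {
        let mut x = self.0;
        x ^= x << 13;
        x ^= x >> 7;
        x ^= x << 17;
        self.0 = x;
        // Use the top 53 bits so the result is always strictly below 1.0.
        (x >> 11) as f64 / (1u64 << 53) as f64
    }
}

/// Keep each item with probability `ratio`, using a seeded generator.
fn sample_ratio<T>(items: Vec<T>, ratio: f64, seed: u64) -> Vec<T> {
    assert!((0.0..=1.0).contains(&ratio), "ratio must be within 0.0..=1.0");
    let mut rng = XorShift64::new(seed);
    items.into_iter().filter(|_| rng.next_f64() < ratio).collect()
}

fn main() {
    let items: Vec<u32> = (0..1000).collect();
    assert!(sample_ratio(items.clone(), 0.0, 42).is_empty());
    assert_eq!(sample_ratio(items.clone(), 1.0, 42).len(), 1000);
    // The same seed always selects the same items.
    assert_eq!(sample_ratio(items.clone(), 0.5, 42), sample_ratio(items, 0.5, 42));
}
```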
take_while_with
Emits while condition holds, terminates when false.
Use case: Process events until shutdown signal
use fluxion_stream::TakeWhileExt;
let bounded = source.take_while_with(
condition_stream,
|condition| *condition
);
Behavior:
Full documentation | Tests | Benchmarks
scan_ordered
Accumulates state across stream items, emitting intermediate results.
Use case: Running totals, moving averages, state machines, building collections over time
use fluxion_stream::ScanOrderedExt;
use fluxion_test_utils::Sequenced;
let sums = stream.scan_ordered::<Sequenced<i32>, _, _>(0, |acc, val| {
*acc += val;
*acc
});
Behavior:
Common Patterns:
Full documentation | Tests | Benchmarks
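The accumulate-and-emit shape of the running-sum example above is the same as the standard library's `Iterator::scan`. The synchronous sketch below uses only `std` and a hypothetical `running_sums` helper; it shows the accumulator pattern, not the crate's operator.

```rust
/// Running totals over a slice: every input produces one intermediate result.
fn running_sums(values: &[i32]) -> Vec<i32> {
    values
        .iter()
        .scan(0, |acc, &val| {
            *acc += val; // update the accumulator in place
            Some(*acc)   // emit the intermediate result
        })
        .collect()
}

fn main() {
    assert_eq!(running_sums(&[10, 20, 30]), vec![10, 30, 60]);
}
```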
combine_with_previous
Pairs each value with the previous value.
Use case: Detect value changes or calculate deltas
use fluxion_stream::CombineWithPreviousExt;
let pairs = stream.combine_with_previous();
// Output: WithPrevious { previous: Some(1), current: 2 }
Behavior:
- previous = None
- previous = Some(prev)
Full documentation | Tests | Benchmarks
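The pairing rule (first item has no predecessor, every later item carries the one before it) can be sketched over a slice. The `WithPrevious` struct below is a local stand-in that mirrors the field names shown in the output comment above; it is not the crate's type.

```rust
// Local stand-in mirroring the `previous` / `current` fields shown above.
#[derive(Debug, PartialEq)]
struct WithPrevious<T> {
    previous: Option<T>,
    current: T,
}

/// Pair each value with the one that came before it.
fn combine_with_previous<T: Clone>(values: &[T]) -> Vec<WithPrevious<T>> {
    let mut previous: Option<T> = None;
    values
        .iter()
        .map(|current| {
            let pair = WithPrevious {
                previous: previous.clone(),
                current: current.clone(),
            };
            previous = Some(current.clone()); // remember for the next item
            pair
        })
        .collect()
}

fn main() {
    let pairs = combine_with_previous(&[10, 15, 15]);
    assert_eq!(pairs[0], WithPrevious { previous: None, current: 10 });
    assert_eq!(pairs[1], WithPrevious { previous: Some(10), current: 15 });
    // An unchanged value shows up as previous == Some(current).
    assert_eq!(pairs[2].previous, Some(pairs[2].current));
}
```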
window_by_count
Batches stream items into fixed-size windows.
Use case: Batch processing, aggregating metrics, reducing API calls
use fluxion_stream::WindowByCountExt;
let windowed = stream.window_by_count(3);
// Emits: vec![item1, item2, item3], vec![item4, item5, item6], ...
Behavior:
Full documentation | Tests | Benchmarks
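For a finite batch, fixed-size windowing corresponds to `slice::chunks`. The sketch below is a synchronous analogy with a hypothetical helper of the same name; it emits a shorter final window when items are left over, which is a property of `chunks` and an assumption here, since the crate's behavior for a trailing partial window is not described in this section.

```rust
/// Group items into windows of `size`; the last window may be shorter.
fn window_by_count<T: Clone>(items: &[T], size: usize) -> Vec<Vec<T>> {
    assert!(size > 0, "window size must be non-zero");
    items.chunks(size).map(|window| window.to_vec()).collect()
}

fn main() {
    let windows = window_by_count(&[1, 2, 3, 4, 5, 6, 7], 3);
    assert_eq!(windows, vec![vec![1, 2, 3], vec![4, 5, 6], vec![7]]);
}
```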
map_ordered
Maps values while preserving ordering wrapper.
let mapped = stream.map_ordered(|x| x * 2);
Full documentation | Tests | Benchmarks
filter_ordered
Filters values while preserving ordering wrapper.
let filtered = stream.filter_ordered(|x| *x > 10);
Full documentation | Tests | Benchmarks
take_items
Emit only the first N items then complete.
Use case: Pagination, limiting results, testing
use fluxion_stream::{IntoFluxionStream, TakeItemsExt};
let limited = stream.take_items(10);
// Emits first 10 items, then completes
Behavior:
- on_error() first to filter errors)
Full documentation | Tests | Benchmarks
skip_items
Skip the first N items, emit all remaining.
Use case: Pagination (skip offset), ignoring warmup data
use fluxion_stream::{IntoFluxionStream, SkipItemsExt};
let after_skip = stream.skip_items(5);
// Discards first 5 items, emits all subsequent items
Behavior:
- on_error() first to filter errors)
Full documentation | Tests | Benchmarks
distinct_until_changed
Suppresses consecutive duplicate values.
Full documentation | Tests | Benchmarks
distinct_until_changed_by
Custom duplicate suppression with comparison function.
Use case: Field comparison, case-insensitive matching, threshold filtering, custom equality
Behavior:
- PartialEq requirement on inner type
- true if values considered equal (filtered)
- sort_by, dedup_by, max_by
Full documentation | Tests | Benchmarks
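Since the behavior notes compare this operator to `dedup_by`, the standard library's `Vec::dedup_by` makes a convenient synchronous analogy: the comparison function returns `true` when two neighbours count as equal, and the later one is dropped. The helper below is a hypothetical batch version for illustration, shown with case-insensitive matching.

```rust
/// Drop consecutive items that `is_same` considers equal to the last kept item.
fn distinct_until_changed_by<T, F>(mut items: Vec<T>, mut is_same: F) -> Vec<T>
where
    F: FnMut(&T, &T) -> bool,
{
    // `Vec::dedup_by` hands the closure (later element, previously kept element)
    // and removes the later one when the closure returns `true`.
    items.dedup_by(|later, kept| is_same(&*kept, &*later));
    items
}

fn main() {
    let words = vec!["rust", "Rust", "RUST", "go", "rust"];
    let distinct = distinct_until_changed_by(words, |a, b| a.eq_ignore_ascii_case(b));
    // Only consecutive duplicates are suppressed; the final "rust" survives.
    assert_eq!(distinct, vec!["rust", "go", "rust"]);
}
```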
tap
Perform side-effects without transforming items.
Use case: Debugging, logging, metrics collection, tracing
use fluxion_stream::{IntoFluxionStream, TapExt};
let pipeline = rx.into_fluxion_stream()
.tap(|x| println!("Input: {:?}", x))
.map_ordered(|x| Sequenced::new(x.into_inner() * 2))
.tap(|x| println!("After map: {:?}", x));
Behavior:
Full documentation | Tests | Benchmarks
on_error
Selectively consume or propagate errors using the Chain of Responsibility pattern.
Use case: Logging errors, metrics collection, conditional error recovery
use fluxion_stream::OnErrorExt;
let handled = stream
.on_error(|err| {
if err.to_string().contains("validation") {
log::warn!("Validation error: {}", err);
true // Consume validation errors
} else {
false // Propagate other errors
}
})
.on_error(|_| {
metrics::increment("errors");
true // Catch-all
});
Behavior:
- true to consume error (removes StreamItem::Error)
- false to propagate error downstream
- on_error calls can be chained
Full documentation | Tests | Specification
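The consume-or-propagate contract can be modelled as an ordered list of handlers, each returning `true` to consume an error or `false` to pass it on. The sketch below is a simplified synchronous model with errors represented as strings; `Handler` and `propagate_unhandled` are hypothetical names, not crate APIs.

```rust
/// A handler inspects an error message and returns `true` to consume it
/// or `false` to pass it to the next handler.
type Handler = Box<dyn FnMut(&str) -> bool>;

/// Run every error through the handler chain in order and return the
/// errors that no handler consumed.
fn propagate_unhandled(errors: Vec<String>, handlers: &mut [Handler]) -> Vec<String> {
    errors
        .into_iter()
        .filter(|err| {
            // `any` stops at the first handler that returns `true`,
            // so later handlers never see a consumed error.
            let consumed = handlers.iter_mut().any(|handler| (*handler)(err.as_str()));
            !consumed
        })
        .collect()
}

fn main() {
    let mut handlers: Vec<Handler> = Vec::new();
    handlers.push(Box::new(|err: &str| err.contains("validation")));

    let errors = vec![
        "validation: empty name".to_string(),
        "io: disk full".to_string(),
    ];
    let remaining = propagate_unhandled(errors, &mut handlers);
    assert_eq!(remaining, vec!["io: disk full".to_string()]);

    // Adding a catch-all handler consumes everything that is left.
    handlers.push(Box::new(|_err: &str| true));
    let remaining = propagate_unhandled(vec!["io: disk full".to_string()], &mut handlers);
    assert!(remaining.is_empty());
}
```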
partition
Splits a stream into two based on a predicate.
Use case: Error routing, priority queues, type routing, threshold filtering
use fluxion_stream::{IntoFluxionStream, PartitionExt};
use fluxion_test_utils::Sequenced;
use futures::StreamExt;
let (tx, rx) = futures::channel::mpsc::unbounded();
// Partition numbers into even and odd
let (mut evens, mut odds) = rx.into_fluxion_stream()
.partition(|n: &i32| n % 2 == 0);
tx.unbounded_send(Sequenced::new(1)).unwrap();
tx.unbounded_send(Sequenced::new(2)).unwrap();
tx.unbounded_send(Sequenced::new(3)).unwrap();
tx.unbounded_send(Sequenced::new(4)).unwrap();
drop(tx);
// evens: 2, 4
// odds: 1, 3
Behavior:
Full documentation | Tests | Benchmarks
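The routing rule in the even/odd example matches the standard library's `Iterator::partition`, which makes for a quick synchronous analogy. The helper name below is hypothetical, and the sketch only mirrors the predicate-based split, not the streaming behavior.

```rust
/// Split numbers into (matching, non-matching) by a predicate, preserving order.
fn partition_even_odd(numbers: Vec<i32>) -> (Vec<i32>, Vec<i32>) {
    numbers.into_iter().partition(|n| n % 2 == 0)
}

fn main() {
    let (evens, odds) = partition_even_odd(vec![1, 2, 3, 4]);
    assert_eq!(evens, vec![2, 4]);
    assert_eq!(odds, vec![1, 3]);
}
```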
share
Convert a cold stream into a hot, multi-subscriber broadcast source.
Use case: Share expensive computations across multiple consumers
use fluxion_stream::{IntoFluxionStream, ShareExt, FilterOrderedExt, MapOrderedExt};
use fluxion_test_utils::Sequenced;
let (tx, rx) = futures::channel::mpsc::unbounded::<Sequenced<i32>>();
// Source operators run ONCE
let source = rx.into_fluxion_stream()
.map_ordered(|x: Sequenced<i32>| Sequenced::new(x.into_inner() * 2));
// Share among multiple subscribers
let shared = source.share();
// Each subscriber chains independently
let evens = shared.subscribe().unwrap()
.filter_ordered(|x| x.into_inner() % 2 == 0);
let strings = shared.subscribe().unwrap()
.map_ordered(|x: Sequenced<i32>| Sequenced::new(x.into_inner().to_string()));
Behavior:
- subscribe() to create independent subscriber streams
Full documentation | Tests | Benchmarks
| Operator | Triggers On | Output | Best For |
|---|---|---|---|
| combine_latest | Any stream emits | Latest from all streams | Dashboards, state aggregation |
| with_latest_from | Primary emits | Primary + context | Enriching events with state |
| merge_with | Any stream emits | Transformed via state | Repository pattern, event sourcing |

| Operator | Output | Ordering | Best For |
|---|---|---|---|
| ordered_merge | Every item | Temporal | Event logs, audit trails |
| combine_with_previous | Pairs (prev, curr) | Temporal | Change detection, deltas |
| scan_ordered | Accumulated state | Temporal | Running totals, state machines |

| Operator | Buffering | Termination | Best For |
|---|---|---|---|
| emit_when | Yes (buffers when gated) | Source completes | Conditional processing |
| take_latest_when | No (only latest) | Source completes | Sampling, snapshots |
| take_while_with | No | First false | Bounded processing |
| sample_ratio | No | Source completes | Load reduction, logging sampling |

| Operator | Comparison | Requirement | Best For |
|---|---|---|---|
| distinct_until_changed | PartialEq | Inner type must implement PartialEq | Simple duplicate suppression |
| distinct_until_changed_by | Custom function | No trait requirements | Field comparison, case-insensitive, threshold-based |

| Operator | Consumes Errors | Enables Side Effects | Propagation Control | Best For |
|---|---|---|---|---|
| on_error | Selective | Yes (logging, metrics) | Handler-controlled | Layered error handling, monitoring |

| Operator | Transforms Data | Side Effects | Best For |
|---|---|---|---|
| tap | No (pass-through) | Yes (logging, metrics) | Debugging pipelines, tracing, metrics |

| Operator | Late Subscribers | Source Execution | Best For |
|---|---|---|---|
| share | Miss past items | Once (broadcast) | Sharing expensive computations, fan-out |

| Operator | Outputs | Routing | Best For |
|---|---|---|---|
| partition | Two streams | By predicate | Error routing, priority queues, threshold filtering |
Add to your Cargo.toml:
[dependencies]
fluxion-stream = "0.8"
fluxion-core = "0.8"
tokio = { version = "1", features = ["full"] }
futures = "0.3"
Basic usage:
use fluxion_stream::{IntoFluxionStream, OrderedStreamExt};
use fluxion_test_utils::Sequenced;
use futures::StreamExt;
#[tokio::main]
async fn main() {
// Create channels
let (tx1, rx1) = futures::channel::mpsc::unbounded();
let (tx2, rx2) = futures::channel::mpsc::unbounded();
// Create streams
let stream1 = rx1.into_fluxion_stream();
let stream2 = rx2.into_fluxion_stream();
// Merge in temporal order
let mut merged = stream1.ordered_merge(vec![stream2]);
// Send values (out of order)
tx2.unbounded_send(Sequenced::with_sequence(100, 1)).unwrap();
tx1.unbounded_send(Sequenced::with_sequence(200, 2)).unwrap();
// Receive in temporal order
let first = merged.next().await.unwrap().unwrap();
assert_eq!(first.value, 100); // seq=1 emitted first
}
use fluxion_stream::{IntoFluxionStream, CombineLatestExt};
use fluxion_test_utils::Sequenced;
use futures::StreamExt;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let (cpu_tx, cpu_rx) = futures::channel::mpsc::unbounded::<Sequenced<i32>>();
let (mem_tx, mem_rx) = futures::channel::mpsc::unbounded();
let cpu_stream = cpu_rx.into_fluxion_stream();
let mem_stream = mem_rx.into_fluxion_stream();
let mut dashboard = cpu_stream.combine_latest(
vec![mem_stream],
|state| state.values().len() == 2
);
// Send metrics
cpu_tx.unbounded_send(Sequenced::with_sequence(45, 1)).unwrap();
mem_tx.unbounded_send(Sequenced::with_sequence(78, 2)).unwrap();
// Get combined state
if let Some(item) = dashboard.next().await {
let state = item.unwrap();
let values = state.get().values();
println!("CPU: {}%, Memory: {}%", values[0], values[1]);
}
Ok(())
}
use fluxion_stream::{IntoFluxionStream, EmitWhenExt};
use fluxion_test_utils::Sequenced;
use futures::StreamExt;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let (data_tx, data_rx) = futures::channel::mpsc::unbounded();
let (gate_tx, gate_rx) = futures::channel::mpsc::unbounded();
let data = data_rx.into_fluxion_stream();
let gate = gate_rx.into_fluxion_stream();
let mut gated = data.emit_when(gate, |open| *open);
// Send data while gate is closed
data_tx.unbounded_send(Sequenced::with_sequence(1, 1)).unwrap();
data_tx.unbounded_send(Sequenced::with_sequence(2, 2)).unwrap();
gate_tx.unbounded_send(Sequenced::with_sequence(false, 3)).unwrap();
// Open gate - buffered items released
gate_tx.unbounded_send(Sequenced::with_sequence(true, 4)).unwrap();
// Items 1 and 2 are now emitted
let first = gated.next().await.unwrap().unwrap();
assert_eq!(first.value, 1);
Ok(())
}
use fluxion_stream::{IntoFluxionStream, ScanOrderedExt};
use fluxion_test_utils::Sequenced;
use futures::StreamExt;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let (tx, rx) = futures::channel::mpsc::unbounded();
let stream = rx.into_fluxion_stream();
// Calculate running sum
let mut running_sum = stream.scan_ordered::<Sequenced<i32>, _, _>(0, |acc, val| {
*acc += val;
*acc
});
tx.unbounded_send(Sequenced::with_sequence(10, 1)).unwrap();
tx.unbounded_send(Sequenced::with_sequence(20, 2)).unwrap();
tx.unbounded_send(Sequenced::with_sequence(30, 3)).unwrap();
// First sum: 0 + 10 = 10
let first = running_sum.next().await.unwrap().unwrap();
assert_eq!(first.value, 10);
// Second sum: 10 + 20 = 30
let second = running_sum.next().await.unwrap().unwrap();
assert_eq!(second.value, 30);
// Third sum: 30 + 30 = 60
let third = running_sum.next().await.unwrap().unwrap();
assert_eq!(third.value, 60);
Ok(())
}
use fluxion_stream::{IntoFluxionStream, CombineWithPreviousExt};
use fluxion_test_utils::Sequenced;
use futures::StreamExt;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let (tx, rx) = futures::channel::mpsc::unbounded();
let stream = rx.into_fluxion_stream();
let mut pairs = stream.combine_with_previous();
tx.unbounded_send(Sequenced::with_sequence(10, 1)).unwrap();
tx.unbounded_send(Sequenced::with_sequence(15, 2)).unwrap();
tx.unbounded_send(Sequenced::with_sequence(15, 3)).unwrap();
// First item - no previous
let first = pairs.next().await.unwrap().unwrap();
assert_eq!(first.get().current, 10);
assert_eq!(first.get().previous, None);
// Second item - has previous
let second = pairs.next().await.unwrap().unwrap();
let (prev, curr) = second.get().as_pair();
assert_eq!(prev, Some(&10));
assert_eq!(curr, &15);
// Third item - detect no change
let third = pairs.next().await.unwrap().unwrap();
let (prev, curr) = third.get().as_pair();
if prev == Some(curr) {
println!("Value unchanged: {}", curr);
}
Ok(())
}
Run all tests:
cargo test
Run specific operator tests:
cargo test --test combine_latest_tests
cargo test --test ordered_merge_tests
cargo test --test emit_when_tests
Run with error tests:
cargo test combine_latest_error_tests
The crate includes comprehensive test coverage for:
Licensed under the Apache License, Version 2.0. See LICENSE for details.