| Crates.io | fluxion-test-utils |
| lib.rs | fluxion-test-utils |
| version | 0.8.0 |
| created_at | 2025-11-16 07:59:17.21623+00 |
| updated_at | 2026-01-13 17:44:43.400266+00 |
| description | Test utilities and infrastructure for fluxion workspace |
| homepage | |
| repository | https://github.com/umbgtt10/fluxion |
| max_upload_size | |
| id | 1935317 |
| size | 63,741 |
Part of Fluxion - A reactive stream processing library for Rust
Shared test utilities and fixtures used across the fluxion workspace. This crate provides helpers for creating sequenced streams with deterministic timestamps, reusable test data, and common assertions to simplify unit and integration tests.
- `Sequenced<T>` - Counter-based timestamp wrapper for deterministic testing
- `Person`, `Animal`, `Plant` types
- `TestData` enum - Unified enum for diverse test scenarios
- `ErrorInjectingStream` for testing error handling

| Utility | Purpose | Use Case |
|---|---|---|
| `Sequenced<T>` | Counter-based timestamps (`u64`) | Deterministic ordering in tests |
| `Sequenced::new(value)` | Auto-assigned sequence number | Quick test values |
| `Sequenced::with_timestamp(value, ts)` | Explicit timestamp | Control ordering precisely |
| `test_channel()` | Create test channel | Simplified test setup |
| `unwrap_stream()` | Extract values from `StreamItem` | Clean test assertions |
| `assert_no_element_emitted()` | Verify stream silence | Timeout testing |
Unlike `InstantTimestamped<T>` in fluxion-stream-time (which uses real monotonic time), `Sequenced<T>` provides:

- No `tokio::time::sleep()` needed

Perfect for unit tests, integration tests, and CI environments.
```rust
use fluxion_test_utils::Sequenced;
use fluxion_stream::{FluxionStream, IntoFluxionStream};

#[tokio::test]
async fn example() {
    let (tx, rx) = futures::channel::mpsc::unbounded();
    let stream = rx.into_fluxion_stream();

    // Auto-assigned sequence numbers (0, 1, 2, ...)
    // `unbounded_send` is the synchronous send on the futures `UnboundedSender`
    tx.unbounded_send(Sequenced::new(42)).unwrap();
    tx.unbounded_send(Sequenced::new(100)).unwrap();

    // Explicit timestamps for precise control
    tx.unbounded_send(Sequenced::with_timestamp(200, 5)).unwrap();
}
```
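Why counter-based timestamps make tests deterministic can be illustrated with a small stand-alone sketch. It does not use this crate: `SketchSequenced`, `NEXT_SEQ`, and `values_in_timestamp_order` are hypothetical names, and the only assumptions taken from the description above are that `new` draws the next value from a shared counter and that `with_timestamp` accepts an explicit `u64`.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Shared counter that hands out sequence numbers.
// Assumed design for illustration, not the crate's actual code.
static NEXT_SEQ: AtomicU64 = AtomicU64::new(0);

/// Hypothetical stand-in for a counter-timestamped wrapper.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SketchSequenced<T> {
    pub value: T,
    timestamp: u64,
}

impl<T> SketchSequenced<T> {
    /// Wrap a value, using the next counter value as its timestamp.
    pub fn new(value: T) -> Self {
        let timestamp = NEXT_SEQ.fetch_add(1, Ordering::SeqCst);
        Self { value, timestamp }
    }

    /// Wrap a value with a caller-chosen timestamp.
    pub fn with_timestamp(value: T, timestamp: u64) -> Self {
        Self { value, timestamp }
    }

    /// Read the timestamp assigned to this item.
    pub fn timestamp(&self) -> u64 {
        self.timestamp
    }

    /// Consume the wrapper and return the inner value.
    pub fn into_inner(self) -> T {
        self.value
    }
}

/// Sort items by timestamp and return the inner values in that order.
pub fn values_in_timestamp_order<T>(mut items: Vec<SketchSequenced<T>>) -> Vec<T> {
    items.sort_by_key(|item| item.timestamp());
    items.into_iter().map(SketchSequenced::into_inner).collect()
}
```

Because the ordering depends only on integers chosen by the test, the same inputs always produce the same order, with no sleeping and no dependence on the wall clock.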
```rust
use fluxion_test_utils::test_data::{person_alice, person_bob, animal_cat};

#[test]
fn test_with_fixtures() {
    let alice = person_alice(); // Person { id: 1, name: "Alice" }
    let bob = person_bob();     // Person { id: 2, name: "Bob" }
    let cat = animal_cat();     // Animal { id: 1, species: "Cat" }

    assert_eq!(alice.name, "Alice");
}
```
```rust
use fluxion_test_utils::{test_channel, unwrap_stream, Sequenced};
use fluxion_stream::FluxionStream;

#[tokio::test]
async fn test_with_helpers() {
    // Simplified channel creation
    let (tx, stream) = test_channel::<Sequenced<i32>>();

    tx.send(Sequenced::new(42)).unwrap();

    // Extract values easily
    let values = unwrap_stream(stream, 1).await;
    assert_eq!(values, vec![42]);
}
```
```rust
use fluxion_test_utils::ErrorInjectingStream;
use fluxion_core::{StreamItem, FluxionError};
use futures::stream;

#[tokio::test]
async fn test_error_handling() {
    let base = stream::iter(vec![
        StreamItem::Value(1),
        StreamItem::Value(2),
    ]);

    // Inject error at position 1
    let stream_with_error = ErrorInjectingStream::new(
        base,
        1,
        FluxionError::custom("Test error")
    );

    // Test your error handling logic
}
```
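The idea of injecting an error at a fixed position can be sketched with a plain synchronous iterator, which keeps the example free of async dependencies. Everything here is a hypothetical stand-in: `SketchItem` and `ErrorInjectingIter` are not the crate's types, and the sketch assumes the error is emitted just before the inner item at the given position, after which the remaining items pass through unchanged. The real `ErrorInjectingStream` may behave differently.

```rust
/// Minimal stand-in for a stream item that is either a value or an error.
#[derive(Debug, Clone, PartialEq)]
pub enum SketchItem<T> {
    Value(T),
    Error(String),
}

/// Iterator adapter that emits one error as output number `inject_at`
/// (zero-based), then continues with the remaining inner items.
pub struct ErrorInjectingIter<I> {
    inner: I,
    inject_at: usize,
    emitted: usize,
    pending_error: Option<String>,
}

impl<I> ErrorInjectingIter<I> {
    pub fn new(inner: I, inject_at: usize, error: impl Into<String>) -> Self {
        Self {
            inner,
            inject_at,
            emitted: 0,
            pending_error: Some(error.into()),
        }
    }
}

impl<T, I> Iterator for ErrorInjectingIter<I>
where
    I: Iterator<Item = SketchItem<T>>,
{
    type Item = SketchItem<T>;

    fn next(&mut self) -> Option<Self::Item> {
        // Emit the error exactly once, when the output position matches.
        if self.emitted == self.inject_at {
            if let Some(message) = self.pending_error.take() {
                self.emitted += 1;
                return Some(SketchItem::Error(message));
            }
        }
        // Otherwise forward the next inner item, or end if there is none.
        let item = self.inner.next()?;
        self.emitted += 1;
        Some(item)
    }
}
```

In this sketch, if `inject_at` is larger than the number of inner items, the error is never emitted; a test that relies on the error should keep the position within the length of the base sequence.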
| Feature | `Sequenced<T>` (this crate) | `InstantTimestamped<T>` (stream-time) |
|---|---|---|
| Timestamp Type | `u64` (counter) | `std::time::Instant` |
| Purpose | Testing, simulation | Production, real time |
| Deterministic | ✅ Always | ❌ Monotonic time dependent |
| Time Operators | ❌ No | ✅ Yes (delay, debounce, etc.) |
| Core Operators | ✅ Yes (all) | ✅ Yes (all) |
| Speed | ⚡ Instant | 🐌 Real delays |
| Dependencies | None | `std`, `tokio::time` |
| Best For | Unit/integration tests | Production systems |
```rust
impl<T> Sequenced<T> {
    // Create with auto-assigned sequence
    pub fn new(value: T) -> Self;

    // Create with explicit timestamp
    pub fn with_timestamp(value: T, timestamp: u64) -> Self;

    // Extract inner value
    pub fn into_inner(self) -> T;
}

// The inner value is also accessible via the public field `value: T`
```
Pre-defined test data available in the `test_data` module:

```rust
// People
person_alice()  // Person { id: 1, name: "Alice" }
person_bob()    // Person { id: 2, name: "Bob" }
person_carol()  // Person { id: 3, name: "Carol" }

// Animals
animal_cat()    // Animal { id: 1, species: "Cat" }
animal_dog()    // Animal { id: 2, species: "Dog" }

// Plants
plant_rose()    // Plant { id: 1, name: "Rose" }
plant_tulip()   // Plant { id: 2, name: "Tulip" }
```
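How fixture functions and a unified enum fit together can be sketched without the crate. The types below are local stand-ins that reuse the names and field names from the listing above; the field types (`u32`, `String`), the variant names of `TestData`, and the `describe` helper are assumptions made for illustration only.

```rust
// Local stand-ins; field types are assumed, not taken from the crate.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Person { pub id: u32, pub name: String }

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Animal { pub id: u32, pub species: String }

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Plant { pub id: u32, pub name: String }

/// One enum covering every fixture kind, so a single collection
/// can carry mixed test data (variant names are assumed).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TestData {
    Person(Person),
    Animal(Animal),
    Plant(Plant),
}

// Fixture constructors return the same value on every call.
pub fn person_alice() -> Person {
    Person { id: 1, name: "Alice".to_string() }
}

pub fn animal_cat() -> Animal {
    Animal { id: 1, species: "Cat".to_string() }
}

pub fn plant_rose() -> Plant {
    Plant { id: 1, name: "Rose".to_string() }
}

/// Hypothetical helper: render a short label for any fixture.
pub fn describe(data: &TestData) -> String {
    match data {
        TestData::Person(p) => format!("person #{} {}", p.id, p.name),
        TestData::Animal(a) => format!("animal #{} {}", a.id, a.species),
        TestData::Plant(p) => format!("plant #{} {}", p.id, p.name),
    }
}
```

Wrapping the fixtures in one enum lets a test push people, animals, and plants through the same channel or stream type, which is presumably what "diverse test scenarios" refers to.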
```rust
// Create test channel with FluxionStream
pub fn test_channel<T>() -> (UnboundedSender<T>, FluxionStream<...>);

// Extract values from stream
pub async fn unwrap_stream<T>(stream: S, count: usize) -> Vec<T>;

// Extract single value
pub async fn unwrap_value<T>(stream: S) -> T;

// Assert no emission within timeout
pub async fn assert_no_element_emitted<S>(stream: &mut S, timeout_ms: u64);

// Assert stream has ended
pub async fn assert_stream_ended<S>(stream: &mut S);
```
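The "no emission within timeout" and "stream has ended" checks can be illustrated with blocking channels from the standard library. This is only an analogue under stated assumptions: the crate's helpers are async and operate on streams, whereas `is_silent`, `assert_no_element_emitted_sketch`, and `has_ended` below are hypothetical functions built on `std::sync::mpsc`.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError, TryRecvError};
use std::time::Duration;

/// Returns true if nothing arrives on `rx` within `timeout_ms`.
/// A disconnected channel also counts as silent in this sketch.
/// Note: if an element does arrive, it is consumed.
pub fn is_silent<T>(rx: &Receiver<T>, timeout_ms: u64) -> bool {
    match rx.recv_timeout(Duration::from_millis(timeout_ms)) {
        Ok(_) => false,
        Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => true,
    }
}

/// Panics if an element is received before the timeout elapses.
pub fn assert_no_element_emitted_sketch<T>(rx: &Receiver<T>, timeout_ms: u64) {
    assert!(
        is_silent(rx, timeout_ms),
        "expected no element within {} ms",
        timeout_ms
    );
}

/// Returns true once every sender is dropped and the buffer is empty.
/// Note: a buffered element, if present, is consumed by this check.
pub fn has_ended<T>(rx: &Receiver<T>) -> bool {
    matches!(rx.try_recv(), Err(TryRecvError::Disconnected))
}
```

A silence check necessarily waits for the full timeout when it passes, so short timeouts keep a test suite fast.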
```bash
cargo test --package fluxion-test-utils --all-features --all-targets
```
Licensed under the Apache License, Version 2.0. See LICENSE for details.