| Crates.io | fluxion-stream-time |
| lib.rs | fluxion-stream-time |
| version | 0.8.0 |
| created_at | 2025-12-04 14:47:58.19119+00 |
| updated_at | 2026-01-13 17:45:37.655944+00 |
| description | Time-based operators extending fluxion-stream with monotonic timestamps |
| homepage | |
| repository | https://github.com/umbgtt10/fluxion |
| max_upload_size | |
| id | 1966558 |
| size | 319,965 |
Runtime-agnostic time-based operators for fluxion-stream.
This crate provides specialized time-based operators (delay, debounce, throttle, sample, timeout) that work with any async runtime through the Timer trait abstraction. The InstantTimestamped<T, TM> wrapper provides timestamping with the runtime's monotonic instant type.
fluxion-stream-time supports multiple async runtimes through feature flags:
- runtime-tokio (default) - Tokio runtime with TokioTimer
- runtime-smol - smol runtime with SmolTimer
- runtime-wasm - WebAssembly with WasmTimer (Node.js and browser)
- runtime-async-std - async-std runtime ⚠️ DEPRECATED (unmaintained)
- runtime-embassy - Embassy for embedded/no_std + alloc (requires manual timer implementation)

All operators are fully runtime-agnostic thanks to the Timer trait abstraction.
⚠️ Note: async-std has been discontinued (RUSTSEC-2025-0052, Aug 2025). The implementation is kept only for compatibility with existing projects. New projects should use the tokio or smol runtime instead.
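Selecting a non-default runtime usually means turning off default features and enabling a single runtime flag. A minimal Cargo.toml sketch, assuming the feature names listed above and the 0.8.0 version from the crate metadata (smol is only an example choice):

```toml
[dependencies]
# Disable the default runtime-tokio feature and opt in to smol instead.
fluxion-stream-time = { version = "0.8.0", default-features = false, features = ["runtime-smol"] }
```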
Fluxion's core design is timestamp-agnostic: operators in fluxion-stream work with any type implementing the HasTimestamp trait, regardless of the underlying timestamp representation (u64 counters, DateTime, custom types, etc.). This flexibility is a core strength.
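To make the idea concrete, here is a minimal std-only sketch of timestamp-agnostic code. The trait `HasTimestampSketch`, the wrappers `Counted` and `Timed`, and the helper `latest` are hypothetical names used for illustration; the real HasTimestamp trait in Fluxion may have a different shape.

```rust
use std::time::Instant;

/// Hypothetical stand-in for a HasTimestamp-style trait (not the crate's definition).
pub trait HasTimestampSketch {
    type Timestamp: Ord + Copy;
    fn timestamp(&self) -> Self::Timestamp;
}

/// Item ordered by a u64 counter.
pub struct Counted<T> {
    pub value: T,
    pub seq: u64,
}

/// Item ordered by a monotonic instant.
pub struct Timed<T> {
    pub value: T,
    pub at: Instant,
}

impl<T> HasTimestampSketch for Counted<T> {
    type Timestamp = u64;
    fn timestamp(&self) -> u64 {
        self.seq
    }
}

impl<T> HasTimestampSketch for Timed<T> {
    type Timestamp = Instant;
    fn timestamp(&self) -> Instant {
        self.at
    }
}

/// Generic over the timestamp representation: returns the most recent item.
pub fn latest<I: HasTimestampSketch>(items: &[I]) -> Option<&I> {
    items.iter().max_by_key(|item| item.timestamp())
}
```

The same `latest` function works for both wrappers because it only relies on the associated `Timestamp` type being ordered.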
However, time-based operators like delay, debounce, throttle, and timeout inherently need to:
The Timer trait abstraction solves all three requirements, enabling operators that work with any runtime while maintaining zero-cost performance.
1. Counter-Based (fluxion-test-utils)

- Sequenced<T> with u64 timestamps
- Core operators from fluxion-stream

2. Real-Time Based (fluxion-stream-time)

- InstantTimestamped<T, TM: Timer> with runtime's Instant type
- Time operators (delay, debounce, throttle, sample, timeout)

Keeping fluxion-stream timestamp-agnostic means:
- Timer trait - Runtime-agnostic timer abstraction with sleep_future() and now()
- TokioTimer - Zero-cost Tokio implementation (when runtime-tokio enabled)
- InstantTimestamped<T, TM> - Generic wrapper with timer's Instant type
- TokioTimestamped<T> - Type alias for InstantTimestamped<T, TokioTimer>

All time operators provide two variants:
1. Convenience Methods (.delay(), .debounce(), etc.)

- std runtime features (tokio, smol, async-std, wasm)

2. Explicit Timer Methods (_with_timer suffix)

- no_std environments (Embassy)

Operator List:
- delay(duration) / delay_with_timer(duration, timer) - Delays each emission by a specified duration
- debounce(duration) / debounce_with_timer(duration, timer) - Emits values only after a quiet period
- throttle(duration) / throttle_with_timer(duration, timer) - Emits a value and then ignores subsequent values for a duration
- sample(duration) / sample_with_timer(duration, timer) - Emits the most recent value within periodic time intervals
- timeout(duration) / timeout_with_timer(duration, timer) - Errors if no emission within duration

| Operator | Purpose | Behavior | Use Case |
|---|---|---|---|
| delay | Time-shift emissions | Delays each item by duration, errors pass through | Artificial delays, scheduling |
| debounce | Trailing debounce | Emits after quiet period, resets on new value | Search input, button debouncing |
| throttle | Leading throttle | Emits first, ignores subsequent for duration | Rate limiting, scroll/resize handlers |
| sample | Periodic sampling | Emits latest value at intervals | Downsampling high-frequency streams |
| timeout | Watchdog timer | Errors if no emission within duration | Network reliability, health checks |
delay

Delays each emission by a specified duration

use fluxion_stream_time::prelude::*;
use fluxion_stream_time::TokioTimer;
use std::time::Duration;
// Convenience method (automatically uses default timer for your runtime)
let delayed = stream.delay(Duration::from_millis(100));
// Or use explicit timer when you need custom control
let timer = TokioTimer;
let delayed = stream.delay_with_timer(Duration::from_millis(100), timer);
debounce

Emits only after a period of inactivity (trailing)

use fluxion_stream_time::prelude::*;
use fluxion_stream_time::TokioTimer;
use std::time::Duration;
// Convenience method (automatically uses default timer for your runtime)
let debounced = stream.debounce(Duration::from_millis(500));
// Or use explicit timer when you need custom control
let timer = TokioTimer;
let debounced = stream.debounce_with_timer(Duration::from_millis(500), timer);
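The trailing behavior can be modeled offline on recorded events. The sketch below is a std-only illustration, not the crate's implementation: `debounce_model` is a hypothetical helper, events are `(time_ms, value)` pairs sorted by time, the source is assumed to stay open after the last event, and a value arriving exactly when the quiet period ends is treated as not resetting it.

```rust
/// Model of trailing debounce over recorded `(time_ms, value)` events.
/// A value is emitted at `time + quiet_ms` unless a newer value arrives
/// strictly before that moment.
pub fn debounce_model<T: Clone>(events: &[(u64, T)], quiet_ms: u64) -> Vec<(u64, T)> {
    let mut emitted = Vec::new();
    for (index, (time, value)) in events.iter().enumerate() {
        // A newer value inside the quiet period replaces the pending one.
        let superseded = events
            .get(index + 1)
            .map_or(false, |(next_time, _)| *next_time < time + quiet_ms);
        if !superseded {
            emitted.push((time + quiet_ms, value.clone()));
        }
    }
    emitted
}
```

With a 500 ms quiet period, values at 0 ms and 50 ms collapse into a single emission of the later value at 550 ms.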
throttle

Rate-limits emissions (leading)

use fluxion_stream_time::prelude::*;
use fluxion_stream_time::TokioTimer;
use std::time::Duration;
// Convenience method (automatically uses default timer for your runtime)
let throttled = stream.throttle(Duration::from_millis(100));
// Or use explicit timer when you need custom control
let timer = TokioTimer;
let throttled = stream.throttle_with_timer(Duration::from_millis(100), timer);
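Leading throttle can be pictured as a gate that closes after each emission. This is a std-only model under stated assumptions, not the crate's code: `throttle_model` is a hypothetical helper, events are `(time_ms, value)` pairs sorted by time, and a value arriving exactly when the window ends is allowed through.

```rust
/// Model of leading throttle over recorded `(time_ms, value)` events.
/// The first value passes immediately; later values are dropped until
/// `window_ms` has elapsed since the last emission.
pub fn throttle_model<T: Clone>(events: &[(u64, T)], window_ms: u64) -> Vec<(u64, T)> {
    let mut emitted = Vec::new();
    // Time at which the gate reopens; None means it has never closed.
    let mut reopens_at: Option<u64> = None;
    for (time, value) in events {
        let gate_open = reopens_at.map_or(true, |reopen| *time >= reopen);
        if gate_open {
            emitted.push((*time, value.clone()));
            reopens_at = Some(time + window_ms);
        }
    }
    emitted
}
```

Unlike debounce, the emitted timestamps are the original arrival times, because the first value in each window is forwarded without waiting.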
sample

Samples stream at periodic intervals

use fluxion_stream_time::prelude::*;
use fluxion_stream_time::TokioTimer;
use std::time::Duration;
// Convenience method (automatically uses default timer for your runtime)
let sampled = stream.sample(Duration::from_millis(100));
// Or use explicit timer when you need custom control
let timer = TokioTimer;
let sampled = stream.sample_with_timer(Duration::from_millis(100), timer);
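Periodic sampling keeps only the latest value seen in each interval. The following std-only model is a sketch, not the crate's implementation: `sample_model` is a hypothetical helper, ticks are assumed to fire at multiples of the period starting from time zero, and a value arriving exactly on a tick is counted toward the next one.

```rust
/// Model of periodic sampling over recorded `(time_ms, value)` events sorted by time.
/// Each tick emits the most recent value received since the previous tick;
/// ticks with no new value emit nothing.
pub fn sample_model<T: Clone>(events: &[(u64, T)], period_ms: u64) -> Vec<(u64, T)> {
    assert!(period_ms > 0, "period must be non-zero");
    let mut emitted: Vec<(u64, T)> = Vec::new();
    for (time, value) in events {
        // The tick that closes the interval containing this event.
        let tick = (time / period_ms + 1) * period_ms;
        if let Some(last) = emitted.last_mut() {
            if last.0 == tick {
                // Same interval: the newer value replaces the older one.
                last.1 = value.clone();
                continue;
            }
        }
        emitted.push((tick, value.clone()));
    }
    emitted
}
```

For a 100 ms period, values at 10 ms and 40 ms produce one emission of the later value at the 100 ms tick.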
timeout

Errors if no emission within duration

use fluxion_stream_time::prelude::*;
use fluxion_stream_time::TokioTimer;
use std::time::Duration;
// Convenience method (automatically uses default timer for your runtime)
let with_timeout = stream.timeout(Duration::from_secs(30));
// Or use explicit timer when you need custom control
let timer = TokioTimer;
let with_timeout = stream.timeout_with_timer(Duration::from_secs(30), timer);
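The watchdog behavior can be sketched as a deadline that is pushed forward by every emission. This std-only model makes several assumptions that the crate may not share: `timeout_model` is a hypothetical helper, the watchdog is armed at time zero, a value arriving exactly at the deadline still counts, and processing stops at the first timeout.

```rust
/// Model of a timeout watchdog over recorded `(time_ms, value)` events sorted by time.
/// Returns the values that passed through and, if the watchdog fired,
/// the time at which it fired.
pub fn timeout_model<T: Clone>(
    events: &[(u64, T)],
    limit_ms: u64,
    observe_until_ms: u64,
) -> (Vec<T>, Option<u64>) {
    let mut passed = Vec::new();
    let mut deadline = limit_ms; // watchdog armed at t = 0
    for (time, value) in events {
        if *time > deadline {
            // The gap was too long: the watchdog fired before this value arrived.
            return (passed, Some(deadline));
        }
        passed.push(value.clone());
        deadline = time + limit_ms; // each emission re-arms the watchdog
    }
    // After the last value, the watchdog fires if we keep observing long enough.
    if observe_until_ms >= deadline {
        (passed, Some(deadline))
    } else {
        (passed, None)
    }
}
```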
- FluxionError::TimeoutError("Timeout") if exceeded

use fluxion_stream_time::prelude::*;
use fluxion_stream_time::{TokioTimer, TokioTimestamped};
use fluxion_stream_time::timer::Timer;
use fluxion_core::StreamItem;
use futures::stream::StreamExt;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;

#[tokio::main]
async fn main() {
    // UnboundedReceiverStream wraps a Tokio receiver, so the channel must come from tokio::sync::mpsc
    let (tx, rx) = mpsc::unbounded_channel();
    let timer = TokioTimer;
    // Create timestamped stream with runtime-aware delays - convenience methods!
    let stream = UnboundedReceiverStream::new(rx)
        .map(StreamItem::Value)
        .debounce(Duration::from_millis(100)) // No timer parameter needed
        .throttle(Duration::from_millis(200)); // Automatically uses TokioTimer
    // Send timestamped data
    tx.send(TokioTimestamped::new(42, timer.now())).unwrap();
    tx.send(TokioTimestamped::new(100, timer.now())).unwrap();
}
Key insight: Operators from both crates chain seamlessly because they all work with streams where S: Stream<Item = StreamItem<T>> and T: HasTimestamp.
The only requirement is that your stream items implement HasTimestamp with a compatible timestamp type:
- Core operators (map_ordered, filter_ordered, combine_latest, etc.) work with any timestamp type
- Time operators (delay, debounce, etc.) work with InstantTimestamped<T, TM: Timer> types

use fluxion_stream::{IntoFluxionStream, FilterOrderedExt, MapOrderedExt, DistinctUntilChangedExt};
use fluxion_stream_time::prelude::*; // Gets convenience methods
use fluxion_stream_time::{TokioTimer, TokioTimestamped};
use std::time::Duration;
// Start with time-based stream - convenience methods!
let stream = source_stream
// Time operator (no timer parameter needed!)
.debounce(Duration::from_millis(100))
// Core operators work seamlessly
.filter_ordered(|item| *item > 50)
.map_ordered(|item| item * 2)
// Back to time operator
.delay(Duration::from_millis(50))
// More core operators
.distinct_until_changed();
This works because:
- debounce returns impl Stream<Item = StreamItem<TokioTimestamped<T>>>
- Core operators like filter_ordered accept any stream where items implement HasTimestamp
- The timestamp type (Instant) is preserved through the chain
- delay can then use it again because the type is still InstantTimestamped<T, TM>

Use fluxion-stream (core operators) when:

- Sequenced<T>)

Use fluxion-stream-time (time operators) when:

- delay, debounce, etc.)

Use both together when:
use fluxion_stream_time::prelude::*; // Convenience methods
use fluxion_stream_time::{TokioTimer, TokioTimestamped};
use fluxion_stream_time::timer::Timer;
use std::time::Duration;
let timer = TokioTimer;
// Delay all emissions by 100ms (convenience method)
let delayed_stream = source_stream
.delay(Duration::from_millis(100));
// Debounce emissions (convenience method)
let debounced_stream = source_stream
.debounce(Duration::from_millis(100));
// For explicit timer control, use *_with_timer methods:
let delayed_custom = source_stream
.delay_with_timer(Duration::from_millis(100), timer.clone());
// Create timestamped values
let timestamped = TokioTimestamped::new(my_value, timer.now());
⚠️ WARNING: async-std has been discontinued and is unmaintained (RUSTSEC-2025-0052). This implementation is kept for compatibility only. New projects should use tokio or smol.
use fluxion_stream_time::prelude::*; // Convenience methods
use fluxion_stream_time::runtimes::AsyncStdTimer;
use fluxion_stream_time::{InstantTimestamped};
use fluxion_stream_time::timer::Timer;
use std::time::Duration;
let timer = AsyncStdTimer;
// Delay all emissions by 100ms (convenience method)
let delayed_stream = source_stream
.delay(Duration::from_millis(100));
// Debounce emissions (convenience method)
let debounced_stream = source_stream
.debounce(Duration::from_millis(100));
// For explicit timer control, use *_with_timer methods:
let delayed_custom = source_stream
.delay_with_timer(Duration::from_millis(100), timer.clone());
// Create timestamped values
let timestamped = InstantTimestamped::new(my_value, timer.now());
async-std Notes:
- async-io::Timer for async sleep operations
- async_std::task::spawn for true concurrency
- cargo test --features runtime-async-std --no-default-features

use fluxion_stream_time::prelude::*; // Convenience methods
use fluxion_stream_time::runtimes::SmolTimer;
use fluxion_stream_time::{SmolTimestamped};
use fluxion_stream_time::timer::Timer;
use std::time::Duration;
let timer = SmolTimer;
// Delay all emissions by 100ms (convenience method)
let delayed_stream = source_stream
.delay(Duration::from_millis(100));
// Debounce emissions (convenience method)
let debounced_stream = source_stream
.debounce(Duration::from_millis(100));
// For explicit timer control, use *_with_timer methods:
let delayed_custom = source_stream
.delay_with_timer(Duration::from_millis(100), timer.clone());
// Create timestamped values
let timestamped = SmolTimestamped::new(my_value, timer.now());
smol Notes:
- async-io for timer implementation (shared with async-std)
- SmolTimer based on async_io::Timer for sleep operations
- std::time::Instant for monotonic time tracking
- cargo test --features runtime-smol --no-default-features
- smol::block_on() (single-threaded) and smol::Executor (multi-threaded)

use fluxion_stream_time::prelude::*; // Convenience methods
use fluxion_stream_time::runtimes::wasm_implementation::WasmTimer;
use fluxion_stream_time::{InstantTimestamped};
use fluxion_stream_time::timer::Timer;
use std::time::Duration;
let timer = WasmTimer::new();
// Delay all emissions by 100ms (convenience method)
let delayed_stream = source_stream
.delay(Duration::from_millis(100));
// Debounce emissions (convenience method)
let debounced_stream = source_stream
.debounce(Duration::from_millis(100));
// For explicit timer control, use *_with_timer methods:
let delayed_custom = source_stream
.delay_with_timer(Duration::from_millis(100), timer.clone());
// Create timestamped values
let timestamped = InstantTimestamped::new(my_value, timer.now());
WASM Notes:
- gloo-timers for async sleep (compatible with Node.js and browsers)
- WasmInstant based on js-sys::Date.now() for monotonic time
- wasm-pack test --node or --headless --chrome

⚠️ Note: Embassy runtime support is in compilation-only mode. Tests verify that time operators compile correctly with Embassy, but full runtime testing requires hardware or emulator setup.
Embassy support enables time-based operators in no_std + alloc embedded environments:
#![no_std]
extern crate alloc;
use fluxion_stream_time::prelude::*;
use fluxion_stream_time::runtimes::embassy_implementation::EmbassyTimer;
use fluxion_stream_time::timer::Timer;
use embassy_time::Duration;
let timer = EmbassyTimer;
// Time operators work the same way in embedded environments
// But you must use *_with_timer methods (no default timer in no_std)
let delayed_stream = source_stream
.delay_with_timer(Duration::from_millis(100), timer.clone());
let debounced_stream = source_stream
.debounce_with_timer(Duration::from_millis(500), timer.clone());
let throttled_stream = source_stream
.throttle_with_timer(Duration::from_millis(100), timer.clone());
Embassy Notes:
- no_std + alloc environment
- embassy_time::{Timer, Duration, Instant} for monotonic time
- *_with_timer methods (convenience methods unavailable in no_std)
- tests/embassy/ verify all 5 time operators

Why Compilation-Only Tests?
Supported Operators:
- delay_with_timer - Delays emissions
- debounce_with_timer - Trailing debounce
- throttle_with_timer - Leading throttle
- sample_with_timer - Periodic sampling
- timeout_with_timer - Watchdog timer

⚠️ CRITICAL: async-std is no longer maintained (discontinued Aug 2025, RUSTSEC-2025-0052).
This implementation is provided for compatibility with existing projects only. For new projects, use tokio (default) or consider smol as an alternative.
async-std support is fully implemented via AsyncStdTimer using async-io for time-based operations. The Timer trait abstraction enabled this with zero operator changes.
Implementation Details:
- AsyncStdTimer - Zero-cost async-std implementation using async_io::Timer
- AsyncStdSleep - Future wrapper for async_io::Timer::after(duration)
- std::time::Instant - Standard monotonic instant type

Testing:

- tests/async_std/single_threaded/ and tests/async_std/multi_threaded/
- async_std::task::sleep and external spawning
- cargo test --features runtime-async-std --no-default-features
- .ci/async_std_tests.ps1 for CI testing configuration

Platform Support:

- async_std::task::spawn)

Deprecation Timeline:
To add support for a custom runtime, implement the Timer trait:
use fluxion_stream_time::timer::Timer;
use std::time::{Duration, Instant};

#[derive(Clone, Debug)]
pub struct MyCustomTimer;

impl Timer for MyCustomTimer {
    type Sleep = MyRuntimeSleep;
    type Instant = Instant;

    fn sleep_future(&self, duration: Duration) -> Self::Sleep {
        my_runtime::sleep(duration)
    }

    fn now(&self) -> Self::Instant {
        Instant::now()
    }
}
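The same shape lends itself to a manually driven clock for deterministic tests. The sketch below is std-only and does not implement the crate's Timer trait: `TimerSketch` is a simplified stand-in that mirrors only the two methods named above (sleep_future() and now()), and `ManualTimer` is a hypothetical type whose bounds may differ from what the real trait requires.

```rust
use std::future::{ready, Future, Ready};
use std::sync::{Arc, Mutex};
use std::time::Duration;

/// Simplified stand-in for a Timer-style trait (not the crate's definition).
pub trait TimerSketch {
    type Sleep: Future<Output = ()>;
    type Instant: Copy + Ord;
    fn sleep_future(&self, duration: Duration) -> Self::Sleep;
    fn now(&self) -> Self::Instant;
}

/// Virtual clock shared between clones; time moves only when told to.
#[derive(Clone, Debug, Default)]
pub struct ManualTimer {
    elapsed: Arc<Mutex<Duration>>,
}

impl ManualTimer {
    /// Moves the virtual clock forward.
    pub fn advance(&self, by: Duration) {
        *self.elapsed.lock().unwrap() += by;
    }
}

impl TimerSketch for ManualTimer {
    type Sleep = Ready<()>;
    // Time elapsed since the timer was created serves as the instant.
    type Instant = Duration;

    fn sleep_future(&self, duration: Duration) -> Self::Sleep {
        // "Sleeping" advances the virtual clock and completes immediately.
        self.advance(duration);
        ready(())
    }

    fn now(&self) -> Self::Instant {
        *self.elapsed.lock().unwrap()
    }
}
```

Because the instant is an associated type, the virtual clock can use a plain Duration while a production timer uses its runtime's instant type.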
| Feature | Sequenced<T> (test-utils) | InstantTimestamped<T, TM> (stream-time) |
|---|---|---|
| Timestamp Type | u64 (counter) | TM::Instant (runtime's instant) |
| Crate | fluxion-test-utils | fluxion-stream-time |
| Use Case | Testing, simulation | Production, real time |
| Time Operators | ❌ No | ✅ Yes |
| Core Operators | ✅ Yes | ✅ Yes |
| Deterministic | ✅ Yes | ❌ No (monotonic) |
| Duration Math | ❌ No | ✅ Yes |
| Runtime Support | N/A | Tokio, smol, WASM, async-std (deprecated) |
⚠️ Note: async-std support is deprecated due to discontinuation (RUSTSEC-2025-0052).
The Timer trait abstraction makes no_std support architecturally feasible. The trait design deliberately avoids std-specific dependencies, enabling potential embedded system support.
- core::future::Future and core::time::Duration
- Timer::Instant

std::time::Instant unavailable

- Instant type based on hardware timers

alloc requirement

- Box::pin() for stream composition

Async runtime dependency
Embassy Support (recommended for async embedded):
// Future: EmbassyTimer implementation
use embassy_time::{Duration, Instant, Timer as EmbassyDelay};

impl Timer for EmbassyTimer {
    type Sleep = EmbassyDelay;
    type Instant = Instant; // u64 tick counter
    // ...
}
Bare Metal (custom hardware timers):
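For bare metal, the instant could be a tick count read from a hardware timer. The sketch below uses only `core` and is an assumption-laden illustration: `TickInstant`, `TICK_HZ` (a 32.768 kHz tick rate), and `duration_to_ticks` are hypothetical names, and the arithmetic traits a real Timer::Instant must implement may differ.

```rust
use core::ops::{Add, Sub};
use core::time::Duration;

/// Hypothetical tick rate of the hardware timer (32.768 kHz crystal).
pub const TICK_HZ: u64 = 32_768;

/// Instant represented as a raw tick count since boot.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct TickInstant(pub u64);

/// Converts a duration to whole ticks, rounding down.
fn duration_to_ticks(duration: Duration) -> u64 {
    (duration.as_nanos() * TICK_HZ as u128 / 1_000_000_000) as u64
}

impl Add<Duration> for TickInstant {
    type Output = TickInstant;

    fn add(self, duration: Duration) -> TickInstant {
        TickInstant(self.0.wrapping_add(duration_to_ticks(duration)))
    }
}

impl Sub for TickInstant {
    type Output = Duration;

    /// Elapsed time since `earlier`; saturates to zero if `earlier` is later.
    fn sub(self, earlier: TickInstant) -> Duration {
        let ticks = self.0.saturating_sub(earlier.0);
        Duration::from_nanos((ticks as u128 * 1_000_000_000 / TICK_HZ as u128) as u64)
    }
}
```

Reading the actual counter register is hardware-specific and is left out; only the duration arithmetic is shown.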
The Timer abstraction enables no_std support without forcing it:
Status: Architecturally sound, implementation work required. The generic design means no_std support won't break existing std code.
WASM support is fully implemented via WasmTimer using gloo-timers and js-sys. The Timer trait abstraction enabled this with zero operator changes.
Implementation Details:
- WasmTimer - Zero-cost WASM implementation using gloo_timers::future::sleep
- WasmInstant - Custom instant type based on js-sys::Date.now() (returns milliseconds since epoch)
- Add<Duration>, Sub<Duration>, and Sub<Self> for duration calculations

Testing:

- tests/wasm/single_threaded/
- gloo_timers::future::sleep (no time control like Tokio)
- wasm-pack test --node --features runtime-wasm

Platform Support:

- runtime-tokio enabled by default)

Licensed under the Apache License, Version 2.0. See LICENSE for details.