| Crates.io | fluxion-core |
| lib.rs | fluxion-core |
| version | 0.8.0 |
| created_at | 2025-11-16 07:58:47.23036+00 |
| updated_at | 2026-01-13 17:44:21.488876+00 |
| description | Core traits and types for ordered stream processing |
| homepage | |
| repository | https://github.com/umbgtt10/fluxion |
| max_upload_size | |
| id | 1935316 |
| size | 143,137 |
Part of Fluxion - A reactive stream processing library for Rust
Core traits and types for ordered stream processing in async Rust.
This crate provides the foundational abstractions used throughout the Fluxion ecosystem:
Timestamped trait: Temporal ordering for stream items via timestampsStreamItem<T>: Error-aware stream item wrapper (Value | Error)FluxionSubject<T>: Hot, multi-subscriber broadcast subjectFluxionError: Unified error type for stream operationsFluxion-core provides two traits for temporal ordering:
Minimal trait for types that expose a timestamp value:
pub trait HasTimestamp {
type Timestamp: Ord + Copy + Send + Sync + core::fmt::Debug;
fn timestamp(&self) -> Self::Timestamp; // Get timestamp for ordering
}
Use this when your type only needs to provide a timestamp for ordering (most common case).
Extends HasTimestamp with an Inner type and construction methods for wrapper types:
pub trait Timestamped: HasTimestamp {
type Inner: Clone;
fn with_timestamp(value: Self::Inner, timestamp: Self::Timestamp) -> Self;
fn into_inner(self) -> Self::Inner;
}
Use this for wrapper types like Sequenced<T> that wrap an inner value with a timestamp.
A hot, multi-subscriber broadcast subject for reactive programming patterns:
use fluxion_core::{FluxionSubject, StreamItem};
use futures::StreamExt;
#[tokio::main]
async fn main() {
let subject = FluxionSubject::new();
// Subscribe before sending - hot subject, no replay
let mut stream1 = subject.subscribe();
let mut stream2 = subject.subscribe();
// Send to all subscribers
subject.send(StreamItem::Value(42)).unwrap();
// Both subscribers receive the value
assert_eq!(stream1.next().await.unwrap().unwrap(), 42);
assert_eq!(stream2.next().await.unwrap().unwrap(), 42);
}
Key Characteristics:
Arc<Mutex<>> internally - cheap to clone, safe to send across threadssend() (no memory leaks)Subject Lifecycle:
let subject = FluxionSubject::new();
// Clone shares the same subject state
let subject_clone = subject.clone();
// Subscribe on any clone
let stream = subject_clone.subscribe();
// Send on any clone - all subscribers receive it
subject.send(StreamItem::Value(1)).unwrap();
// Error terminates all subscribers
subject.error(FluxionError::stream_error("failed"));
// Explicit close completes all subscribers
subject.close();
Thread Safety:
let subject = FluxionSubject::new();
// Safe to share across async tasks
tokio::spawn({
let subject = subject.clone();
async move {
subject.send(StreamItem::Value(1)).unwrap();
}
});
tokio::spawn({
let subject = subject.clone();
async move {
subject.subscribe();
}
});
Common Patterns:
When to Use:
When NOT to Use:
Error-aware wrapper for stream values:
pub enum StreamItem<T> {
Value(T),
Error(FluxionError),
}
Enables error propagation through operator chains without terminating the stream. See the Error Handling Guide for details.
The subject's design requires interior mutability with shared ownership:
Operations requiring mutation:
send() - broadcasts to all subscribers AND removes dead onessubscribe() - adds new subscriber to the listclose() - sets closed flag and clears subscribersWhy Arc:
Clone - multiple handles can existWhy Mutex:
Vecsend()/subscribe()/close()Alternative considered: &mut self methods would prevent cloning and multi-task sharing - defeats the purpose of a broadcast subject.
Subscribers are automatically cleaned up during send():
// For each send, remove disconnected subscribers
for tx in state.senders.drain(..) {
if tx.unbounded_send(item.clone()).is_ok() {
next_senders.push(tx); // Keep alive
}
// Dead subscribers dropped here
}
This prevents memory leaks when subscribers drop their streams without explicitly unsubscribing.
Add this to your Cargo.toml:
[dependencies]
fluxion-core = "0.8.0"
Apache-2.0