| Crates.io | fluxion-exec |
| lib.rs | fluxion-exec |
| version | 0.8.0 |
| created_at | 2025-11-16 08:08:57.444243+00 |
| updated_at | 2026-01-13 17:44:55.530709+00 |
| description | Async stream subscribers and execution utilities for fluxion |
| homepage | |
| repository | https://github.com/umbgtt10/fluxion |
| max_upload_size | |
| id | 1935322 |
| size | 135,363 |
Part of Fluxion - A reactive stream processing library for Rust
Async stream execution utilities that enable processing streams with async handlers, providing fine-grained control over concurrency, cancellation, and error handling.
fluxion-exec provides two powerful execution patterns for consuming async streams:
- subscribe - Sequential processing where every item is processed to completion
- subscribe_latest - Latest-value processing with automatic cancellation of outdated work

These utilities solve the common problem of how to process stream items with async functions while controlling concurrency, managing cancellation, and handling errors gracefully.
✨ Two Execution Modes
🎯 Flexible Error Handling
🚀 Async-First Design
⚡ Cancellation Support
CancellationToken integration (with automatic cancellation in subscribe_latest)
🔧 Extension Trait Pattern
Works with any Stream implementation

Add to your Cargo.toml:
[dependencies]
fluxion-exec = "0.8"
tokio = { version = "1.48", features = ["rt", "sync", "macros"] }
tokio-stream = "0.1"
futures = "0.3"
The following sections contain a wide range of examples and suggestions. These are indicative and are not guaranteed to compile as written; see the repository for genuine runnable examples that can be used as they are.
use fluxion_exec::SubscribeExt; // extension trait providing .subscribe
use tokio::sync::mpsc::unbounded_channel;
use tokio_stream::wrappers::UnboundedReceiverStream;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (tx, rx) = unbounded_channel();
    let stream = UnboundedReceiverStream::new(rx);
    // Spawn processor
    let handle = tokio::spawn(async move {
        stream
            .subscribe(
                |item, _ctx| async move {
                    println!("Processing: {}", item);
                    // Simulate async work
                    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                    Ok::<_, std::io::Error>(())
                },
                |_| {}, // Ignore errors
                None    // No cancellation token
            )
            .await
    });
    // Send items
    tx.send(1)?;
    tx.send(2)?;
    tx.send(3)?;
    drop(tx); // close the channel so the subscription can finish
    handle.await??;
    Ok(())
}
use fluxion_exec::SubscribeLatestExt; // extension trait providing .subscribe_latest
use tokio::sync::mpsc::unbounded_channel;
use tokio_stream::wrappers::UnboundedReceiverStream;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (tx, rx) = unbounded_channel();
    let stream = UnboundedReceiverStream::new(rx);
    let handle = tokio::spawn(async move {
        stream
            .subscribe_latest(
                |item, token| async move {
                    // Check cancellation periodically
                    for _ in 0..10 {
                        if token.is_cancelled() {
                            println!("Cancelled processing {}", item);
                            return Ok(());
                        }
                        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
                    }
                    println!("Completed: {}", item);
                    Ok::<_, std::io::Error>(())
                },
                None,
                None
            )
            .await
    });
    // Rapidly send items - intermediate ones will be skipped
    tx.send(1)?;
    tx.send(2)?;
    tx.send(3)?;
    tx.send(4)?;
    drop(tx); // close the channel so the subscription can finish
    handle.await??;
    Ok(())
}
A subscription attaches an async handler to a stream and processes items until the stream ends or a cancellation token is triggered.
With subscribe, items are processed one at a time. Each item's handler must complete before the next item is processed. This guarantees strict ordering, no concurrent handlers, and natural backpressure on the stream.
With subscribe_latest, only the most recent value is processed. When new items arrive during processing, the in-flight handler's cancellation token is triggered and the newest item is picked up next; intermediate items are skipped.
This is ideal for scenarios where only the latest value matters, such as UI rendering, live previews, and search-as-you-type.
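The latest-value semantics can be sketched without fluxion at all: a one-element slot that producers overwrite and a consumer drains. This is a simplified, std-only illustration of the idea (`LatestSlot` is hypothetical, not fluxion's actual implementation):

```rust
use std::sync::Mutex;

// A one-element "latest value" slot: producers overwrite the pending value,
// so a consumer that falls behind only ever sees the newest item.
struct LatestSlot<T>(Mutex<Option<T>>);

impl<T> LatestSlot<T> {
    fn new() -> Self {
        LatestSlot(Mutex::new(None))
    }
    fn publish(&self, value: T) {
        // Replace whatever is pending; the older value is simply dropped.
        *self.0.lock().unwrap() = Some(value);
    }
    fn take_latest(&self) -> Option<T> {
        self.0.lock().unwrap().take()
    }
}

fn main() {
    let slot = LatestSlot::new();
    // Four values arrive before the consumer gets scheduled...
    for v in 1..=4 {
        slot.publish(v);
    }
    // ...so the consumer processes only the newest one; 1, 2, 3 are skipped.
    assert_eq!(slot.take_latest(), Some(4));
    assert_eq!(slot.take_latest(), None); // the slot is drained
}
```

fluxion adds the missing pieces on top of this idea: waking the consumer, running the handler, and cancelling in-flight work when a newer value lands.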
Process every item in order with async handlers.
stream.subscribe(
    |item, cancellation_token| async move {
        // Your async processing logic
        process_item(item).await?;
        Ok::<_, MyError>(())
    },
    |error| eprintln!("Error: {:?}", error), // Error callback
    Some(cancellation_token) // Optional cancellation
).await?;
When to use: every item must be processed, ordering matters, and dropped items are unacceptable.
Examples: event logging, message queue consumers, audit trails.
Process only the latest value, canceling work for outdated items.
stream.subscribe_latest(
    |item, cancellation_token| async move {
        // Check cancellation periodically in long-running tasks
        for chunk in work_chunks {
            if cancellation_token.is_cancelled() {
                return Ok(()); // Gracefully exit
            }
            process_chunk(chunk).await?;
        }
        Ok::<_, MyError>(())
    },
    Some(|error| eprintln!("Error: {:?}", error)),
    Some(cancellation_token)
).await?;
When to use: only the most recent value matters and work on stale values should be cancelled.
Examples: search-as-you-type, live previews, rendering the latest UI state.
Process every event sequentially and persist to database:
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
struct Event {
    id: u64,
    data: String,
}

// Use a Send + Sync error type so the future can move into tokio::spawn
async fn save_to_db(event: &Event) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Database operation
    println!("Saving event {} to database", event.id);
    tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
    let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);
    let handle = tokio::spawn(async move {
        stream
            .subscribe(
                |event, _| async move {
                    save_to_db(&event).await?;
                    Ok::<_, Box<dyn std::error::Error + Send + Sync>>(())
                },
                |err| eprintln!("Failed to save event: {}", err),
                None
            )
            .await
    });
    // Generate events
    for i in 0..10 {
        tx.send(Event { id: i, data: format!("Event {}", i) })?;
    }
    drop(tx);
    handle.await??;
    Ok(())
}
Cancel outdated searches when new queries arrive:
async fn search_api(query: &str) -> Result<Vec<String>, Box<dyn std::error::Error + Send + Sync>> {
    println!("Searching for: {}", query);
    tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
    Ok(vec![format!("Result for {}", query)])
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // UnboundedReceiverStream wraps a tokio channel, not a futures one
    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
    let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);
    let handle = tokio::spawn(async move {
        stream
            .subscribe_latest(
                |query: String, token| async move {
                    if token.is_cancelled() {
                        println!("Query '{}' cancelled", query);
                        return Ok(());
                    }
                    let results = search_api(&query).await?;
                    if !token.is_cancelled() {
                        println!("Results for '{}': {:?}", query, results);
                    }
                    Ok::<_, Box<dyn std::error::Error + Send + Sync>>(())
                },
                None,
                None
            )
            .await
    });
    // User types rapidly
    tx.send("r".to_string())?;
    tx.send("ru".to_string())?;
    tx.send("rus".to_string())?;
    tx.send("rust".to_string())?; // Only this search completes
    drop(tx);
    handle.await??;
    Ok(())
}
use std::sync::Arc;
use tokio::sync::Mutex;

#[derive(Debug)]
struct ProcessingError(String);

impl core::fmt::Display for ProcessingError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        write!(f, "ProcessingError: {}", self.0)
    }
}

impl std::error::Error for ProcessingError {}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // UnboundedReceiverStream wraps a tokio channel, not a futures one
    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
    let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);
    let error_count = Arc::new(Mutex::new(0));
    let error_count_clone = error_count.clone();
    let handle = tokio::spawn(async move {
        stream
            .subscribe(
                |item: i32, _| async move {
                    if item % 3 == 0 {
                        return Err(ProcessingError(format!("Cannot process {}", item)));
                    }
                    println!("Processed: {}", item);
                    Ok(())
                },
                move |err| {
                    eprintln!("Error occurred: {}", err);
                    let count = error_count_clone.clone();
                    tokio::spawn(async move {
                        *count.lock().await += 1;
                    });
                },
                None
            )
            .await
    });
    for i in 1..=10 {
        tx.send(i)?;
    }
    drop(tx);
    handle.await??;
    println!("Total errors: {}", *error_count.lock().await);
    Ok(())
}
use std::time::Duration;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // UnboundedReceiverStream wraps a tokio channel, not a futures one
    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
    let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);
    let cancel_token = CancellationToken::new();
    let cancel_clone = cancel_token.clone();
    // Spawn shutdown handler
    tokio::spawn(async move {
        tokio::signal::ctrl_c().await.ok();
        println!("Shutting down gracefully...");
        cancel_clone.cancel();
    });
    let handle = tokio::spawn(async move {
        stream
            .subscribe(
                |item: i32, token| async move {
                    if token.is_cancelled() {
                        println!("Processing cancelled for item {}", item);
                        return Ok(());
                    }
                    println!("Processing: {}", item);
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    Ok::<_, std::io::Error>(())
                },
                |_| {},
                Some(cancel_token)
            )
            .await
    });
    for i in 0..100 {
        if tx.send(i).is_err() {
            break;
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
    drop(tx); // close the channel so the subscription can finish
    handle.await??;
    Ok(())
}
Sequential Processing (subscribe)

| Use Case | Description |
|---|---|
| Event Logging | Write every event to logs/database |
| Transaction Processing | Process financial transactions in order |
| Message Queue Consumer | Consume and acknowledge every message |
| Audit Trail | Maintain complete audit history |
| Batch ETL | Extract, transform, load data sequentially |
| Notification Service | Send every notification |
| File Processing | Process every file in a directory |
Latest-Value Processing (subscribe_latest)

| Use Case | Description |
|---|---|
| UI Rendering | Render only the latest application state |
| Auto-Save | Save current document (skip intermediate edits) |
| Live Preview | Update preview with latest content |
| Search Suggestions | Show results for latest query only |
| Real-time Dashboard | Display current metrics |
| Configuration Reload | Apply latest config changes |
| Debounced API Calls | Call API with latest parameters |
Sequential Processing (subscribe): best for correctness and completeness over speed.
Latest-Value Processing (subscribe_latest): best for responsiveness and efficiency over completeness.
Errors are passed to the callback, processing continues:
stream.subscribe(
    |item, _| async move {
        risky_operation(item).await?;
        Ok::<_, MyError>(())
    },
    |error| {
        eprintln!("Error: {}", error);
        // Log to monitoring service
        // Increment error counter
        // etc.
    },
    None
).await?; // Returns Ok(()) even if individual items failed
If you want to ignore errors and continue processing:
use fluxion_exec::ignore_errors;
stream.subscribe(handler, ignore_errors, None).await?;
// All items processed, errors are silently ignored
Return error immediately to stop processing:
stream.subscribe(
    |item, _| async move {
        critical_operation(item).await?;
        Ok::<_, CriticalError>(())
    },
    None,
    None
).await?; // Stops on first error
let cancel_token = CancellationToken::new();
let cancel_clone = cancel_token.clone();

// Start processing
let handle = tokio::spawn(async move {
    stream.subscribe(
        |item, token| async move {
            if token.is_cancelled() {
                return Ok(()); // Exit early
            }
            process(item).await
        },
        |_| {},
        Some(cancel_token)
    ).await
});

// Cancel from another task
tokio::spawn(async move {
    tokio::time::sleep(Duration::from_secs(10)).await;
    cancel_clone.cancel();
});
Automatic Cancellation in subscribe_latest

The cancellation token passed to handlers is automatically cancelled when newer items arrive:
stream.subscribe_latest(
    |item, token| async move {
        for _ in 0..100 {
            if token.is_cancelled() {
                println!("Item {} cancelled", item);
                return Ok(());
            }
            // Do work...
        }
        Ok(())
    },
    None,
    None
).await?;
stream.subscribe(
    |item, _| async move {
        let mut attempts = 0;
        loop {
            match process_with_retry(&item).await {
                Ok(()) => return Ok(()),
                Err(e) if attempts < 3 => {
                    attempts += 1;
                    eprintln!("Retry {} for item {:?}: {}", attempts, item, e);
                    tokio::time::sleep(Duration::from_millis(100 * attempts)).await;
                }
                Err(e) => return Err(e),
            }
        }
    },
    |_| {},
    None
).await?;
let last_process = Arc::new(Mutex::new(Instant::now()));

stream.subscribe(
    move |item, _| {
        let last = last_process.clone();
        async move {
            let mut last_instant = last.lock().await;
            let elapsed = last_instant.elapsed();
            if elapsed < Duration::from_millis(100) {
                tokio::time::sleep(Duration::from_millis(100) - elapsed).await;
            }
            *last_instant = Instant::now();
            drop(last_instant);
            process(item).await
        }
    },
    |_| {},
    None
).await?;
stream
    .chunks(100) // Batch 100 items
    .subscribe(
        |batch, _| async move {
            process_batch(&batch).await?;
            Ok::<_, MyError>(())
        },
        |_| {},
        None
    )
    .await?;
stream
    .filter(|item| futures::future::ready(item.is_important()))
    .subscribe(
        |item, _| async move {
            process_important(item).await
        },
        |_| {},
        None
    )
    .await?;
❌ Don't Use subscribe_latest for Critical Work

// BAD: Payment processing might be skipped!
payment_stream.subscribe_latest(
    |payment, _| async move {
        process_payment(payment).await // Could be cancelled!
    },
    None,
    None
).await?;
✅ Good: Use subscribe for work that must complete:
payment_stream.subscribe(
    |payment, _| async move {
        process_payment(payment).await // Every payment processed
    },
    |_| {},
    None
).await?;
❌ Don't Block the Async Executor

// BAD: Blocking operations stall the executor
stream.subscribe(
    |item, _| async move {
        std::thread::sleep(Duration::from_secs(1)); // Blocks executor!
        Ok(())
    },
    None,
    None
).await?;
✅ Good: Use async operations:
stream.subscribe(
    |item, _| async move {
        tokio::time::sleep(Duration::from_secs(1)).await; // Non-blocking
        Ok(())
    },
    |_| {},
    None
).await?;
❌ Don't Run Heavy CPU Work Inline

// BAD: CPU work blocks async tasks
stream.subscribe(
    |data, _| async move {
        expensive_computation(data); // Blocks!
        Ok(())
    },
    |_| {},
    None
).await?;
✅ Good: Use spawn_blocking for CPU work:
stream.subscribe(
    |data, _| async move {
        tokio::task::spawn_blocking(move || {
            expensive_computation(data)
        }).await??;
        Ok::<_, Box<dyn std::error::Error>>(())
    },
    |_| {},
    None
).await?;
❌ Don't Ignore the Cancellation Token

// BAD: Long-running work that can't be cancelled
stream.subscribe_latest(
    |item, _token| async move { // Token ignored!
        for i in 0..1000000 {
            expensive_step(i).await;
        }
        Ok(())
    },
    None,
    None
).await?;
✅ Good: Check cancellation periodically:
stream.subscribe_latest(
    |item, token| async move {
        for i in 0..1000000 {
            if token.is_cancelled() {
                return Ok(()); // Exit early
            }
            expensive_step(i).await;
        }
        Ok(())
    },
    None,
    None
).await?;
vs futures::StreamExt::for_each

| Feature | subscribe | for_each |
|---|---|---|
| Execution | Spawns tasks | Inline execution |
| Cancellation | Built-in token support | Manual |
| Error handling | Callbacks + collection | Returns first error |
| Concurrency | Configurable | Sequential only |
vs futures::StreamExt::buffer_unordered

| Feature | subscribe | buffer_unordered |
|---|---|---|
| Ordering | Sequential | Unordered |
| Concurrency | One at a time | N concurrent |
| Backpressure | Automatic | Manual (buffer size) |
| Use case | Sequential processing | Parallel processing |
vs Manual Task Spawning

| Feature | subscribe_latest | Manual spawning |
|---|---|---|
| Cancellation | Automatic | Manual |
| Latest-value | Built-in | Manual tracking |
| Error handling | Integrated | Manual |
| Cleanup | Automatic | Manual |
Symptoms: Stream processing hangs indefinitely
Causes: typically the stream never terminates (for example, the channel sender is never dropped), or a handler awaits something that never completes.
Solutions:
// Add a timeout around the whole subscription
use tokio::time::{timeout, Duration};

let result = timeout(
    Duration::from_secs(30),
    stream.subscribe(handler, None, None)
).await??;
// Use a cancellation token
let cancel = CancellationToken::new();
let cancel_clone = cancel.clone();
tokio::spawn(async move {
    tokio::time::sleep(Duration::from_secs(30)).await;
    cancel_clone.cancel();
});
// ...then pass Some(cancel) as the subscription's cancellation token
Symptoms: Memory grows unbounded during processing
Causes: typically an unbounded channel buffering items faster than they are processed, or batches keeping large data alive.
Solutions:
// Add backpressure with bounded channels
let (tx, rx) = futures::channel::mpsc::channel(100); // Bounded

// Use subscribe_latest to skip items
stream.subscribe_latest(handler, None, None).await?;

// Process in batches and clear
stream.chunks(100).subscribe(
    |mut batch, _| async move {
        process(&batch).await?;
        batch.clear(); // Free memory
        Ok(())
    },
    None,
    None
).await?;
Symptoms: Errors occur but are silently ignored
Cause: the error callback discards errors instead of recording or reporting them
Solution:
let errors = Arc::new(Mutex::new(Vec::new()));
let errors_clone = errors.clone();

stream.subscribe(
    handler,
    move |err| {
        let errors = errors_clone.clone();
        tokio::spawn(async move {
            errors.lock().await.push(err.to_string());
        });
    },
    None
).await?;

// Check errors after processing
let all_errors = errors.lock().await;
if !all_errors.is_empty() {
    eprintln!("Errors occurred: {:?}", all_errors);
}
Symptoms: Items appear in unexpected order
Cause: Using concurrent processing patterns
Solution: Use subscribe (strictly sequential) instead of buffer_unordered or parallel patterns
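The ordering guarantee follows directly from the sequential model: the next item cannot start until the current handler returns. A synchronous sketch of that invariant (`process_sequentially` is a hypothetical helper, not a fluxion API):

```rust
// Sequential processing preserves input order: each handler call completes
// before the next item starts, so outputs line up with inputs.
fn process_sequentially<T, U>(items: Vec<T>, mut handler: impl FnMut(T) -> U) -> Vec<U> {
    let mut out = Vec::with_capacity(items.len());
    for item in items {
        out.push(handler(item)); // the next item waits for this call to finish
    }
    out
}

fn main() {
    let results = process_sequentially(vec![1, 2, 3], |n| n * 10);
    assert_eq!(results, vec![10, 20, 30]); // order matches the input
}
```

Concurrent schemes like buffer_unordered trade this invariant away: handlers finish in whatever order their awaits resolve.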
See the full API documentation for detailed type signatures and additional examples.
- SubscribeExt - Sequential processing
- SubscribeLatestExt - Latest-value processing
- fluxion-stream - Stream composition operators
- fluxion-core - Core types and errors

Licensed under the Apache License, Version 2.0. See LICENSE for details.
Copyright 2025 Umberto Gotti