| Crates.io | streamweave-error |
| lib.rs | streamweave-error |
| version | 0.6.0 |
| created_at | 2025-12-27 23:00:06.120545+00 |
| updated_at | 2025-12-30 23:05:24.40609+00 |
| description | Error handling system for StreamWeave |
| homepage | https://github.com/Industrial/streamweave |
| repository | https://github.com/Industrial/streamweave/tree/main/packages/error |
| max_upload_size | |
| id | 2007988 |
| size | 61,364 |
Error handling system for StreamWeave
Comprehensive error handling with multiple strategies and rich error context.
The streamweave-error package provides the error handling system for StreamWeave pipelines and graphs. It includes error strategies, error actions, rich error context, and component information to support error handling and debugging.
Add this to your Cargo.toml:
[dependencies]
streamweave-error = "0.6.0"
use streamweave_error::{ErrorStrategy, ErrorAction, StreamError, ErrorContext, ComponentInfo};

// Configure error strategy (the item type is spelled out so the compiler can infer T)
let strategy = ErrorStrategy::<i32>::Skip; // Skip errors and continue

// Create error context
let context = ErrorContext {
    timestamp: chrono::Utc::now(),
    item: Some(42),
    component_name: "MyTransformer".to_string(),
    component_type: "Transformer".to_string(),
};

// Create component info
let component = ComponentInfo::new(
    "my-transformer".to_string(),
    "MyTransformer".to_string(),
);

// Create stream error
let error = StreamError::new(
    Box::new(std::io::Error::other("Something went wrong")),
    context,
    component,
);

// Determine action based on strategy
let action = match strategy {
    ErrorStrategy::Stop => ErrorAction::Stop,
    ErrorStrategy::Skip => ErrorAction::Skip,
    ErrorStrategy::Retry(n) if error.retries < n => ErrorAction::Retry,
    // Exhausted retries and custom strategies fall back to stopping here
    _ => ErrorAction::Stop,
};
The ErrorStrategy enum defines how components should handle errors:
pub enum ErrorStrategy<T> {
    Stop,                  // Stop processing on error
    Skip,                  // Skip errors and continue
    Retry(usize),          // Retry up to N times
    Custom(CustomHandler), // Custom error handling logic
}
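Stop Strategy: stops processing at the first error (the default), which protects data integrity.
Skip Strategy: skips the failing item and continues; useful for data cleaning pipelines.
Retry Strategy: retries the failed operation up to N times; useful for transient failures such as network timeouts.
Custom Strategy: delegates the decision to a user-supplied handler that returns an ErrorAction.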
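As a rough sketch of how the four strategies could resolve to an action, the block below uses local stand-in types rather than the crate itself. The `decide` helper, the trimmed-down `StreamError`, and the `Arc`-based `CustomHandler` alias are assumptions made for illustration; the real definitions in streamweave-error may differ.

```rust
use std::error::Error;
use std::sync::Arc;

// Local stand-ins that mirror the shapes described in this README.
// They are NOT the crate's actual definitions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorAction {
    Stop,
    Skip,
    Retry,
}

pub struct StreamError<T> {
    pub source: Box<dyn Error + Send + Sync>,
    pub item: Option<T>,
    pub retries: usize,
}

// Assumed handler shape: a shared closure that inspects the error.
pub type CustomHandler<T> = Arc<dyn Fn(&StreamError<T>) -> ErrorAction + Send + Sync>;

pub enum ErrorStrategy<T> {
    Stop,
    Skip,
    Retry(usize),
    Custom(CustomHandler<T>),
}

impl<T> ErrorStrategy<T> {
    /// Wraps a closure as a custom strategy.
    pub fn new_custom<F>(handler: F) -> Self
    where
        F: Fn(&StreamError<T>) -> ErrorAction + Send + Sync + 'static,
    {
        ErrorStrategy::Custom(Arc::new(handler))
    }
}

/// Hypothetical helper: resolve a strategy into a concrete action.
pub fn decide<T>(strategy: &ErrorStrategy<T>, error: &StreamError<T>) -> ErrorAction {
    match strategy {
        ErrorStrategy::Stop => ErrorAction::Stop,
        ErrorStrategy::Skip => ErrorAction::Skip,
        // Retry only while the retry budget has not been used up.
        ErrorStrategy::Retry(max) if error.retries < *max => ErrorAction::Retry,
        ErrorStrategy::Retry(_) => ErrorAction::Stop,
        ErrorStrategy::Custom(handler) => handler(error),
    }
}
```

With these stand-ins, `decide(&ErrorStrategy::Retry(3), &err)` yields `ErrorAction::Retry` while `err.retries` is below 3 and `ErrorAction::Stop` once the budget is exhausted.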
The ErrorAction enum represents the action to take when an error occurs:
pub enum ErrorAction {
    Stop,  // Stop processing immediately
    Skip,  // Skip the item and continue
    Retry, // Retry the operation
}
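To make the three actions concrete, here is a minimal sketch of a processing loop that reacts to each of them. The `run` function and its closure-based `on_error` callback are hypothetical and only illustrate the control flow; StreamWeave's own pipeline driver is not shown in this README and may work differently.

```rust
// Local stand-in for the ErrorAction enum described above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorAction {
    Stop,
    Skip,
    Retry,
}

/// Hypothetical driver: applies `op` to every item. When `op` fails,
/// `on_error` receives the error and the number of retries already made
/// for that item, and returns the action to take.
pub fn run<T, U, E>(
    items: impl IntoIterator<Item = T>,
    mut op: impl FnMut(&T) -> Result<U, E>,
    on_error: impl Fn(&E, usize) -> ErrorAction,
) -> Result<Vec<U>, E> {
    let mut out = Vec::new();
    for item in items {
        let mut retries = 0;
        loop {
            match op(&item) {
                Ok(value) => {
                    out.push(value);
                    break;
                }
                Err(err) => match on_error(&err, retries) {
                    // Try the same item again. The callback must eventually
                    // return Skip or Stop, otherwise this loops forever.
                    ErrorAction::Retry => retries += 1,
                    // Drop this item and move on to the next one.
                    ErrorAction::Skip => break,
                    // Abort the whole run and surface the error.
                    ErrorAction::Stop => return Err(err),
                },
            }
        }
    }
    Ok(out)
}
```

For example, `run(vec!["1", "x", "3"], |s| s.parse::<i32>(), |_, _| ErrorAction::Skip)` drops the unparsable item and keeps the other two, while returning `ErrorAction::Stop` from the callback makes the same call return the parse error.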
The StreamError type provides rich error information:
pub struct StreamError<T> {
    pub source: Box<dyn Error + Send + Sync>, // Original error
    pub context: ErrorContext<T>,             // Error context
    pub component: ComponentInfo,             // Component info
    pub retries: usize,                       // Retry count
}
The ErrorContext struct provides detailed information about when and where an error occurred:
pub struct ErrorContext<T> {
    pub timestamp: chrono::DateTime<chrono::Utc>, // When error occurred
    pub item: Option<T>,                          // Item being processed
    pub component_name: String,                   // Component name
    pub component_type: String,                   // Component type
}
The ComponentInfo struct identifies the component that encountered an error:
pub struct ComponentInfo {
    pub name: String,      // Component name
    pub type_name: String, // Component type name
}
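The following sketch shows one way an error type with these fields can take part in Rust's standard error chain. It uses simplified local stand-ins (the `ErrorContext` field is left out to avoid the chrono dependency) and hand-written `Display` and `Error` impls; the message format is an assumption, and the crate's real implementation may instead be derived with thiserror.

```rust
use std::error::Error;
use std::fmt;

// Simplified stand-ins for the structs described above; not the crate's types.
#[derive(Debug, Clone, PartialEq)]
pub struct ComponentInfo {
    pub name: String,
    pub type_name: String,
}

#[derive(Debug)]
pub struct StreamError<T> {
    pub source: Box<dyn Error + Send + Sync>,
    pub item: Option<T>,
    pub component: ComponentInfo,
    pub retries: usize,
}

impl<T: fmt::Debug> fmt::Display for StreamError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Assumed message layout, chosen only for this example.
        write!(
            f,
            "{} ({}) failed on item {:?} after {} retries: {}",
            self.component.name, self.component.type_name, self.item, self.retries, self.source
        )
    }
}

impl<T: fmt::Debug> Error for StreamError<T> {
    // Expose the original error so callers can walk the cause chain.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        Some(&*self.source)
    }
}
```

Implementing `Error::source` this way lets logging and reporting code reach the original failure without knowing the concrete error type.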
Stop processing on first error (default behavior):
use streamweave_error::ErrorStrategy;
let strategy = ErrorStrategy::<i32>::Stop;
// When an error occurs, processing stops immediately
// This ensures data integrity
Skip errors and continue processing:
use streamweave_error::ErrorStrategy;
let strategy = ErrorStrategy::<i32>::Skip;
// Invalid items are skipped, processing continues
// Useful for data cleaning pipelines
Retry failed operations:
use streamweave_error::ErrorStrategy;
let strategy = ErrorStrategy::<i32>::Retry(3);
// Retries up to 3 times before giving up
// Useful for transient failures like network timeouts
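The retry budget can be read as "one initial attempt plus up to N retries". The sketch below spells out that interpretation with a hypothetical `retry` helper; it is a standalone illustration and not the crate's retry implementation, which this README does not show.

```rust
/// Hypothetical helper: runs `op` once and then retries it up to
/// `max_retries` more times while it keeps failing. `op` receives the
/// number of retries made so far. On final failure the last error is
/// returned together with the number of retries performed.
pub fn retry<U, E>(
    max_retries: usize,
    mut op: impl FnMut(usize) -> Result<U, E>,
) -> Result<U, (E, usize)> {
    let mut retries = 0;
    loop {
        match op(retries) {
            Ok(value) => return Ok(value),
            // Budget exhausted: give up and report the last error.
            Err(err) if retries >= max_retries => return Err((err, retries)),
            // Budget remaining: count the retry and try again.
            Err(_) => retries += 1,
        }
    }
}
```

Under this reading, `retry(3, ...)` calls the operation at most four times. A production version would usually add a delay or backoff between attempts.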
Implement custom error handling logic:
use streamweave_error::{ErrorStrategy, ErrorAction, StreamError};
// is_transient and is_validation_error are assumed to be user-defined helpers;
// they are not imported from this crate
let strategy = ErrorStrategy::<i32>::new_custom(|error: &StreamError<i32>| {
    // Retry transient errors
    if error.retries < 3 && is_transient(&error.source) {
        ErrorAction::Retry
    }
    // Skip validation errors
    else if is_validation_error(&error.source) {
        ErrorAction::Skip
    }
    // Stop on critical errors
    else {
        ErrorAction::Stop
    }
});
Create detailed error context for debugging:
use streamweave_error::{ErrorContext, ComponentInfo, StreamError};
// Create error context
let context = ErrorContext {
    timestamp: chrono::Utc::now(),
    item: Some(problematic_item),
    component_name: "DataValidator".to_string(),
    component_type: "Transformer".to_string(),
};
// Create component info
let component = ComponentInfo::new(
    "validator".to_string(),
    "DataValidator".to_string(),
);
// Create stream error
let error = StreamError::new(
    Box::new(validation_error),
    context,
    component,
);
Handle errors at different levels:
use streamweave_error::{ErrorStrategy, StreamError, ErrorAction};
// Component-level error handling
fn handle_component_error(error: &StreamError<i32>) -> ErrorAction {
    match error.retries {
        0..=2 => ErrorAction::Retry, // Retry first 3 attempts
        _ => ErrorAction::Skip,      // Skip after retries exhausted
    }
}
// Pipeline-level error handling
fn handle_pipeline_error(error: &StreamError<i32>) -> ErrorAction {
    if is_critical_error(&error.source) {
        ErrorAction::Stop // Stop on critical errors
    } else {
        ErrorAction::Skip // Skip non-critical errors
    }
}
Error strategies can be configured at multiple levels:
use streamweave_error::ErrorStrategy;
// Configure producer error handling
let producer = MyProducer::new()
    .with_config(
        ProducerConfig::default()
            .with_error_strategy(ErrorStrategy::Retry(3))
    );
// Configure transformer error handling
let transformer = MyTransformer::new()
    .with_config(
        TransformerConfig::default()
            .with_error_strategy(ErrorStrategy::Skip)
    );
// Configure consumer error handling
let consumer = MyConsumer::new()
    .with_config(
        ConsumerConfig {
            error_strategy: ErrorStrategy::Stop,
            name: "output".to_string(),
        }
    );
use streamweave::PipelineBuilder;
use streamweave_error::ErrorStrategy;
let pipeline = PipelineBuilder::new()
    .producer(producer)
    .transformer(transformer)
    .consumer(consumer)
    .with_error_strategy(ErrorStrategy::Skip); // Pipeline-wide default
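The phrase "pipeline-wide default" suggests that a strategy set on a component takes precedence and the pipeline strategy applies only where none is set. That precedence is an assumption, not something this README states outright; the sketch below illustrates it with a hypothetical `effective_strategy` function and a simplified `Strategy` enum.

```rust
// Simplified stand-in without the Custom variant; not the crate's type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Strategy {
    Stop,
    Skip,
    Retry(usize),
}

/// Hypothetical resolution rule: a component's own strategy wins,
/// otherwise the pipeline-wide default is used.
pub fn effective_strategy(component: Option<Strategy>, pipeline_default: Strategy) -> Strategy {
    component.unwrap_or(pipeline_default)
}
```

Under this assumed rule, a producer configured with `Retry(3)` keeps retrying even when the pipeline default is `Skip`, while a component with no explicit strategy inherits `Skip`.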
The error handling system integrates at multiple levels:
┌─────────────────┐
│   Component     │───encounters error───> StreamError
└─────────────────┘                             │
                                                │
                                                ▼
┌─────────────────┐                     ┌──────────────┐
│  ErrorStrategy  │───determines───>    │ ErrorAction  │
└─────────────────┘                     └──────────────┘
         │                                      │
         │                                      ▼
         │                              ┌──────────────┐
         └───uses context from───>      │ ErrorContext │
                                        └──────────────┘
Stop (Default):
Skip:
Retry:
Custom:
Error context provides:
Component info enables:
For more examples, see:
streamweave-error depends on:
- thiserror - Error trait implementation
- serde - Serialization support
- serde_json - JSON serialization
- chrono - Timestamp support

The error handling system is used for:
Contributions are welcome! Please see the Contributing Guide for details.
This project is licensed under the CC BY-SA 4.0 license.