| Crates.io | llm-optimizer-processor |
| lib.rs | llm-optimizer-processor |
| version | 0.1.1 |
| created_at | 2025-11-11 01:44:06.502658+00 |
| updated_at | 2025-11-11 02:39:47.008342+00 |
| description | Data processing and transformation pipeline |
| homepage | https://github.com/globalbusinessadvisors/llm-auto-optimizer |
| repository | https://github.com/globalbusinessadvisors/llm-auto-optimizer |
| max_upload_size | |
| id | 1926580 |
| size | 3,921,456 |
High-performance, production-ready stream processing library for LLM Auto-Optimizer.
```rust
use chrono::Duration;
use futures::StreamExt; // `next()` on the results stream needs a StreamExt trait in scope
use processor::{StreamProcessorBuilder, aggregation::CompositeAggregator};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Create a stream processor with 5-minute tumbling windows
    let mut processor = StreamProcessorBuilder::new()
        .build_tumbling(
            Duration::minutes(5),
            |_key| {
                let mut agg = CompositeAggregator::new();
                agg.add_count("requests")
                    .add_avg("avg_latency")
                    .add_p95("p95_latency")
                    .add_p99("p99_latency");
                agg
            },
        );

    // Get the results stream
    let mut results = processor.results().unwrap();

    // Process events: key, event time, metric value
    processor.process_event(
        "my-service".to_string(),
        chrono::Utc::now(),
        123.45, // metric value
    ).await?;

    // Collect results
    tokio::spawn(async move {
        while let Some(result) = results.next().await {
            println!("Window: {}", result.window);
            println!("Results: {:?}", result.results);
        }
    });

    Ok(())
}
```
Fixed-size, non-overlapping windows for periodic aggregations:
```rust
let processor = StreamProcessorBuilder::new()
    .build_tumbling(Duration::minutes(5), aggregator_factory);
```
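The window-builder snippets in this section all pass an `aggregator_factory` without showing its definition. A minimal version consistent with the closure in the quick-start example might look like this (the `&str` key type is an assumption):

```rust
use processor::aggregation::CompositeAggregator;

// Hypothetical factory, mirroring the quick-start closure: build one
// fresh aggregator per window. The key type is an assumption.
fn aggregator_factory(_key: &str) -> CompositeAggregator {
    let mut agg = CompositeAggregator::new();
    agg.add_count("requests")
        .add_avg("avg_latency")
        .add_p95("p95_latency");
    agg
}
```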
Overlapping windows for moving averages and smoothed metrics:
```rust
let processor = StreamProcessorBuilder::new()
    .build_sliding(
        Duration::minutes(10), // Window size
        Duration::seconds(30), // Slide interval
        aggregator_factory,
    );
```
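With a 10-minute window sliding every 30 seconds, every event falls into size / slide = 20 concurrent windows. A standalone sketch of that arithmetic (plain Rust, not this crate's API; assumes the window size is a multiple of the slide):

```rust
// Which sliding-window start times (in seconds) cover an event at `ts`?
// A window [start, start + size) covers ts when ts - size < start <= ts,
// and starts fall on multiples of `slide`.
fn covering_window_starts(ts: i64, size: i64, slide: i64) -> Vec<i64> {
    let last_start = (ts / slide) * slide; // latest window containing ts
    let n = size / slide;                  // number of concurrent windows
    (0..n).map(|i| last_start - i * slide).rev().collect()
}

fn main() {
    // 10-minute windows sliding every 30 s: 20 windows cover any event.
    let starts = covering_window_starts(1_000, 600, 30);
    assert_eq!(starts.len(), 20);
    println!("{starts:?}");
}
```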
Gap-based windows for activity clustering:
```rust
let processor = StreamProcessorBuilder::new()
    .build_session(
        Duration::seconds(30), // Inactivity gap
        aggregator_factory,
    );
```
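Conceptually, a session closes whenever the gap between consecutive events exceeds the inactivity threshold. A standalone sketch of that clustering rule over sorted timestamps (plain Rust, not this crate's implementation):

```rust
// Group sorted event timestamps (seconds) into sessions, starting a new
// session whenever the gap to the previous event exceeds `max_gap`.
fn sessionize(timestamps: &[i64], max_gap: i64) -> Vec<Vec<i64>> {
    let mut sessions: Vec<Vec<i64>> = Vec::new();
    for &ts in timestamps {
        let start_new = match sessions.last().and_then(|s| s.last()) {
            Some(&prev) => ts - prev > max_gap, // gap exceeded: close session
            None => true,                       // first event
        };
        if start_new {
            sessions.push(Vec::new());
        }
        sessions.last_mut().unwrap().push(ts);
    }
    sessions
}
```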
Run multiple aggregations efficiently:
```rust
let mut agg = CompositeAggregator::new();
agg.add_count("total_requests")
    .add_avg("avg_response_time")
    .add_min("min_response_time")
    .add_max("max_response_time")
    .add_p50("p50_response_time")
    .add_p95("p95_response_time")
    .add_p99("p99_response_time")
    .add_stddev("stddev_response_time");
```
Use individual aggregators for specific metrics:
```rust
use processor::aggregation::{
    AverageAggregator,
    PercentileAggregator,
    StandardDeviationAggregator,
};

let avg = AverageAggregator::new();
let p95 = PercentileAggregator::new(95);
let stddev = StandardDeviationAggregator::new();
```
```rust
use processor::StreamProcessorConfig;
use chrono::Duration;

let config = StreamProcessorConfig {
    allow_late_events: true,                     // accept events arriving behind the watermark
    late_event_threshold: Duration::seconds(30), // how far behind the watermark an event may be
    watermark_interval: Duration::seconds(1),    // watermark emission cadence
    window_retention: Duration::hours(1),        // how long fired windows are retained
    max_windows_per_key: 1000,
    result_buffer_size: 1000,                    // results channel capacity
};

let processor = StreamProcessorBuilder::new()
    .with_config(config)
    .build_tumbling(Duration::minutes(5), aggregator_factory);
```
The library includes advanced statistical utilities:
Numerically stable variance calculation:
```rust
use processor::aggregation::statistics::OnlineVariance;

let mut variance = OnlineVariance::new();
for value in values {
    variance.update(value);
}
let std_dev = variance.std_dev();
```
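"Numerically stable" online variance is conventionally Welford's algorithm, which avoids the catastrophic cancellation of the naive sum-of-squares formula. A standalone sketch of that update rule (assumed, not taken from this crate's source):

```rust
// Standalone sketch of Welford's online variance algorithm.
#[derive(Default)]
struct Welford {
    count: u64,
    mean: f64,
    m2: f64, // sum of squared deviations from the running mean
}

impl Welford {
    fn update(&mut self, x: f64) {
        self.count += 1;
        let delta = x - self.mean;
        self.mean += delta / self.count as f64;
        self.m2 += delta * (x - self.mean); // uses the *updated* mean
    }

    fn variance(&self) -> f64 {
        if self.count < 2 { 0.0 } else { self.m2 / (self.count - 1) as f64 }
    }

    fn std_dev(&self) -> f64 {
        self.variance().sqrt()
    }
}
```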
Smoothed metrics over time:
```rust
use processor::aggregation::statistics::ExponentialMovingAverage;

let mut ema = ExponentialMovingAverage::with_span(10.0)?;
ema.update(value);
let smoothed = ema.value();
```
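A `span` parameter conventionally maps to the smoothing factor alpha = 2 / (span + 1) (the pandas convention); a standalone sketch under that assumption:

```rust
// Standalone EMA sketch using the common span -> alpha mapping.
struct Ema {
    alpha: f64,
    value: Option<f64>,
}

impl Ema {
    fn with_span(span: f64) -> Self {
        Ema { alpha: 2.0 / (span + 1.0), value: None }
    }

    fn update(&mut self, x: f64) {
        self.value = Some(match self.value {
            Some(prev) => self.alpha * x + (1.0 - self.alpha) * prev,
            None => x, // seed with the first observation
        });
    }
}
```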
Fixed-size window with percentiles:
```rust
use processor::aggregation::statistics::SlidingWindowStats;

let mut stats = SlidingWindowStats::new(100)?;
for value in values {
    stats.update(value);
}
let p95 = stats.percentile(95.0)?;
let median = stats.median();
```
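A percentile over a bounded window is typically a sort-and-rank over the retained samples; a standalone nearest-rank sketch (not this crate's implementation):

```rust
// Standalone nearest-rank percentile over a slice of samples.
fn percentile(samples: &[f64], p: f64) -> Option<f64> {
    if samples.is_empty() || !(0.0..=100.0).contains(&p) {
        return None;
    }
    let mut sorted = samples.to_vec();
    sorted.sort_by(|a, b| a.total_cmp(b));
    // Nearest-rank: ceil(p/100 * n), clamped to valid indices.
    let rank = ((p / 100.0) * sorted.len() as f64).ceil() as usize;
    Some(sorted[rank.saturating_sub(1)])
}
```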
Z-score based anomaly detection:
```rust
use processor::aggregation::statistics::ZScoreAnomalyDetector;

let mut detector = ZScoreAnomalyDetector::new(3.0); // 3-sigma threshold
let is_anomaly = detector.check(value);
```
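A z-score detector flags values whose distance from the running mean exceeds the threshold in standard deviations. A self-contained sketch combining the Welford update with that check (plain Rust, not this crate's implementation):

```rust
// Flag values more than `threshold` standard deviations from the running
// mean; statistics are maintained with Welford's method.
struct ZScoreDetector {
    threshold: f64, // e.g. 3.0 for a 3-sigma rule
    count: u64,
    mean: f64,
    m2: f64,
}

impl ZScoreDetector {
    fn new(threshold: f64) -> Self {
        ZScoreDetector { threshold, count: 0, mean: 0.0, m2: 0.0 }
    }

    /// Returns true if `x` is anomalous, then folds it into the statistics.
    fn check(&mut self, x: f64) -> bool {
        let anomalous = if self.count >= 2 {
            let std_dev = (self.m2 / (self.count - 1) as f64).sqrt();
            std_dev > 0.0 && ((x - self.mean) / std_dev).abs() > self.threshold
        } else {
            false // not enough data to judge yet
        };
        self.count += 1;
        let delta = x - self.mean;
        self.mean += delta / self.count as f64;
        self.m2 += delta * (x - self.mean);
        anomalous
    }
}
```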
Prevent the same event from being processed more than once:
```rust
use processor::state::{RedisStateBackend, RedisConfig, StateBackend};
use std::sync::Arc;
use std::time::Duration;
use tracing::info; // or the `log` crate's `info!`

// Create Redis backend for deduplication
let redis_config = RedisConfig::builder()
    .url("redis://localhost:6379")
    .key_prefix("dedup:")
    .default_ttl(Duration::from_secs(3600))
    .build()?;

let backend = Arc::new(RedisStateBackend::new(redis_config).await?);

// Check for duplicates
let event_id = "event_123";
let key = format!("dedup:{}", event_id);

if backend.get(key.as_bytes()).await?.is_none() {
    // First time seeing this event: mark it, then process it
    backend.put_with_ttl(key.as_bytes(), b"1", Duration::from_secs(3600)).await?;
    process_event(event).await?; // your own handler and event value
} else {
    // Duplicate - skip processing
    info!("Duplicate event detected: {}", event_id);
}
```
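Note that the get-then-put sequence above is not atomic: under concurrency, two workers can both miss the key and both process the event. With direct Redis access, the check and the write collapse into a single atomic `SET ... NX EX` command; a sketch using the `redis` crate directly, outside this library's `StateBackend` abstraction:

```rust
// Atomic check-and-mark with the `redis` crate (async feature enabled):
// SET key 1 NX EX 3600 succeeds only for the first writer, so exactly
// one worker processes each event id.
async fn is_first_sighting(
    conn: &mut redis::aio::MultiplexedConnection,
    event_id: &str,
) -> redis::RedisResult<bool> {
    let key = format!("dedup:{event_id}");
    // Returns Some("OK") when the key was set, None when it already existed.
    let set: Option<String> = redis::cmd("SET")
        .arg(&key)
        .arg("1")
        .arg("NX")
        .arg("EX")
        .arg(3600)
        .query_async(conn)
        .await?;
    Ok(set.is_some())
}
```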
```rust
// Assuming these backends live alongside RedisStateBackend in `processor::state`
use processor::state::{MemoryStateBackend, RedisConfig, SledStateBackend};
use std::time::Duration;

// Basic configuration with in-memory backend
let backend = MemoryStateBackend::new(Some(Duration::from_secs(3600)));

// Production configuration with Redis
let redis_config = RedisConfig::builder()
    .url("redis://redis-cluster:6379")
    .key_prefix("dedup:")
    .default_ttl(Duration::from_secs(86400)) // 24 hours
    .pool_size(10, 50) // min 10, max 50 connections
    .build()?;

// High-performance local storage with Sled
let sled_backend = SledStateBackend::new("./dedup.db")?;
```
See deduplication/README.md for complete documentation.
Transform irregularly sampled time-series data into uniform intervals for analysis and processing:
```rust
use processor::normalization::{TimeSeriesNormalizer, NormalizationConfig, FillStrategy};
use chrono::Duration;

// Configure normalization with 1-second intervals
let config = NormalizationConfig::builder()
    .interval(Duration::seconds(1))
    .fill_strategy(FillStrategy::Linear)
    .max_gap_duration(Duration::seconds(60))
    .out_of_order_threshold(Duration::seconds(5))
    .build()?;

let mut normalizer = TimeSeriesNormalizer::new(config);

// Process irregular events
normalizer.process_event(timestamp1, value1).await?;
normalizer.process_event(timestamp2, value2).await?;

// Get normalized time-series with uniform intervals
let normalized = normalizer.normalize().await?;
```
```text
Original (irregular sampling):
t=0.0s: 10.0
t=0.3s: 12.0
t=1.7s: 18.0
t=4.9s: 35.0

Normalized (1-second intervals, linear interpolation):
t=0s: 10.0
t=1s: 14.4 (interpolated)
t=2s: 18.0
t=3s: 24.3 (interpolated)
t=4s: 30.7 (interpolated)
t=5s: 35.0
```
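The exact output values also depend on the configured alignment strategy, but each filled point is fundamentally a linear blend of its two bracketing samples. A standalone sketch of that computation (plain Rust, generic sample points):

```rust
// Linear interpolation between two bracketing samples (t0, v0) and
// (t1, v1), evaluated at t with t0 <= t <= t1.
fn lerp(t: f64, (t0, v0): (f64, f64), (t1, v1): (f64, f64)) -> f64 {
    if t1 == t0 {
        return v0; // degenerate interval: nothing to blend
    }
    v0 + (v1 - v0) * (t - t0) / (t1 - t0)
}

fn main() {
    // Halfway between (0.0, 10.0) and (1.0, 20.0) -> 15.0
    println!("{}", lerp(0.5, (0.0, 10.0), (1.0, 20.0)));
}
```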
```rust
let config = NormalizationConfig {
    // Time interval between normalized points
    interval: Duration::seconds(1),
    // Timestamp alignment strategy
    alignment: AlignmentStrategy::Floor, // or Ceil, Round
    // Gap filling strategy
    fill_strategy: FillStrategy::Linear,
    // Maximum gap to fill (larger gaps left as missing)
    max_gap_duration: Some(Duration::seconds(60)),
    // Out-of-order handling
    out_of_order_buffer_size: 1000,
    out_of_order_threshold: Duration::seconds(5),
};
```
| Strategy | Throughput | Latency (avg) | Use Case |
|---|---|---|---|
| Linear | 500K events/sec | 2.0 μs | Continuous metrics |
| Forward Fill | 800K events/sec | 1.25 μs | Discrete states |
| Zero Fill | 1M events/sec | 1.0 μs | Event counts |
| Spline | 50K events/sec | 20 μs | Visualization |
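Forward fill and zero fill differ only in what they emit for an interval with no samples: the last observed value versus zero, which is also why zero fill is the cheapest strategy in the table above. A standalone sketch (plain Rust, not this crate's implementation):

```rust
// Fill missing interval values (None) according to the chosen strategy.
fn fill(values: &[Option<f64>], forward: bool) -> Vec<f64> {
    let mut last = 0.0;
    values
        .iter()
        .map(|v| match v {
            Some(x) => { last = *x; *x }
            None if forward => last, // forward fill: repeat last observation
            None => 0.0,             // zero fill
        })
        .collect()
}
```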
See normalization/README.md for complete documentation.
Run the examples to see the stream processor in action:
```bash
# Basic stream processor demo
cargo run --example stream_processor_demo

# Multi-key sliding window
cargo run --example multi_key_sliding_window

# Event deduplication
cargo run --example event_deduplication_demo
cargo run --example advanced_deduplication

# Time-series normalization
cargo run --example timeseries_normalization_demo
cargo run --example advanced_normalization

# Other examples
cargo run --example kafka_pipeline
cargo run --example aggregation_demo
cargo run --example watermark_demo
```
To observe typical performance characteristics at runtime, get the processor statistics:
```rust
let stats = processor.stats().await;
println!("Events processed: {}", stats.events_processed);
println!("Windows fired: {}", stats.windows_fired);
println!("Active windows: {}", stats.active_windows);
println!("P99 latency: {:.2}ms", stats.latency_p99_ms);
```
See STREAM_PROCESSOR.md for detailed implementation documentation.
```bash
# Run all tests
cargo test -p processor

# Run with output
cargo test -p processor -- --nocapture

# Run benchmarks
cargo bench -p processor
```
The stream processor integrates with the other components of the LLM Auto-Optimizer pipeline.
License: Apache-2.0
See CONTRIBUTING.md for contribution guidelines.