| Crates.io | drasi-reaction-grpc-adaptive |
|---|---|
| lib.rs | drasi-reaction-grpc-adaptive |
| version | 0.1.2 |
| created_at | 2026-01-15 04:24:43.810736+00 |
| updated_at | 2026-01-23 06:22:48.850693+00 |
| description | gRPC Adaptive reaction plugin for Drasi |
| homepage | |
| repository | https://github.com/drasi-project/drasi-core |
| max_upload_size | |
| id | 2044572 |
| size | 147,887 |
A high-performance gRPC reaction plugin for Drasi that intelligently adapts batching behavior based on real-time throughput patterns to optimize both latency and throughput.
The `AdaptiveGrpcReaction` component sends continuous query results to external gRPC servers with intelligent, throughput-based batching. Unlike fixed-size batching, it dynamically adjusts batch sizes and wait times based on observed traffic patterns, providing consistent performance across varying workloads.
Typical use cases include:

- High-throughput event processing
- Real-time data synchronization
- Monitoring and analytics
- Multi-tenant systems
The builder pattern provides a type-safe, ergonomic API for constructing reactions:
```rust
use drasi_reaction_grpc_adaptive::AdaptiveGrpcReaction;

let reaction = AdaptiveGrpcReaction::builder("event-processor")
    .with_endpoint("grpc://event-server:9090")
    .with_queries(vec!["event-stream".to_string()])
    .with_min_batch_size(50)
    .with_max_batch_size(2000)
    .with_timeout_ms(15000)
    .with_max_retries(5)
    .with_metadata("authorization", "Bearer token123")
    .with_metadata("x-tenant-id", "prod-456")
    .with_priority_queue_capacity(10000)
    .with_auto_start(true)
    .build()?;
```
For YAML-based configuration or programmatic construction:
```rust
use drasi_reaction_grpc_adaptive::{AdaptiveGrpcReaction, GrpcAdaptiveReactionConfig};
use drasi_lib::reactions::common::AdaptiveBatchConfig;
use std::collections::HashMap;

let mut metadata = HashMap::new();
metadata.insert("authorization".to_string(), "Bearer token123".to_string());

let config = GrpcAdaptiveReactionConfig {
    endpoint: "grpc://event-server:9090".to_string(),
    timeout_ms: 15000,
    max_retries: 5,
    connection_retry_attempts: 5,
    initial_connection_timeout_ms: 10000,
    metadata,
    adaptive: AdaptiveBatchConfig {
        adaptive_min_batch_size: 50,
        adaptive_max_batch_size: 2000,
        adaptive_window_size: 100, // 10 seconds (100 × 100ms)
        adaptive_batch_timeout_ms: 500,
    },
};

let reaction = AdaptiveGrpcReaction::new(
    "event-processor",
    vec!["event-stream".to_string()],
    config,
);
```
| Name | Description | Data Type | Valid Values | Default |
|---|---|---|---|---|
| `endpoint` | gRPC server endpoint URL | String | `grpc://host:port` | `grpc://localhost:50052` |
| `timeout_ms` | Request timeout in milliseconds | u64 | 100-300000 | 5000 |
| `max_retries` | Maximum retries for failed requests | u32 | 0-100 | 3 |
| `connection_retry_attempts` | Connection retry attempts before giving up | u32 | 0-100 | 5 |
| `initial_connection_timeout_ms` | Initial connection timeout (lazy connect) | u64 | 100-60000 | 10000 |
| `metadata` | gRPC metadata headers | `HashMap<String, String>` | Key-value pairs | Empty |
| Name | Description | Data Type | Valid Values | Default |
|---|---|---|---|---|
| `adaptive_min_batch_size` | Minimum items per batch (low traffic) | usize | 1-10000 | 10 |
| `adaptive_max_batch_size` | Maximum items per batch (high traffic) | usize | 10-100000 | 1000 |
| `adaptive_window_size` | Throughput window size (units of 100ms) | usize | 1-300 (0.1s-30s) | 50 (5 seconds) |
| `adaptive_batch_timeout_ms` | Maximum wait time for batch completion | u64 | 1-10000 | 100 |
| Name | Description | Data Type | Valid Values | Default |
|---|---|---|---|---|
| `priority_queue_capacity` | Queue capacity for backpressure handling | `Option<usize>` | None or 100-1000000 | None (unlimited) |
| `auto_start` | Start reaction automatically on creation | bool | true/false | true |
The reaction classifies traffic into five levels and adjusts parameters accordingly:
| Throughput Level | Messages/Second | Batch Size Strategy | Wait Time |
|---|---|---|---|
| Idle | < 1 | Minimum (optimize latency) | Minimal (1ms) |
| Low | 1-100 | 2× minimum | 1ms |
| Medium | 100-1,000 | 25% of range | 10ms |
| High | 1,000-10,000 | 50% of range | 25ms |
| Burst | > 10,000 | Maximum (optimize throughput) | 50ms |
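The thresholds and strategies in the table can be written down directly as a sketch. This is illustrative code derived from the table above, not the crate's internal implementation:

```rust
// Five-level throughput classification, per the table above (illustrative).
#[derive(Debug, PartialEq)]
enum ThroughputLevel { Idle, Low, Medium, High, Burst }

fn classify(msgs_per_sec: f64) -> ThroughputLevel {
    match msgs_per_sec {
        m if m < 1.0 => ThroughputLevel::Idle,
        m if m < 100.0 => ThroughputLevel::Low,
        m if m < 1_000.0 => ThroughputLevel::Medium,
        m if m < 10_000.0 => ThroughputLevel::High,
        _ => ThroughputLevel::Burst,
    }
}

/// Target batch size and wait time for a level, given the configured
/// min/max batch sizes (strategies taken from the table).
fn batch_params(level: &ThroughputLevel, min: usize, max: usize) -> (usize, u64) {
    let range = max - min;
    match level {
        ThroughputLevel::Idle => (min, 1),
        ThroughputLevel::Low => ((min * 2).min(max), 1),
        ThroughputLevel::Medium => (min + range / 4, 10),
        ThroughputLevel::High => (min + range / 2, 25),
        ThroughputLevel::Burst => (max, 50),
    }
}

fn main() {
    assert_eq!(classify(0.5), ThroughputLevel::Idle);
    assert_eq!(classify(5_000.0), ThroughputLevel::High);
    // With min=50, max=2000: High traffic targets 50 + 1950/2 = 1025 items, 25ms wait.
    assert_eq!(batch_params(&ThroughputLevel::High, 50, 2000), (1025, 25));
}
```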
**Channel Capacity**: The internal batching channel is sized at `max_batch_size × 5` for pipelining and burst absorption.
The reaction sends data using the gRPC protobuf schema defined in drasi.v1.ReactionService:
Request Format (ProcessResultsRequest):
```protobuf
message ProcessResultsRequest {
  QueryResult results = 1;
  map<string, string> metadata = 2;
}

message QueryResult {
  string query_id = 1;
  repeated QueryResultItem results = 2;
  google.protobuf.Timestamp timestamp = 3;
}

message QueryResultItem {
  string type = 1;                    // "ADD", "UPDATE", "DELETE"
  google.protobuf.Struct data = 2;
  google.protobuf.Struct before = 3;  // For UPDATE events
  google.protobuf.Struct after = 4;   // For UPDATE events
}
```
Response Format (ProcessResultsResponse):
```protobuf
message ProcessResultsResponse {
  bool success = 1;
  string message = 2;
  string error = 3;
  uint32 items_processed = 4;
}
```
Add Event:
```json
{
  "query_id": "event-stream",
  "results": [
    {
      "type": "ADD",
      "data": {
        "id": "sensor-123",
        "temperature": 75.5,
        "timestamp": 1704067200
      }
    }
  ],
  "timestamp": "2025-01-01T00:00:00Z"
}
```
Update Event:
```json
{
  "query_id": "event-stream",
  "results": [
    {
      "type": "UPDATE",
      "before": {
        "id": "sensor-123",
        "temperature": 75.5
      },
      "after": {
        "id": "sensor-123",
        "temperature": 80.2
      }
    }
  ],
  "timestamp": "2025-01-01T00:00:10Z"
}
```
Delete Event:
```json
{
  "query_id": "event-stream",
  "results": [
    {
      "type": "DELETE",
      "data": {
        "id": "sensor-123"
      }
    }
  ],
  "timestamp": "2025-01-01T00:00:20Z"
}
```
Optimized for sustained high throughput with large batches:
```rust
use drasi_reaction_grpc_adaptive::AdaptiveGrpcReaction;

let reaction = AdaptiveGrpcReaction::builder("high-throughput")
    .with_endpoint("grpc://event-processor:9090")
    .with_queries(vec!["event-stream".to_string()])
    .with_min_batch_size(100)
    .with_max_batch_size(5000)
    .with_timeout_ms(30000)
    .with_max_retries(5)
    .build()?;
```
- **Traffic Pattern**: Handles 1,000-100,000 messages/second
- **Batch Behavior**: Starts at 100, scales to 5,000 during bursts
- **Use Case**: Log aggregation, metrics collection, event sourcing
Optimized for minimal latency with small batches:
```rust
let reaction = AdaptiveGrpcReaction::builder("low-latency")
    .with_endpoint("grpc://realtime-api:8080")
    .with_queries(vec!["sensor-alerts".to_string()])
    .with_min_batch_size(1)
    .with_max_batch_size(50)
    .with_timeout_ms(5000)
    .build()?;
```
- **Traffic Pattern**: 1-100 messages/second
- **Batch Behavior**: Sends immediately when idle, batches up to 50 during spikes
- **Use Case**: Real-time dashboards, instant notifications, monitoring alerts
Production configuration with tenant isolation:
```rust
let reaction = AdaptiveGrpcReaction::builder("tenant-processor")
    .with_endpoint("grpc://multi-tenant-api:9090")
    .with_queries(vec!["tenant-events".to_string()])
    .with_min_batch_size(25)
    .with_max_batch_size(1000)
    .with_metadata("authorization", "Bearer eyJhbGciOiJSUzI1...")
    .with_metadata("x-tenant-id", "prod-456")
    .with_metadata("x-region", "us-west-2")
    .with_priority_queue_capacity(50000)
    .with_timeout_ms(15000)
    .with_max_retries(3)
    .build()?;
```
- **Features**: JWT authentication, tenant routing, regional metadata
- **Traffic Pattern**: Variable (100-10,000 messages/second)
- **Use Case**: SaaS platforms, multi-tenant analytics, isolated processing
Maximum reliability configuration:
```rust
let mut config = GrpcAdaptiveReactionConfig::default();
config.endpoint = "grpc://critical-service:9090".to_string();
config.timeout_ms = 60000; // 1 minute timeout
config.max_retries = 10;
config.connection_retry_attempts = 20;
config.initial_connection_timeout_ms = 30000;
config.adaptive.adaptive_min_batch_size = 10;
config.adaptive.adaptive_max_batch_size = 500;

let reaction = AdaptiveGrpcReaction::new(
    "mission-critical",
    vec!["critical-events".to_string()],
    config,
);
```
- **Resilience**: 10 retries per batch, 20 connection retries
- **Timeouts**: 60s request, 30s initial connection
- **Use Case**: Financial transactions, healthcare data, compliance systems
Handles both quiet periods and traffic bursts efficiently:
```rust
let reaction = AdaptiveGrpcReaction::builder("variable-traffic")
    .with_endpoint("grpc://adaptive-backend:9090")
    .with_queries(vec!["user-activity".to_string()])
    .with_min_batch_size(1)      // Single items during quiet periods
    .with_max_batch_size(10000)  // Large batches during peak
    .with_priority_queue_capacity(100000)
    .build()?;
```
- **Adaptation Range**: 1-10,000 items per batch
- **Traffic Pattern**: Highly variable (0-50,000 messages/second)
- **Use Case**: E-commerce, social media, gaming analytics
For maximum throughput (> 10,000 msgs/sec):
```rust
.with_min_batch_size(500)
.with_max_batch_size(10000)
.with_timeout_ms(30000)
.with_priority_queue_capacity(100000)
```
For minimum latency (< 10ms P99):
```rust
.with_min_batch_size(1)
.with_max_batch_size(50)
.with_timeout_ms(5000)
```
For typical workloads:
```rust
.with_min_batch_size(50)
.with_max_batch_size(1000)
.with_timeout_ms(10000)
```
The reaction implements comprehensive error handling: failed batch sends are retried up to `max_retries` times, and the initial connection is retried up to `connection_retry_attempts` times before the reaction gives up.
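The bounded-retry behavior can be sketched as a simple loop. This is an illustration, not the crate's actual send path (which is async), and the exponential backoff values here are an assumption:

```rust
use std::time::Duration;

// Illustrative bounded-retry loop in the spirit of `max_retries`.
// The backoff policy (exponential, capped at 1s) is assumed, not documented.
fn send_with_retries<F>(max_retries: u32, mut send: F) -> Result<(), String>
where
    F: FnMut() -> Result<(), String>,
{
    let mut attempt = 0;
    loop {
        match send() {
            Ok(()) => return Ok(()),
            Err(e) if attempt < max_retries => {
                attempt += 1;
                eprintln!("WARN Failed to send batch (retry {attempt}/{max_retries}): {e}");
                let backoff = Duration::from_millis((50u64 << attempt.min(5)).min(1000));
                std::thread::sleep(backoff);
            }
            Err(e) => return Err(format!("failed after {max_retries} retries: {e}")),
        }
    }
}

fn main() {
    // Simulated endpoint that succeeds on the third attempt.
    let mut calls = 0;
    let result = send_with_retries(3, || {
        calls += 1;
        if calls < 3 { Err("connection reset".into()) } else { Ok(()) }
    });
    assert!(result.is_ok());
    assert_eq!(calls, 3);
}
```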
The reaction logs operational metrics:
```text
INFO  Adaptive gRPC reaction starting for endpoint: grpc://server:9090 (lazy connection)
DEBUG Adaptive batch collected - Size: 1247, Target: 1000, Wait: 25ms, Level: High
INFO  Adaptive metrics - Successful: 100, Failed: 0
WARN  Failed to send batch (retry 1/3): connection reset
ERROR Failed to send batch after 3 retries
```
```toml
[dependencies]
drasi-lib = { path = "../../../lib" }
drasi-reaction-grpc = { path = "../grpc" }
anyhow = "1.0"
async-trait = "0.1"
log = "0.4"
tonic = "0.11"
prost = "0.12"
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
```
| Feature | GrpcAdaptiveReaction | GrpcReaction |
|---|---|---|
| Batching | Adaptive (dynamic) | Fixed size |
| Throughput Classification | 5 levels | None |
| Parameter Adjustment | Automatic | Manual |
| Latency (idle traffic) | Minimal (1ms) | Fixed wait |
| Throughput (burst traffic) | Optimal (large batches) | Fixed |
| Channel Capacity | 5× max batch | Fixed |
| Use Case | Variable traffic | Predictable traffic |
Copyright 2025 The Drasi Authors.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.