| Crates.io | drasi-reaction-http-adaptive |
| lib.rs | drasi-reaction-http-adaptive |
| version | 0.2.1 |
| created_at | 2026-01-15 01:04:00.630366+00 |
| updated_at | 2026-01-23 06:21:36.375156+00 |
| description | HTTP Adaptive reaction plugin for Drasi |
| homepage | |
| repository | https://github.com/drasi-project/drasi-core |
| max_upload_size | |
| id | 2044281 |
| size | 154,278 |
An intelligent HTTP webhook reaction for Drasi that automatically batches query results based on real-time throughput patterns, optimizing both latency and network efficiency.
The HTTP Adaptive Reaction extends the standard HTTP reaction with intelligent batching capabilities. It monitors data flow patterns and dynamically adjusts batch size and timing to optimize performance:
Batch requests are sent to `{base_url}/batch` for efficient processing.

The HTTP Adaptive Reaction supports two configuration approaches: a fluent builder pattern (recommended for programmatic use) and a config struct approach (for YAML/serialization).
```rust
use drasi_reaction_http_adaptive::AdaptiveHttpReaction;

let reaction = AdaptiveHttpReaction::builder("analytics-webhook")
    .with_base_url("https://api.example.com")
    .with_token("your-api-token")
    .with_timeout_ms(10000)
    .with_queries(vec!["user-activity".to_string()])
    .with_min_batch_size(20)
    .with_max_batch_size(500)
    .with_window_size(50) // 5 seconds (50 × 100ms)
    .with_batch_timeout_ms(1000)
    .with_auto_start(true)
    .build()?;
```
```rust
use drasi_reaction_http_adaptive::{AdaptiveHttpReaction, HttpAdaptiveReactionConfig};
use drasi_lib::reactions::common::AdaptiveBatchConfig;
use std::collections::HashMap;

let config = HttpAdaptiveReactionConfig {
    base_url: "https://api.example.com".to_string(),
    token: Some("your-api-token".to_string()),
    timeout_ms: 10000,
    routes: HashMap::new(),
    adaptive: AdaptiveBatchConfig {
        adaptive_min_batch_size: 20,
        adaptive_max_batch_size: 500,
        adaptive_window_size: 50, // 5 seconds (50 × 100ms)
        adaptive_batch_timeout_ms: 1000,
    },
};

let reaction = AdaptiveHttpReaction::new(
    "analytics-webhook",
    vec!["user-activity".to_string()],
    config,
);
```
| Name | Description | Data Type | Valid Values | Default |
|---|---|---|---|---|
| `base_url` | Base URL for HTTP requests. Batch requests are sent to `{base_url}/batch` | `String` | Valid HTTP/HTTPS URL | `"http://localhost"` |
| `token` | Optional bearer token for authentication | `Option<String>` | Any string | `None` |
| `timeout_ms` | Request timeout in milliseconds | `u64` | 1 - 300000 (5 min) | 5000 |
| `routes` | Query-specific route configurations for individual requests | `HashMap<String, QueryConfig>` | See QueryConfig docs | Empty map |
| `adaptive_min_batch_size` | Minimum batch size, used during idle/low traffic | `usize` | 1 - 10000 | 1 |
| `adaptive_max_batch_size` | Maximum batch size, used during burst traffic | `usize` | 1 - 10000 | 100 |
| `adaptive_window_size` | Throughput-monitoring window in 100ms units (10 = 1 sec, 50 = 5 sec, 100 = 10 sec) | `usize` | 1 - 255 | 10 (1 second) |
| `adaptive_batch_timeout_ms` | Maximum time to wait before flushing a partial batch | `u64` | 1 - 60000 | 1000 |
| `auto_start` | Whether to start automatically when added to DrasiLib | `bool` | true, false | true |
The reaction monitors throughput over the configured window and adjusts parameters:
| Throughput Level | Messages/Sec | Batch Size | Wait Time |
|---|---|---|---|
| Idle | < 1 | min_batch_size | 1ms |
| Low | 1-100 | 2 × min | 1ms |
| Medium | 100-1K | 25% of max | 10ms |
| High | 1K-10K | 50% of max | 25ms |
| Burst | > 10K | max_batch_size | 50ms |
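The tier table above can be sketched as a simple selection function. This is an illustrative reconstruction of the documented behavior, not the crate's actual implementation; the `select_params` and `BatchParams` names are hypothetical:

```rust
/// Hypothetical batching parameters for a given throughput level.
#[derive(Debug, PartialEq)]
struct BatchParams {
    batch_size: usize,
    wait_ms: u64,
}

/// Map observed messages/sec onto the tiers from the table above.
fn select_params(msgs_per_sec: f64, min_batch: usize, max_batch: usize) -> BatchParams {
    if msgs_per_sec < 1.0 {
        BatchParams { batch_size: min_batch, wait_ms: 1 } // Idle
    } else if msgs_per_sec < 100.0 {
        BatchParams { batch_size: 2 * min_batch, wait_ms: 1 } // Low
    } else if msgs_per_sec < 1_000.0 {
        BatchParams { batch_size: max_batch / 4, wait_ms: 10 } // Medium
    } else if msgs_per_sec < 10_000.0 {
        BatchParams { batch_size: max_batch / 2, wait_ms: 25 } // High
    } else {
        BatchParams { batch_size: max_batch, wait_ms: 50 } // Burst
    }
}

fn main() {
    // With min=20, max=500, as in the builder example above:
    assert_eq!(select_params(0.5, 20, 500), BatchParams { batch_size: 20, wait_ms: 1 });
    assert_eq!(select_params(500.0, 20, 500), BatchParams { batch_size: 125, wait_ms: 10 });
    assert_eq!(select_params(20_000.0, 20, 500), BatchParams { batch_size: 500, wait_ms: 50 });
}
```

The key property is that batch size grows and flush latency is relaxed as throughput rises, so idle traffic is delivered promptly while bursts amortize HTTP overhead.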
When multiple results are available, the reaction sends a POST request to {base_url}/batch with the following JSON structure:
```json
[
  {
    "query_id": "user-changes",
    "results": [
      {
        "type": "ADD",
        "data": {"id": "user_123", "name": "John Doe"},
        "after": {"id": "user_123", "name": "John Doe"}
      },
      {
        "type": "UPDATE",
        "data": {"id": "user_456", "name": "Jane Smith"},
        "before": {"id": "user_456", "name": "Jane Doe"},
        "after": {"id": "user_456", "name": "Jane Smith"}
      },
      {
        "type": "DELETE",
        "data": {"id": "user_789", "name": "Bob Wilson"},
        "before": {"id": "user_789", "name": "Bob Wilson"}
      }
    ],
    "timestamp": "2025-10-19T12:34:56.789Z",
    "count": 3
  }
]
```
Each batch result object contains:
- `query_id` (string): The ID of the query that produced these results
- `results` (array): Array of result objects from the query
- `timestamp` (string): ISO 8601 timestamp when the batch was created
- `count` (number): Number of results in this batch (matches `results.length`)

Each entry in `results` contains:
- `type` (string): Operation type, one of `"ADD"`, `"UPDATE"`, or `"DELETE"`
- `data` (object): The result data from the query
- `before` (object, optional): Previous state, for UPDATE/DELETE operations
- `after` (object, optional): New state, for ADD/UPDATE operations

When only a single result is available or batch endpoints are disabled, the reaction uses query-specific routes (if configured) and sends individual POST requests with the raw result data.
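A consumer would normally deserialize this envelope with serde; the std-only sketch below just models the shape and the invariant that `count` matches `results.length`. The `BatchResult` struct and its methods are illustrative, not types exported by the crate:

```rust
/// Minimal model of one batch result envelope (field names follow the JSON above).
struct BatchResult {
    query_id: String,
    results: Vec<String>, // pre-serialized result objects, for illustration
    timestamp: String,
    count: usize,
}

impl BatchResult {
    /// `count` is derived from `results`, so the invariant always holds.
    fn new(query_id: &str, results: Vec<String>, timestamp: &str) -> Self {
        let count = results.len();
        Self {
            query_id: query_id.into(),
            results,
            timestamp: timestamp.into(),
            count,
        }
    }

    /// Hand-rolled JSON rendering; real code would use a JSON library.
    fn to_json(&self) -> String {
        format!(
            "{{\"query_id\":\"{}\",\"results\":[{}],\"timestamp\":\"{}\",\"count\":{}}}",
            self.query_id,
            self.results.join(","),
            self.timestamp,
            self.count
        )
    }
}

fn main() {
    let batch = BatchResult::new(
        "user-changes",
        vec![r#"{"type":"DELETE","data":{"id":"user_789"}}"#.to_string()],
        "2025-10-19T12:34:56.789Z",
    );
    assert_eq!(batch.count, batch.results.len());
    assert!(batch.to_json().contains("\"count\":1"));
}
```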
All requests include:
- `Content-Type: application/json`
- `Authorization: Bearer {token}` (if a token is configured)

```rust
use drasi_reaction_http_adaptive::AdaptiveHttpReaction;

// Configure for high-throughput analytics with large batches
let reaction = AdaptiveHttpReaction::builder("analytics-webhook")
    .with_base_url("https://analytics.example.com")
    .with_token("analytics-api-key")
    .with_queries(vec!["user-events".to_string()])
    .with_min_batch_size(50)
    .with_max_batch_size(2000)
    .with_window_size(100) // 10 seconds
    .with_batch_timeout_ms(500)
    .with_timeout_ms(30000) // 30 second timeout for large batches
    .build()?;
```
```rust
use drasi_reaction_http_adaptive::AdaptiveHttpReaction;

// Configure for low-latency monitoring with immediate delivery
let reaction = AdaptiveHttpReaction::builder("alert-webhook")
    .with_base_url("https://alerts.example.com")
    .with_queries(vec!["critical-events".to_string()])
    .with_min_batch_size(1)
    .with_max_batch_size(50)
    .with_window_size(30) // 3 seconds
    .with_batch_timeout_ms(10) // Send quickly
    .with_timeout_ms(5000)
    .build()?;
```
```rust
use drasi_reaction_http_adaptive::AdaptiveHttpReaction;
use drasi_reaction_http::{QueryConfig, CallSpec};
use std::collections::HashMap;

// Configure routes for individual fallback handling
let mut routes = HashMap::new();
routes.insert("users".to_string(), QueryConfig {
    added: Some(CallSpec {
        url: "/users".to_string(),
        method: "POST".to_string(),
        body: "{{data.after}}".to_string(),
        headers: HashMap::new(),
    }),
    updated: Some(CallSpec {
        url: "/users/{{data.after.id}}".to_string(),
        method: "PUT".to_string(),
        body: "{{data.after}}".to_string(),
        headers: HashMap::new(),
    }),
    deleted: Some(CallSpec {
        url: "/users/{{data.before.id}}".to_string(),
        method: "DELETE".to_string(),
        body: String::new(),
        headers: HashMap::new(),
    }),
});

let reaction = AdaptiveHttpReaction::builder("user-sync")
    .with_base_url("https://api.example.com")
    .with_queries(vec!["users".to_string(), "orders".to_string()])
    .with_route("users".to_string(), routes["users"].clone())
    .with_min_batch_size(10)
    .with_max_batch_size(100)
    .build()?;
```
```yaml
# config.yaml
reactions:
  - id: adaptive-webhook
    reaction_type: http_adaptive
    queries:
      - user-activity
      - order-updates
    auto_start: true
    config:
      base_url: https://webhook.example.com
      token: your-secret-token
      timeout_ms: 10000
      adaptive_min_batch_size: 20
      adaptive_max_batch_size: 500
      adaptive_window_size: 50
      adaptive_batch_timeout_ms: 1000
```
The reaction uses an internal channel for batching, with capacity automatically scaled as `channel_capacity = max_batch_size × 5`. This provides sufficient buffering for pipeline parallelism and burst handling:
| max_batch_size | Channel Capacity | Memory (1KB/event) |
|---|---|---|
| 100 | 500 | ~500 KB |
| 1,000 | 5,000 | ~5 MB |
| 5,000 | 25,000 | ~25 MB |
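The scaling rule is simple enough to check directly. A tiny sketch reproducing the capacity column of the table; the ~1 KB/event figure is the table's stated assumption, not a measurement, and `channel_capacity` is an illustrative helper, not a crate API:

```rust
/// Channel capacity as described above: five times the maximum batch size.
fn channel_capacity(max_batch_size: usize) -> usize {
    max_batch_size * 5
}

fn main() {
    // Sanity-check against the table above.
    assert_eq!(channel_capacity(100), 500);
    assert_eq!(channel_capacity(1_000), 5_000);
    assert_eq!(channel_capacity(5_000), 25_000);
    // At an assumed ~1 KB per buffered event, worst-case buffer memory
    // is roughly the capacity expressed in KB.
}
```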
The reaction maintains a persistent HTTP connection pool, which reduces connection overhead and improves throughput for high-frequency webhooks.
The adaptive algorithm automatically balances latency and throughput. Compared with the standard HTTP reaction:
| Feature | HTTP Reaction | HTTP Adaptive Reaction |
|---|---|---|
| Batching | No (one request per result) | Yes (adaptive batching) |
| Throughput | Low-Medium | High |
| Latency | Lowest | Low (adaptive) |
| Network Efficiency | Low | High |
| Memory Usage | Low | Medium |
| Use Case | Low-volume events | Variable or high-volume events |
Internally, the reaction runs as two cooperating async tasks.

The reaction supports backpressure through the priority queue mechanism, configurable via the `priority_queue_capacity` parameter.

The component includes comprehensive tests:
```sh
# Run all tests
cargo test -p drasi-reaction-http-adaptive

# Run with logging
RUST_LOG=debug cargo test -p drasi-reaction-http-adaptive -- --nocapture
```
Copyright 2025 The Drasi Authors.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.