| Crates.io | drasi-reaction-platform |
| lib.rs | drasi-reaction-platform |
| version | 0.2.1 |
| created_at | 2026-01-15 04:15:13.522439+00 |
| updated_at | 2026-01-23 06:23:57.834724+00 |
| description | Platform reaction plugin for Drasi |
| homepage | |
| repository | https://github.com/drasi-project/drasi-core |
| max_upload_size | |
| id | 2044554 |
| size | 184,298 |
The Platform Reaction is a Drasi component that publishes continuous query results to Redis Streams in CloudEvent format, so that downstream consumers can subscribe to the results of specific queries.
The Platform Reaction can be configured using either the builder pattern (preferred for programmatic usage) or the configuration struct approach (for YAML-based or dynamic configuration).
use drasi_reaction_platform::PlatformReaction;

let reaction = PlatformReaction::builder("my-platform-reaction")
    .with_queries(vec!["query1".to_string(), "query2".to_string()])
    .with_redis_url("redis://localhost:6379")
    .with_pubsub_name("my-pubsub")
    .with_source_name("my-service")
    .with_max_stream_length(10000)
    .with_emit_control_events(true)
    .with_batch_enabled(true)
    .with_batch_max_size(100)
    .with_batch_max_wait_ms(50)
    .with_priority_queue_capacity(1000)
    .with_auto_start(true)
    .build()?;

use drasi_reaction_platform::{PlatformReaction, PlatformReactionConfig};

let config = PlatformReactionConfig {
    redis_url: "redis://localhost:6379".to_string(),
    pubsub_name: Some("my-pubsub".to_string()),
    source_name: Some("my-service".to_string()),
    max_stream_length: Some(10000),
    emit_control_events: true,
    batch_enabled: true,
    batch_max_size: 100,
    batch_max_wait_ms: 50,
};
let reaction = PlatformReaction::new(
    "my-platform-reaction",
    vec!["query1".to_string()],
    config,
)?;
| Name | Description | Data Type | Valid Values | Default |
|---|---|---|---|---|
| id | Unique identifier for the reaction | String | Any non-empty string | Required |
| queries | List of query IDs to subscribe to | Vec<String> | Query IDs | Required |
| redis_url | Redis connection URL | String | Valid Redis URL (e.g., redis://host:port) | Required |
| pubsub_name | PubSub name for CloudEvent metadata | Option<String> | Any string | "drasi-pubsub" |
| source_name | Source name for CloudEvent metadata | Option<String> | Any string | "drasi-core" |
| max_stream_length | Maximum length of Redis streams (enables trimming) | Option<usize> | Any positive number | None (unlimited) |
| emit_control_events | Whether to emit control events (lifecycle signals) | bool | true, false | false |
| batch_enabled | Enable batching of events before publishing | bool | true, false | false |
| batch_max_size | Maximum number of events per batch | usize | 1-10000 (recommended: 100-1000) | 100 |
| batch_max_wait_ms | Maximum wait time before flushing batch (milliseconds) | u64 | 1-1000 (recommended: 1-100) | 100 |
| priority_queue_capacity | Capacity of internal priority queue | Option<usize> | Any positive number | None (default) |
| auto_start | Whether to automatically start the reaction | bool | true, false | true |
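The documented value ranges can be checked before a reaction is built. The sketch below is hypothetical: `ReactionSettings` and `validate` are illustrative names that are not part of the crate, and whether the crate itself enforces these limits is not stated in this document.

```rust
/// Hypothetical mirror of a few settings from the configuration table.
/// These names are illustrative and are not the crate's own types.
#[derive(Debug, Clone, PartialEq)]
pub struct ReactionSettings {
    pub redis_url: String,
    pub max_stream_length: Option<usize>,
    pub batch_max_size: usize,
    pub batch_max_wait_ms: u64,
}

/// Checks the settings against the "Valid Values" column of the table.
/// Returns a description of the first violation found.
pub fn validate(s: &ReactionSettings) -> Result<(), String> {
    if s.redis_url.is_empty() {
        return Err("redis_url is required".to_string());
    }
    if s.max_stream_length == Some(0) {
        return Err("max_stream_length must be a positive number".to_string());
    }
    if !(1..=10_000).contains(&s.batch_max_size) {
        return Err("batch_max_size must be in 1-10000".to_string());
    }
    if !(1..=1_000).contains(&s.batch_max_wait_ms) {
        return Err("batch_max_wait_ms must be in 1-1000".to_string());
    }
    Ok(())
}
```

Calling `validate` on settings with `batch_max_size: 0` returns an error, while the values used in the builder example (100 events, 50 ms) pass.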
Stream trimming uses MAXLEN ~ (approximate trimming) for efficiency, which is useful for preventing unbounded stream growth.

The Platform Reaction publishes messages to Redis Streams in CloudEvent format. Each message is stored in a stream named {query-id}-results.

Each entry in the Redis stream contains a single field, data, which holds the CloudEvent serialized as JSON:
XREAD STREAMS my-query-results 0
1) 1) "my-query-results"
   2) 1) 1) "1705318245123-0"
         2) 1) "data"
            2) "{...CloudEvent JSON...}"
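An entry like the one above could be written with a Redis `XADD` command. The sketch below only assembles the command arguments and makes no network calls; `results_stream_key` and `xadd_args` are hypothetical helper names, and the crate's actual publishing code is not shown in this document.

```rust
/// Stream key for a query, following the `{query-id}-results` convention.
pub fn results_stream_key(query_id: &str) -> String {
    format!("{}-results", query_id)
}

/// Builds the argument list for a Redis `XADD` that stores one CloudEvent
/// under the single `data` field. When `max_len` is set, approximate
/// trimming (`MAXLEN ~ n`) is requested. `*` asks Redis to assign the ID.
pub fn xadd_args(query_id: &str, max_len: Option<usize>, cloud_event_json: &str) -> Vec<String> {
    let mut args = vec!["XADD".to_string(), results_stream_key(query_id)];
    if let Some(n) = max_len {
        args.extend(["MAXLEN".to_string(), "~".to_string(), n.to_string()]);
    }
    args.extend([
        "*".to_string(),
        "data".to_string(),
        cloud_event_json.to_string(),
    ]);
    args
}
```

With `max_len` set to `None` the `MAXLEN` clause is omitted, which matches the "unlimited" default for `max_stream_length`.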
All events are wrapped in a CloudEvent envelope conforming to the CloudEvents 1.0 specification:
{
"data": { /* ResultEvent - see below */ },
"datacontenttype": "application/json",
"id": "550e8400-e29b-41d4-a716-446655440000",
"pubsubname": "drasi-pubsub",
"source": "drasi-core",
"specversion": "1.0",
"time": "2025-01-15T10:30:45.123Z",
"topic": "my-query-results",
"type": "com.dapr.event.sent"
}
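To make the relationship between configuration and envelope explicit, the following sketch fills in the envelope fields that do not vary per event. `EnvelopeMeta` and `envelope_meta` are hypothetical names used for illustration; the defaults come from the configuration table, and the per-event `id` and `time` fields are left out because they need a UUID generator and a clock.

```rust
/// Hypothetical view of the envelope fields that are constant for a query.
/// Not the crate's own type; `id`, `time`, and `data` are omitted.
#[derive(Debug, Clone, PartialEq)]
pub struct EnvelopeMeta {
    pub datacontenttype: String,
    pub pubsubname: String,
    pub source: String,
    pub specversion: String,
    pub topic: String,
    /// Serialized as `type` in the JSON envelope.
    pub event_type: String,
}

/// Builds the constant envelope fields for one query, falling back to the
/// documented defaults when no pubsub or source name is configured.
pub fn envelope_meta(
    query_id: &str,
    pubsub_name: Option<&str>,
    source_name: Option<&str>,
) -> EnvelopeMeta {
    EnvelopeMeta {
        datacontenttype: "application/json".to_string(),
        pubsubname: pubsub_name.unwrap_or("drasi-pubsub").to_string(),
        source: source_name.unwrap_or("drasi-core").to_string(),
        specversion: "1.0".to_string(),
        topic: format!("{}-results", query_id),
        event_type: "com.dapr.event.sent".to_string(),
    }
}
```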
Change events contain the actual query results:
{
"kind": "change",
"queryId": "my-query",
"sequence": 42,
"sourceTimeMs": 1705318245123,
"addedResults": [
{
"id": "1",
"name": "John Doe",
"temperature": 98.6
}
],
"updatedResults": [
{
"before": {
"id": "2",
"value": 10
},
"after": {
"id": "2",
"value": 20
},
"groupingKeys": ["sensor_id"]
}
],
"deletedResults": [
{
"id": "3",
"name": "Jane Smith"
}
],
"metadata": {
"tracking": {
"source": {
"source_ns": 1744055144490466971,
"changeRouterStart_ns": 1744055159124143047,
"changeRouterEnd_ns": 1744055173551481387,
"seq": 42
},
"query": {
"enqueue_ns": 1744055173551481387,
"dequeue_ns": 1744055178510629042,
"queryStart_ns": 1744055178510650750,
"queryEnd_ns": 1744055178510848750
}
}
}
}
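A consumer can use change events to keep a local copy of the current result set. The sketch below is one way to do that under a loud assumption: every row carries an `id` field that identifies it, as in the example above, which the event format itself does not guarantee. The `Row`, `Update`, `Change`, `row`, and `apply_change` names are hypothetical, and values are kept as strings to stay dependency-free.

```rust
use std::collections::HashMap;

/// One result row as a flat map of field name to value (strings for simplicity).
pub type Row = HashMap<String, String>;

/// Mirrors one element of `updatedResults`.
pub struct Update {
    pub before: Row,
    pub after: Row,
}

/// Mirrors the three result lists of a change event.
pub struct Change {
    pub added: Vec<Row>,
    pub updated: Vec<Update>,
    pub deleted: Vec<Row>,
}

/// Convenience constructor for a row from key/value pairs.
pub fn row(fields: &[(&str, &str)]) -> Row {
    fields
        .iter()
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect()
}

/// Applies one change event to a view keyed by each row's "id" field.
/// ASSUMPTION: rows are identified by "id"; rows without one are skipped.
pub fn apply_change(view: &mut HashMap<String, Row>, change: &Change) {
    for r in &change.added {
        if let Some(id) = r.get("id") {
            view.insert(id.clone(), r.clone());
        }
    }
    for u in &change.updated {
        // Remove the old version first in case the id itself changed.
        if let Some(old_id) = u.before.get("id") {
            view.remove(old_id);
        }
        if let Some(id) = u.after.get("id") {
            view.insert(id.clone(), u.after.clone());
        }
    }
    for r in &change.deleted {
        if let Some(id) = r.get("id") {
            view.remove(id);
        }
    }
}
```

Applying events in `sequence` order keeps the view consistent with the query's result set at that point in the stream.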
Control events signal lifecycle transitions:
{
"kind": "control",
"queryId": "my-query",
"sequence": 1,
"sourceTimeMs": 1705318245123,
"controlSignal": {
"kind": "bootstrapStarted"
}
}
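On the consumer side, the `controlSignal.kind` string can be mapped to an enum so that unknown values are rejected early. This is a sketch with a hypothetical `ControlSignal` type covering the five signal kinds documented in this section; it is not the crate's own definition.

```rust
use std::str::FromStr;

/// Hypothetical consumer-side model of the documented control signal kinds.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ControlSignal {
    BootstrapStarted,
    BootstrapCompleted,
    Running,
    Stopped,
    Deleted,
}

impl ControlSignal {
    /// The string used in the `kind` field of `controlSignal`.
    pub fn as_str(self) -> &'static str {
        match self {
            ControlSignal::BootstrapStarted => "bootstrapStarted",
            ControlSignal::BootstrapCompleted => "bootstrapCompleted",
            ControlSignal::Running => "running",
            ControlSignal::Stopped => "stopped",
            ControlSignal::Deleted => "deleted",
        }
    }
}

impl FromStr for ControlSignal {
    type Err = String;

    /// Parses the `kind` string; unknown values produce an error message.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "bootstrapStarted" => Ok(ControlSignal::BootstrapStarted),
            "bootstrapCompleted" => Ok(ControlSignal::BootstrapCompleted),
            "running" => Ok(ControlSignal::Running),
            "stopped" => Ok(ControlSignal::Stopped),
            "deleted" => Ok(ControlSignal::Deleted),
            other => Err(format!("unknown control signal: {}", other)),
        }
    }
}
```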
Control Signal Types:
- bootstrapStarted - Query bootstrap process has started
- bootstrapCompleted - Query bootstrap process has completed
- running - Query is running normally
- stopped - Query has been stopped
- deleted - Query has been deleted

Field values used in the envelope and the payload:
- datacontenttype: always "application/json"
- specversion: CloudEvents specification version ("1.0")
- topic: {query-id}-results
- type: "com.dapr.event.sent"
- kind: event kind ("change" or "control")

When profiling is enabled, the metadata includes timing information:

tracking.source: Source-side timing information
- source_ns: Original source timestamp
- changeRouterStart_ns: When change router received the event
- changeRouterEnd_ns: When change router sent the event
- reactivatorStart_ns: When reactivator started processing
- reactivatorEnd_ns: When reactivator finished processing
- seq: Source sequence number

tracking.query: Query-side timing information
- enqueue_ns: When query was enqueued for processing
- dequeue_ns: When query was dequeued for processing
- queryStart_ns: When query core processing started
- queryEnd_ns: When query core processing ended

use drasi_reaction_platform::PlatformReaction;
// Create a simple Platform reaction
let reaction = PlatformReaction::builder("sensor-reaction")
    .with_query("high-temperature-sensors")
    .with_redis_url("redis://localhost:6379")
    .build()?;
// Start the reaction (if auto_start is false)
reaction.start().await?;

use drasi_reaction_platform::PlatformReaction;
// Subscribe to multiple queries
let reaction = PlatformReaction::builder("multi-query-reaction")
    .with_queries(vec![
        "query1".to_string(),
        "query2".to_string(),
        "query3".to_string(),
    ])
    .with_redis_url("redis://redis-cluster:6379")
    .build()?;

use drasi_reaction_platform::PlatformReaction;
// Enable batching for high-throughput scenarios
let reaction = PlatformReaction::builder("high-volume-reaction")
    .with_query("real-time-analytics")
    .with_redis_url("redis://localhost:6379")
    .with_batch_enabled(true)
    .with_batch_max_size(500) // Batch up to 500 events
    .with_batch_max_wait_ms(50) // Or wait max 50ms
    .build()?;
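The interaction between `batch_max_size` and `batch_max_wait_ms` can be pictured as a buffer that flushes on whichever limit is reached first. The `Batcher` type below is a hypothetical, dependency-free sketch of that idea, not the crate's implementation; in particular, measuring the wait from the arrival of the oldest buffered event is an assumption.

```rust
use std::time::{Duration, Instant};

/// Hypothetical size-or-time batcher; not the crate's own implementation.
pub struct Batcher<T> {
    max_size: usize,
    max_wait: Duration,
    items: Vec<T>,
    /// Arrival time of the oldest buffered item, if any.
    first_at: Option<Instant>,
}

impl<T> Batcher<T> {
    pub fn new(max_size: usize, max_wait_ms: u64) -> Self {
        Batcher {
            max_size,
            max_wait: Duration::from_millis(max_wait_ms),
            items: Vec::new(),
            first_at: None,
        }
    }

    /// Buffers an item and returns a full batch once `max_size` is reached.
    pub fn push(&mut self, item: T, now: Instant) -> Option<Vec<T>> {
        if self.items.is_empty() {
            self.first_at = Some(now);
        }
        self.items.push(item);
        if self.items.len() >= self.max_size {
            Some(self.take())
        } else {
            None
        }
    }

    /// Returns the pending batch if the oldest item has waited at least `max_wait`.
    pub fn flush_if_due(&mut self, now: Instant) -> Option<Vec<T>> {
        match self.first_at {
            Some(t) if now.saturating_duration_since(t) >= self.max_wait => Some(self.take()),
            _ => None,
        }
    }

    /// Empties the buffer and resets the wait timer.
    fn take(&mut self) -> Vec<T> {
        self.first_at = None;
        std::mem::take(&mut self.items)
    }
}
```

Passing `now` explicitly keeps the sketch deterministic. A real publisher would call `flush_if_due` from a timer and send each returned batch to Redis in one round trip.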

use drasi_reaction_platform::PlatformReaction;
// Customize CloudEvent metadata
let reaction = PlatformReaction::builder("custom-cloud-events")
    .with_query("my-query")
    .with_redis_url("redis://localhost:6379")
    .with_pubsub_name("production-pubsub")
    .with_source_name("sensor-service")
    .build()?;

use drasi_reaction_platform::PlatformReaction;
// Limit stream length to prevent unbounded growth
let reaction = PlatformReaction::builder("managed-streams")
    .with_query("event-stream")
    .with_redis_url("redis://localhost:6379")
    .with_max_stream_length(10000) // Keep only last 10k events
    .build()?;

use drasi_reaction_platform::PlatformReaction;
// Enable control events for lifecycle tracking
let reaction = PlatformReaction::builder("lifecycle-tracking")
    .with_query("monitored-query")
    .with_redis_url("redis://localhost:6379")
    .with_emit_control_events(true)
    .build()?;

use drasi_reaction_platform::PlatformReaction;
// Configure internal priority queue capacity
let reaction = PlatformReaction::builder("priority-queue-reaction")
    .with_query("time-sensitive-query")
    .with_redis_url("redis://localhost:6379")
    .with_priority_queue_capacity(5000) // Buffer up to 5000 events
    .build()?;

use redis::Commands;
// NOTE: CloudEvent and ResultEvent are assumed to be in scope;
// their import path is not shown in this document.

// Consumer reading from Redis Streams
let client = redis::Client::open("redis://localhost:6379")?;
let mut con = client.get_connection()?;
// Read from stream
let stream_key = "my-query-results";
let results: redis::streams::StreamReadReply = con.xread(
    &[stream_key],
    &["0"], // Start from beginning
)?;
for stream in results.keys {
    for entry in stream.ids {
        // Each entry stores the CloudEvent JSON in its single "data" field
        let Some(data) = entry.get::<String>("data") else {
            continue; // Skip entries without a readable "data" field
        };
        let cloud_event: CloudEvent<ResultEvent> = serde_json::from_str(&data)?;
        match cloud_event.data {
            ResultEvent::Change(change) => {
                println!("Change event: {} added, {} updated, {} deleted",
                    change.added_results.len(),
                    change.updated_results.len(),
                    change.deleted_results.len()
                );
            }
            ResultEvent::Control(control) => {
                println!("Control event: {:?}", control.control_signal);
            }
        }
    }
}
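The consumer above always reads from ID `0`, so it sees the whole stream on every call. Because `XREAD` returns only entries with IDs greater than the one supplied, a long-running consumer would typically remember the last ID it processed and pass that instead. The helpers below are a hypothetical, dependency-free sketch of that bookkeeping; `EntryId`, `parse_entry_id`, and `latest_id` are illustrative names.

```rust
/// A Redis stream entry ID of the form `<milliseconds>-<sequence>`.
/// Field order matters: the derived ordering compares `ms` first, then `seq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct EntryId {
    pub ms: u64,
    pub seq: u64,
}

/// Parses an ID such as "1705318245123-0"; returns None if it is malformed.
pub fn parse_entry_id(s: &str) -> Option<EntryId> {
    let (ms, seq) = s.split_once('-')?;
    Some(EntryId {
        ms: ms.parse().ok()?,
        seq: seq.parse().ok()?,
    })
}

/// Returns the greatest well-formed ID in `ids`, formatted so it can be
/// passed as the start ID of the next XREAD call.
pub fn latest_id(ids: &[&str]) -> Option<String> {
    ids.iter()
        .filter_map(|s| parse_entry_id(s))
        .max()
        .map(|id| format!("{}-{}", id.ms, id.seq))
}
```

Persisting the returned ID between runs lets a restarted consumer resume where it left off instead of reprocessing old entries.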
The Platform Reaction follows a modular architecture:
When batching is enabled:
- Events are buffered until batch_max_size is reached or batch_max_wait_ms has elapsed
- Stream trimming uses approximate trimming (MAXLEN ~) to avoid blocking

Connection refused errors
- Check that the Redis URL is valid (redis://host:port)

Events not appearing in streams
- Make sure the reaction has been started (reaction.start().await?)

High memory usage
- Set max_stream_length to limit stream growth
- Reduce batch_max_size if buffering too many events
- Reduce priority_queue_capacity

High latency
- Reduce batch_max_wait_ms for lower latency (at cost of throughput)

Batch publishing failures
- Reduce batch_max_size if hitting Redis limits

The reaction uses the log crate for structured logging:
# Enable debug logging
RUST_LOG=debug cargo run
# Module-specific logging
RUST_LOG=drasi_reaction_platform=debug cargo run
Log Levels:
- error! - Publishing failures, serialization errors
- warn! - Retry attempts, configuration warnings
- info! - Control events, lifecycle transitions
- debug! - Individual event publishing, batch operations

Copyright 2025 The Drasi Authors.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.