| Crates.io | drasi-reaction-application |
| lib.rs | drasi-reaction-application |
| version | 0.2.1 |
| created_at | 2026-01-15 05:58:11.065672+00 |
| updated_at | 2026-01-23 06:25:06.94399+00 |
| description | Application reaction plugin for Drasi |
| homepage | |
| repository | https://github.com/drasi-project/drasi-core |
| max_upload_size | |
| id | 2044672 |
| size | 120,015 |
The Application Reaction component provides programmatic access to continuous query results directly within your Rust application. Unlike other reaction types (HTTP, gRPC, SSE, Log) that send results to external systems, the Application Reaction delivers results through an in-process channel, enabling direct consumption of query results with zero network overhead.
┌─────────────────┐
│ Drasi Queries │
└────────┬────────┘
│ Query Results
▼
┌─────────────────────────┐
│ ApplicationReaction │
│ (Priority Queue) │
└────────┬────────────────┘
│ mpsc::channel
▼
┌─────────────────────────┐
│ ApplicationReactionHandle│
└────────┬────────────────┘
│
┌────┴─────┬──────────┬────────────┐
▼ ▼ ▼ ▼
Callback Stream Subscription Raw Receiver
The builder pattern provides a fluent API for creating application reactions:
use drasi_reaction_application::ApplicationReaction;
let (reaction, handle) = ApplicationReaction::builder("my-app-reaction")
.with_query("users")
.with_query("orders")
.with_priority_queue_capacity(5000)
.with_auto_start(true)
.build();
// Add to DrasiLib
drasi_lib.add_reaction(reaction).await?;
// Use handle to receive results
let mut subscription = handle.subscribe_with_options(Default::default()).await?;
while let Some(result) = subscription.recv().await {
println!("Received {} results from {}", result.results.len(), result.query_id);
}
For simple cases, use the direct constructor:
use drasi_reaction_application::ApplicationReaction;
let (reaction, handle) = ApplicationReaction::new(
"my-app-reaction",
vec!["users".to_string(), "orders".to_string()]
);
drasi_lib.add_reaction(reaction).await?;
The ApplicationReactionConfig struct is used for serialization/deserialization:
use drasi_reaction_application::ApplicationReactionConfig;
use std::collections::HashMap;
let config = ApplicationReactionConfig {
properties: HashMap::new(), // Flexible properties for future extensions
};
| Method | Description | Data Type | Default |
|---|---|---|---|
with_query(query_id) |
Add a single query ID to subscribe to | String |
Empty list |
with_queries(query_ids) |
Set multiple query IDs to subscribe to | Vec<String> |
Empty list |
with_priority_queue_capacity(capacity) |
Set the priority queue buffer size | usize |
1000 |
with_auto_start(auto_start) |
Set whether reaction auto-starts | bool |
true |
Configure how results are received using SubscriptionOptions:
| Option | Description | Data Type | Default |
|---|---|---|---|
buffer_size |
Maximum number of results to buffer | usize |
1000 |
query_filter |
Filter results by query IDs (empty = all) | Vec<String> |
Empty |
timeout |
Maximum time to wait for results | Option<Duration> |
None (wait forever) |
batch_size |
Maximum results per batch | Option<usize> |
None (10 for batches) |
Results are delivered as QueryResult objects with the following schema:
pub struct QueryResult {
/// The ID of the query that produced these results
pub query_id: String,
/// Timestamp when the result was generated
pub timestamp: chrono::DateTime<chrono::Utc>,
/// Result rows as JSON values
pub results: Vec<serde_json::Value>,
/// Additional metadata about the results
pub metadata: HashMap<String, serde_json::Value>,
/// Optional profiling information for performance analysis
pub profiling: Option<ProfilingMetadata>,
}
Each result in the results array is a JSON object representing a row:
{
"query_id": "users",
"timestamp": "2025-12-05T10:30:00Z",
"results": [
{
"id": 123,
"name": "Alice",
"email": "alice@example.com"
},
{
"id": 456,
"name": "Bob",
"email": "bob@example.com"
}
],
"metadata": {},
"profiling": null
}
The most flexible and recommended approach using subscriptions:
use drasi_reaction_application::{ApplicationReaction, subscription::SubscriptionOptions};
// Create reaction and handle
let (reaction, handle) = ApplicationReaction::builder("results")
.with_query("users")
.build();
// Add to DrasiLib
drasi_lib.add_reaction(reaction).await?;
// Create subscription with default options
let mut subscription = handle.subscribe_with_options(
SubscriptionOptions::default()
).await?;
// Receive results one at a time
while let Some(result) = subscription.recv().await {
println!("Query: {}, Results: {}", result.query_id, result.results.len());
for row in result.results {
println!(" {:?}", row);
}
}
Process results with a callback function (spawns background task):
use drasi_reaction_application::ApplicationReaction;
let (reaction, handle) = ApplicationReaction::builder("results")
.with_queries(vec!["users".to_string(), "orders".to_string()])
.build();
drasi_lib.add_reaction(reaction).await?;
// Subscribe with callback - runs in background
handle.subscribe(|result| {
println!("Query: {}", result.query_id);
println!("Received {} results", result.results.len());
for row in result.results {
// Process each row
println!(" {:?}", row);
}
}).await?;
// Keep main task alive while callback processes results
tokio::time::sleep(Duration::from_secs(60)).await;
Use async iteration for processing results:
use drasi_reaction_application::ApplicationReaction;
let (reaction, handle) = ApplicationReaction::builder("results")
.with_query("sensors")
.build();
drasi_lib.add_reaction(reaction).await?;
// Convert to stream for async iteration
if let Some(mut stream) = handle.as_stream().await {
while let Some(result) = stream.next().await {
println!("Sensor reading: {:?}", result);
}
}
Only receive results from specific queries:
use drasi_reaction_application::{ApplicationReaction, subscription::SubscriptionOptions};
let (reaction, handle) = ApplicationReaction::builder("results")
.with_queries(vec![
"users".to_string(),
"orders".to_string(),
"products".to_string()
])
.build();
drasi_lib.add_reaction(reaction).await?;
// Only receive "users" query results
let options = SubscriptionOptions::default()
.with_query_filter(vec!["users".to_string()]);
let mut subscription = handle.subscribe_with_options(options).await?;
while let Some(result) = subscription.recv().await {
// Only user results arrive here
println!("User update: {:?}", result);
}
Receive multiple results at once for high-throughput scenarios:
use drasi_reaction_application::{ApplicationReaction, subscription::SubscriptionOptions};
let (reaction, handle) = ApplicationReaction::builder("results")
.with_query("high-volume-data")
.with_priority_queue_capacity(10000) // Large buffer
.build();
drasi_lib.add_reaction(reaction).await?;
// Configure for batch processing
let options = SubscriptionOptions::default()
.with_buffer_size(5000)
.with_batch_size(100); // Receive up to 100 results at a time
let mut subscription = handle.subscribe_with_options(options).await?;
loop {
let batch = subscription.recv_batch().await;
if batch.is_empty() {
break; // Channel closed
}
println!("Processing batch of {} results", batch.len());
for result in batch {
// Process each result
}
}
Use timeouts to prevent indefinite blocking:
use drasi_reaction_application::{ApplicationReaction, subscription::SubscriptionOptions};
use std::time::Duration;
let (reaction, handle) = ApplicationReaction::builder("results")
.with_query("sporadic-data")
.build();
drasi_lib.add_reaction(reaction).await?;
// Configure timeout
let options = SubscriptionOptions::default()
.with_timeout(Duration::from_secs(30));
let mut subscription = handle.subscribe_with_options(options).await?;
loop {
match subscription.recv().await {
Some(result) => {
println!("Received result: {:?}", result);
}
None => {
println!("Timeout or channel closed");
break;
}
}
}
Check for results without blocking:
use drasi_reaction_application::ApplicationReaction;
use tokio::time::{sleep, Duration};
let (reaction, handle) = ApplicationReaction::builder("results")
.with_query("events")
.build();
drasi_lib.add_reaction(reaction).await?;
let mut subscription = handle.subscribe_with_options(Default::default()).await?;
loop {
// Non-blocking check for results
match subscription.try_recv() {
Some(result) => {
println!("Got result: {:?}", result);
}
None => {
// Do other work
println!("No results available, doing other work...");
sleep(Duration::from_millis(100)).await;
}
}
}
If you need multiple consumers, create separate application reactions:
use drasi_reaction_application::ApplicationReaction;
// Create separate reactions for different consumers
let (reaction1, handle1) = ApplicationReaction::builder("consumer-1")
.with_query("users")
.build();
let (reaction2, handle2) = ApplicationReaction::builder("consumer-2")
.with_query("users")
.build();
// Add both reactions
drasi_lib.add_reaction(reaction1).await?;
drasi_lib.add_reaction(reaction2).await?;
// Each consumer gets its own copy of results
tokio::spawn(async move {
let mut sub1 = handle1.subscribe_with_options(Default::default()).await.unwrap();
while let Some(result) = sub1.recv().await {
println!("Consumer 1: {:?}", result);
}
});
tokio::spawn(async move {
let mut sub2 = handle2.subscribe_with_options(Default::default()).await.unwrap();
while let Some(result) = sub2.recv().await {
println!("Consumer 2: {:?}", result);
}
});
Each ApplicationReactionHandle can only be consumed once. The underlying receiver is taken on first use:
let (reaction, handle) = ApplicationReaction::builder("results").build();
// This works - first call succeeds
let mut subscription = handle.subscribe_with_options(Default::default()).await?;
// This fails - receiver already taken
let result = handle.as_stream().await;
assert!(result.is_none()); // Returns None
ApplicationReactionHandle is Clone, but all clones share the same receiver:
let (reaction, handle1) = ApplicationReaction::builder("results").build();
let handle2 = handle1.clone();
// Only ONE of these will succeed (whichever is called first)
let result1 = handle1.take_receiver().await; // Gets the receiver
let result2 = handle2.take_receiver().await; // Returns None
ApplicationReactionHandle is thread-safe and can be shared across threadsSubscription and ResultStream are NOT Send - use within a single taskSend + 'static as they run in background tasksResults are delivered in timestamp order using a priority queue:
with_priority_queue_capacity()Methods return anyhow::Result for error handling:
use anyhow::Result;
async fn handle_results(handle: ApplicationReactionHandle) -> Result<()> {
let mut subscription = handle.subscribe_with_options(Default::default()).await?;
while let Some(result) = subscription.recv().await {
// Process result
}
Ok(())
}
recv_batch() for high-throughput scenariosquery_filter to reduce processing overheadimpl ApplicationReaction {
// Builder pattern (recommended)
pub fn builder(id: impl Into<String>) -> ApplicationReactionBuilder;
// Direct constructor
pub fn new(id: impl Into<String>, queries: Vec<String>)
-> (Self, ApplicationReactionHandle);
}
impl ApplicationReactionBuilder {
pub fn new(id: impl Into<String>) -> Self;
pub fn with_queries(self, queries: Vec<String>) -> Self;
pub fn with_query(self, query_id: impl Into<String>) -> Self;
pub fn with_priority_queue_capacity(self, capacity: usize) -> Self;
pub fn with_auto_start(self, auto_start: bool) -> Self;
pub fn build(self) -> (ApplicationReaction, ApplicationReactionHandle);
}
impl ApplicationReactionHandle {
// Flexible subscription (recommended)
pub async fn subscribe_with_options(
&self,
options: SubscriptionOptions
) -> Result<Subscription>;
// Callback pattern
pub async fn subscribe<F>(&self, callback: F) -> Result<()>
where F: FnMut(QueryResult) + Send + 'static;
pub async fn subscribe_filtered<F>(
&self,
query_filter: Vec<String>,
callback: F
) -> Result<()>
where F: FnMut(QueryResult) + Send + 'static;
// Stream pattern
pub async fn as_stream(&self) -> Option<ResultStream>;
// Low-level API
pub async fn take_receiver(&self) -> Option<mpsc::Receiver<QueryResult>>;
// Metadata
pub fn reaction_id(&self) -> &str;
}
impl SubscriptionOptions {
pub fn new() -> Self;
pub fn default() -> Self;
pub fn with_buffer_size(self, size: usize) -> Self;
pub fn with_query_filter(self, queries: Vec<String>) -> Self;
pub fn with_timeout(self, timeout: Duration) -> Self;
pub fn with_batch_size(self, size: usize) -> Self;
}
impl Subscription {
// Blocking receive (with optional timeout)
pub async fn recv(&mut self) -> Option<QueryResult>;
// Non-blocking receive
pub fn try_recv(&mut self) -> Option<QueryResult>;
// Batch receive
pub async fn recv_batch(&mut self) -> Vec<QueryResult>;
// Convert to stream
pub fn into_stream(self) -> SubscriptionStream;
}
impl ResultStream {
pub async fn next(&mut self) -> Option<QueryResult>;
pub fn try_next(&mut self) -> Option<QueryResult>;
}
impl SubscriptionStream {
pub async fn next(&mut self) -> Option<QueryResult>;
}
Run the component tests:
cd drasi-core/components/reactions/application
cargo test
Run with logging:
RUST_LOG=debug cargo test -- --nocapture
Copyright 2025 The Drasi Authors.
Licensed under the Apache License, Version 2.0.