| Crates.io | drasi-reaction-grpc |
| lib.rs | drasi-reaction-grpc |
| version | 0.2.1 |
| created_at | 2026-01-15 04:14:08.695707+00 |
| updated_at | 2026-01-23 06:22:12.334735+00 |
| description | gRPC reaction plugin for Drasi |
| homepage | |
| repository | https://github.com/drasi-project/drasi-core |
| max_upload_size | |
| id | 2044549 |
| size | 142,595 |
A Drasi reaction plugin that sends continuous query results to external gRPC services. This component enables real-time integration with gRPC-based systems by streaming query change events as they occur.
The gRPC Reaction component forwards query results from Drasi continuous queries to an external gRPC endpoint. It implements the Drasi Reaction Service protocol, providing a reliable way to push change events to downstream systems.
The builder pattern provides a fluent API for creating gRPC reactions:
```rust
use drasi_reaction_grpc::GrpcReaction;

// Minimal configuration
let reaction = GrpcReaction::builder("my-grpc-reaction")
    .with_endpoint("grpc://localhost:50052")
    .with_query("sensor-alerts")
    .build()?;

// Full configuration with all options
let reaction = GrpcReaction::builder("my-grpc-reaction")
    .with_queries(vec!["query1".to_string(), "query2".to_string()])
    .with_endpoint("grpc://api.example.com:50052")
    .with_timeout_ms(10000)
    .with_batch_size(200)
    .with_batch_flush_timeout_ms(2000)
    .with_max_retries(5)
    .with_connection_retry_attempts(10)
    .with_initial_connection_timeout_ms(15000)
    .with_metadata("api-key", "your-api-key")
    .with_metadata("tenant-id", "tenant-123")
    .with_priority_queue_capacity(1000)
    .with_auto_start(true)
    .build()?;
```
Alternatively, create a configuration object and pass it to the constructor:
```rust
use drasi_reaction_grpc::{GrpcReaction, GrpcReactionConfig};
use std::collections::HashMap;

let mut metadata = HashMap::new();
metadata.insert("api-key".to_string(), "your-api-key".to_string());

let config = GrpcReactionConfig {
    endpoint: "grpc://api.example.com:50052".to_string(),
    timeout_ms: 10000,
    batch_size: 200,
    batch_flush_timeout_ms: 2000,
    max_retries: 5,
    connection_retry_attempts: 10,
    initial_connection_timeout_ms: 15000,
    metadata,
};

let reaction = GrpcReaction::new(
    "my-grpc-reaction",
    vec!["query1".to_string()],
    config,
);
```
```rust
use drasi_reaction_grpc::{GrpcReaction, GrpcReactionConfig};

// Uses all default values
let config = GrpcReactionConfig::default();
let reaction = GrpcReaction::new("my-reaction", vec!["my-query".to_string()], config);
```
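Because `serde` is listed as a dependency for configuration serialization, the config can plausibly also be loaded from a serialized form. A minimal sketch, assuming `GrpcReactionConfig` derives `Deserialize` (not confirmed by this document) and using `serde_json` purely for illustration:

```rust
use drasi_reaction_grpc::{GrpcReaction, GrpcReactionConfig};

// Hypothetical: load the config from JSON. This assumes GrpcReactionConfig
// derives serde::Deserialize, which this document does not confirm.
let json = r#"{
    "endpoint": "grpc://api.example.com:50052",
    "timeout_ms": 10000,
    "batch_size": 200,
    "batch_flush_timeout_ms": 2000,
    "max_retries": 5,
    "connection_retry_attempts": 10,
    "initial_connection_timeout_ms": 15000,
    "metadata": { "api-key": "your-api-key" }
}"#;

let config: GrpcReactionConfig = serde_json::from_str(json)?;
let reaction = GrpcReaction::new("my-reaction", vec!["my-query".to_string()], config);
```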
| Name | Description | Data Type | Valid Values | Default |
|---|---|---|---|---|
| `endpoint` | gRPC server URL | String | Valid gRPC URL (`grpc://host:port`) | `grpc://localhost:50052` |
| `timeout_ms` | Request timeout in milliseconds | u64 | Positive integer | 5000 |
| `batch_size` | Maximum number of items per batch | usize | Positive integer | 100 |
| `batch_flush_timeout_ms` | Maximum time to wait before flushing a partial batch | u64 | Positive integer | 1000 |
| `max_retries` | Maximum retry attempts for failed requests | u32 | 0 or positive integer | 3 |
| `connection_retry_attempts` | Number of connection retry attempts | u32 | Positive integer | 5 |
| `initial_connection_timeout_ms` | Initial connection timeout in milliseconds | u64 | Positive integer | 10000 |
| `metadata` | Custom metadata headers to include in requests | HashMap<String, String> | Key-value pairs | Empty map |
| `priority_queue_capacity` | Capacity of the internal priority queue | Option<usize> | None or positive integer | None (uses default) |
| `auto_start` | Whether to automatically start the reaction | bool | true or false | true |
- `endpoint`: Must be in the format `grpc://hostname:port`. The protocol prefix is automatically converted to `http://` for the underlying transport.
- `timeout_ms`: Controls how long the client will wait for a response from the gRPC server before timing out.
- `batch_size`: Controls the maximum number of query result items to send in a single gRPC request. Larger batches improve throughput but increase memory usage.
- `batch_flush_timeout_ms`: If a batch doesn't reach the `batch_size` within this timeout, it is sent anyway. Prevents delays when query results arrive slowly.
- `max_retries`: Number of times to retry a failed request. Uses exponential backoff starting at 100ms, doubling each retry, up to a maximum of 5 seconds (see the sketch after this list).
- `connection_retry_attempts`: When establishing the initial connection, this controls how many times to retry before giving up.
- `initial_connection_timeout_ms`: Timeout for establishing the initial connection to the gRPC server.
- `metadata`: Custom headers sent with each gRPC request. Useful for authentication, tenant identification, or routing.
- `priority_queue_capacity`: Controls the size of the internal queue. If not set, uses the default from the reaction base.
- `auto_start`: If true, the reaction starts immediately when added to the Drasi system. If false, you must manually call `start()`.
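To make the backoff schedule concrete, here is a minimal sketch of the delay progression described above (a hypothetical helper for illustration only, not part of the crate's API):

```rust
use std::time::Duration;

// Backoff schedule described above: start at 100 ms, double per retry, cap at 5 s.
fn backoff_delay(attempt: u32) -> Duration {
    let base_ms: u64 = 100;
    let cap_ms: u64 = 5_000;
    let delay_ms = base_ms.saturating_mul(1u64 << attempt.min(63));
    Duration::from_millis(delay_ms.min(cap_ms))
}

fn main() {
    // With max_retries = 5, the delays are 100, 200, 400, 800, and 1600 ms.
    for attempt in 0..5 {
        println!("retry {} after {:?}", attempt + 1, backoff_delay(attempt));
    }
}
```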
The gRPC Reaction sends data using the Protocol Buffer format defined in the drasi.v1 package. All communication uses the ReactionService with the ProcessResults RPC method.
```protobuf
message ProcessResultsRequest {
  QueryResult results = 1;
  map<string, string> metadata = 2;          // Custom metadata headers
}

message QueryResult {
  string query_id = 1;                       // ID of the source query
  repeated QueryResultItem results = 2;      // Batch of result items
  google.protobuf.Timestamp timestamp = 3;   // When results were generated
}

message QueryResultItem {
  string type = 1;                           // Change type: "ADD", "UPDATE", "DELETE"
  google.protobuf.Struct data = 2;           // Current data
  google.protobuf.Struct before = 3;         // Previous state (for UPDATE)
  google.protobuf.Struct after = 4;          // New state (for UPDATE)
}
```
Your gRPC service should return:
```protobuf
message ProcessResultsResponse {
  bool success = 1;              // Whether processing succeeded
  string message = 2;            // Human-readable message
  string error = 3;              // Error details if success = false
  uint32 items_processed = 4;    // Number of items processed
}
```
When a query detects changes, the gRPC reaction sends batches like this:
```json
{
  "results": {
    "query_id": "high-temperature-sensors",
    "results": [
      {
        "type": "ADD",
        "data": {
          "id": "sensor-001",
          "temperature": 85.5,
          "location": "Building A"
        }
      },
      {
        "type": "UPDATE",
        "data": {
          "id": "sensor-002",
          "temperature": 78.0,
          "location": "Building B"
        },
        "before": {
          "id": "sensor-002",
          "temperature": 72.0,
          "location": "Building B"
        },
        "after": {
          "id": "sensor-002",
          "temperature": 78.0,
          "location": "Building B"
        }
      },
      {
        "type": "DELETE",
        "data": {
          "id": "sensor-003",
          "temperature": 65.0,
          "location": "Building C"
        }
      }
    ],
    "timestamp": "2025-12-05T10:30:00.123456Z"
  },
  "metadata": {
    "api-key": "your-api-key",
    "tenant-id": "tenant-123"
  }
}
```
```rust
use drasi_reaction_grpc::GrpcReaction;

// Monitor temperature sensors and send alerts to a gRPC service
let reaction = GrpcReaction::builder("temperature-alerts")
    .with_endpoint("grpc://alerts.example.com:50052")
    .with_query("high-temperature-sensors")
    .with_batch_size(50)
    .build()?;
```
```rust
use drasi_reaction_grpc::GrpcReaction;

// Aggregate results from multiple queries
let reaction = GrpcReaction::builder("multi-query-aggregator")
    .with_endpoint("grpc://aggregator.example.com:50052")
    .with_queries(vec![
        "sensor-data".to_string(),
        "device-status".to_string(),
        "alert-conditions".to_string(),
    ])
    .with_batch_size(500)
    .with_batch_flush_timeout_ms(5000)
    .build()?;
```
```rust
use drasi_reaction_grpc::GrpcReaction;

// Send results to an authenticated API with custom metadata
let reaction = GrpcReaction::builder("authenticated-integration")
    .with_endpoint("grpc://api.example.com:50052")
    .with_query("customer-events")
    .with_metadata("authorization", "Bearer token-xyz")
    .with_metadata("x-api-version", "v2")
    .with_metadata("x-tenant-id", "acme-corp")
    .with_timeout_ms(15000)
    .with_max_retries(5)
    .build()?;
```
```rust
use drasi_reaction_grpc::GrpcReaction;

// Configure for high-throughput scenarios
let reaction = GrpcReaction::builder("high-throughput-pipeline")
    .with_endpoint("grpc://pipeline.example.com:50052")
    .with_query("real-time-events")
    .with_batch_size(1000)
    .with_batch_flush_timeout_ms(100)
    .with_priority_queue_capacity(10000)
    .with_timeout_ms(30000)
    .with_connection_retry_attempts(10)
    .build()?;
```
```rust
use drasi_reaction_grpc::GrpcReaction;
use drasi_lib::Reaction;

// Create reaction without auto-start for manual control
let reaction = GrpcReaction::builder("manual-control")
    .with_endpoint("grpc://service.example.com:50052")
    .with_query("manual-query")
    .with_auto_start(false)
    .build()?;

// Start manually when ready
reaction.start().await?;

// Check status
let status = reaction.status().await;
println!("Reaction status: {:?}", status);

// Stop when done
reaction.stop().await?;
```
To receive results from the gRPC Reaction, implement a gRPC service using the drasi.v1.ReactionService protocol.
```rust
use tonic::{Request, Response, Status};
use drasi::v1::reaction_service_server::{ReactionService, ReactionServiceServer};
use drasi::v1::{ProcessResultsRequest, ProcessResultsResponse};

pub struct MyReactionService;

#[tonic::async_trait]
impl ReactionService for MyReactionService {
    async fn process_results(
        &self,
        request: Request<ProcessResultsRequest>,
    ) -> Result<Response<ProcessResultsResponse>, Status> {
        let req = request.into_inner();
        let query_result = req
            .results
            .ok_or_else(|| Status::invalid_argument("missing results"))?;

        println!(
            "Received {} items from query: {}",
            query_result.results.len(),
            query_result.query_id
        );

        // Process each result item; iterate by reference so the batch
        // can still be counted for the response below.
        for item in &query_result.results {
            match item.r#type.as_str() {
                "ADD" => println!("New item added: {:?}", item.data),
                "UPDATE" => println!("Item updated from {:?} to {:?}", item.before, item.after),
                "DELETE" => println!("Item deleted: {:?}", item.data),
                _ => {}
            }
        }

        Ok(Response::new(ProcessResultsResponse {
            success: true,
            message: "Processed successfully".to_string(),
            error: String::new(),
            items_processed: query_result.results.len() as u32,
        }))
    }
}
```
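To serve this implementation, the generated `ReactionServiceServer` wrapper can be mounted on a tonic server. A minimal sketch, assuming the types defined above and the default port used elsewhere in this document:

```rust
use tonic::transport::Server;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = "[::]:50052".parse()?;

    // Serve the ReactionService implementation defined above.
    Server::builder()
        .add_service(ReactionServiceServer::new(MyReactionService))
        .serve(addr)
        .await?;

    Ok(())
}
```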
The same service implemented in Python:

```python
import grpc
from concurrent import futures
from drasi.v1 import reaction_pb2, reaction_pb2_grpc

class ReactionServicer(reaction_pb2_grpc.ReactionServiceServicer):
    def ProcessResults(self, request, context):
        query_id = request.results.query_id
        items = request.results.results
        print(f"Received {len(items)} items from query: {query_id}")

        for item in items:
            if item.type == "ADD":
                print(f"New item: {item.data}")
            elif item.type == "UPDATE":
                print(f"Updated: {item.before} -> {item.after}")
            elif item.type == "DELETE":
                print(f"Deleted: {item.data}")

        return reaction_pb2.ProcessResultsResponse(
            success=True,
            message="Processed successfully",
            items_processed=len(items)
        )

# Start server
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
reaction_pb2_grpc.add_ReactionServiceServicer_to_server(
    ReactionServicer(), server
)
server.add_insecure_port('[::]:50052')
server.start()
server.wait_for_termination()
```
The gRPC Reaction implements sophisticated error handling and retry logic:
- Failed requests are retried up to `max_retries` times with exponential backoff.
- Responses with `success: false` are also retried, up to `max_retries` attempts.

For performance tuning:

- Larger batches (`batch_size`) improve throughput but increase latency.
- Adjust `batch_flush_timeout_ms` to balance latency against batching efficiency.
- Size `priority_queue_capacity` based on expected throughput.
- Match `timeout_ms` to your server's processing time.
- Tune `max_retries` according to the reliability of the connection to the downstream service.
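As an illustration of these trade-offs, a latency-oriented configuration might shrink the batch and flush aggressively (the values here are illustrative, not recommendations):

```rust
use drasi_reaction_grpc::GrpcReaction;

// Favor low latency over throughput: tiny batches, aggressive flushing.
let reaction = GrpcReaction::builder("low-latency-forwarder")
    .with_endpoint("grpc://service.example.com:50052")
    .with_query("latency-sensitive-events")
    .with_batch_size(10)
    .with_batch_flush_timeout_ms(50)
    .with_timeout_ms(2000)
    .build()?;
```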
The gRPC Reaction uses the standard Rust `log` crate. Enable logging by configuring the `RUST_LOG` environment variable:
```sh
RUST_LOG=drasi_reaction_grpc=debug cargo run
```
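If the host application initializes its own logger, the crate's log output flows through it. A minimal sketch using `env_logger` (an assumption here; any `log`-compatible backend works):

```rust
fn main() {
    // Honors RUST_LOG, e.g. RUST_LOG=drasi_reaction_grpc=debug
    env_logger::init();

    // ... build and run the reaction as shown above ...
}
```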
The `GrpcReaction` is designed for concurrent use.
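A minimal sketch of sharing one reaction across tasks, assuming the handle is `Send + Sync` and using the `start`/`status` methods shown earlier:

```rust
use std::sync::Arc;
use drasi_reaction_grpc::GrpcReaction;
use drasi_lib::Reaction;

async fn run(reaction: GrpcReaction) -> anyhow::Result<()> {
    let reaction = Arc::new(reaction);

    // Poll status from a background task while the reaction runs.
    let monitor = Arc::clone(&reaction);
    tokio::spawn(async move {
        let status = monitor.status().await;
        println!("Reaction status: {:?}", status);
    });

    reaction.start().await?;
    Ok(())
}
```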
Key dependencies:
- `drasi-lib`: Core Drasi library
- `tonic`: gRPC framework
- `prost`: Protocol Buffer implementation
- `tokio`: Async runtime
- `async-trait`: Async trait support
- `serde`: Configuration serialization
- `anyhow`: Error handling

Copyright 2025 The Drasi Authors. Licensed under the Apache License, Version 2.0.