| Crates.io | drasi-reaction-sse |
| lib.rs | drasi-reaction-sse |
| version | 0.2.1 |
| created_at | 2026-01-15 04:15:17.979938+00 |
| updated_at | 2026-01-23 06:23:23.38211+00 |
| description | SSE reaction plugin for Drasi |
| homepage | |
| repository | https://github.com/drasi-project/drasi-core |
| max_upload_size | |
| id | 2044555 |
| size | 176,553 |
Server-Sent Events (SSE) reaction plugin for Drasi that streams continuous query results to browser clients in real-time.
The SSE Reaction component exposes Drasi continuous query results to web clients via Server-Sent Events, enabling real-time data streaming over HTTP. SSE provides a simple, unidirectional push-based communication channel from server to clients, making it ideal for scenarios where clients need to receive continuous updates without polling.
The SSE Reaction can be configured using either the builder pattern (recommended) or the config struct approach.
use drasi_reaction_sse::SseReaction;
let reaction = SseReaction::builder("my-sse-reaction")
.with_host("0.0.0.0")
.with_port(8080)
.with_sse_path("/events")
.with_heartbeat_interval_ms(30000)
.with_queries(vec!["sensor-data".to_string(), "alerts".to_string()])
.with_priority_queue_capacity(1000)
.with_auto_start(true)
.build()?;
use drasi_reaction_sse::{SseReaction, SseReactionConfig};
let config = SseReactionConfig {
host: "0.0.0.0".to_string(),
port: 8080,
sse_path: "/events".to_string(),
heartbeat_interval_ms: 30000,
};
let reaction = SseReaction::new(
"my-sse-reaction",
vec!["sensor-data".to_string()],
config
);
| Option | Description | Type | Valid Values | Default |
|---|---|---|---|---|
id |
Unique identifier for the reaction | String | Any valid string | Required |
host |
Host address to bind the SSE server | String | Valid IP address or hostname | "0.0.0.0" |
port |
Port number to bind the SSE server | u16 | 1-65535 | 8080 |
sse_path |
HTTP path for SSE endpoint | String | Valid URL path | "/events" |
heartbeat_interval_ms |
Interval between heartbeat messages in milliseconds | u64 | > 0 | 30000 (30 seconds) |
queries |
List of query IDs to subscribe to | Vec<String> | Valid query identifiers | [] |
priority_queue_capacity |
Custom capacity for priority queue (optional) | usize | > 0 | Auto-configured |
auto_start |
Whether to start automatically when added | bool | true/false | true |
routes |
Query-specific template configurations | HashMap<String, QueryConfig> | Query-specific configs | {} |
default_template |
Default template configuration used when no query-specific route is defined | Option<QueryConfig> | Template config | None |
The SSE Reaction supports per-query configuration, allowing you to customize templates and endpoints for each query. This feature enables fine-grained control over the output format for different queries and operation types.
You can set a default template configuration that applies to all queries when no query-specific route is defined. This is useful when you have multiple queries and want to use the same custom templates for all of them, instead of the built-in default format.
Important: Template validation occurs at creation time. If a template is invalid (malformed Handlebars syntax), the builder will return an error. Additionally, if you define routes for queries that aren't subscribed to, the builder will return an error.
Defines template specifications for each operation type within a query.
| Name | Description | Type | Required |
|---|---|---|---|
added |
Template specification for ADD operations (new rows). | Option<TemplateSpec> | No |
updated |
Template specification for UPDATE operations (modified rows). | Option<TemplateSpec> | No |
deleted |
Template specification for DELETE operations (removed rows). | Option<TemplateSpec> | No |
Specification for SSE event output with custom templates and paths.
| Name | Description | Type | Default | Required |
|---|---|---|---|---|
path |
Optional custom path for this template. If provided, events will be sent to this path. Can be absolute (e.g., "/sensors") or relative to base sse_path (e.g., "sensors" becomes "/events/sensors" if base is "/events"). Supports Handlebars templates for dynamic paths. | Option<String> | None | No |
template |
Event data template as a Handlebars template. If empty, uses default JSON format. | String | Empty | No |
When using Handlebars templates, the following variables are available:
| Variable | Description | Available Operations |
|---|---|---|
after |
The new/current state of the data | ADD, UPDATE |
before |
The previous state of the data | UPDATE, DELETE |
query_name |
The ID of the query that triggered the change | ALL |
operation |
The operation type: "ADD", "UPDATE", or "DELETE" | ALL |
timestamp |
Unix timestamp in milliseconds | ALL |
use drasi_reaction_sse::{SseReaction, QueryConfig, TemplateSpec};
let query_config = QueryConfig {
added: Some(TemplateSpec {
path: None,
template: r#"{
"event": "sensor_added",
"sensor_id": "{{after.id}}",
"temperature": {{after.temperature}},
"timestamp": {{timestamp}}
}"#.to_string(),
}),
updated: Some(TemplateSpec {
path: None,
template: r#"{
"event": "sensor_updated",
"sensor_id": "{{after.id}}",
"old_temp": {{before.temperature}},
"new_temp": {{after.temperature}},
"timestamp": {{timestamp}}
}"#.to_string(),
}),
deleted: Some(TemplateSpec {
path: Some("/sensors/deleted".to_string()),
template: r#"{
"event": "sensor_removed",
"sensor_id": "{{before.id}}",
"timestamp": {{timestamp}}
}"#.to_string(),
}),
};
let reaction = SseReaction::builder("sensor-sse")
.with_query("sensor-data")
.with_route("sensor-data", query_config)
.build()?;
The json Handlebars helper serializes complex objects:
use drasi_reaction_sse::{TemplateSpec};
let template_spec = TemplateSpec {
path: None,
template: r#"{
"event": "{{operation}}",
"query": "{{query_name}}",
"data": {{json after}}
}"#.to_string(),
};
This ensures the entire after object is properly JSON-serialized.
The SSE Reaction supports flexible query ID matching:
"my-query" matches query ID "my-query""source.my-query", it will also match configuration key "my-query"This is useful when queries are namespaced by source or other prefixes.
If no route configuration and no default template is provided for a query, the SSE Reaction uses the built-in default behavior:
{"queryId": "...", "results": [...], "timestamp": ...}sse_pathThis maintains backward compatibility with existing configurations.
The SSE Reaction performs validation at build time:
Template Validation: All Handlebars templates (in routes and default template) are validated for correct syntax. If a template is invalid, the builder returns an error.
Route Validation: All routes must correspond to subscribed queries. The builder checks that each route matches either:
If a route doesn't match any subscribed query, the builder returns an error.
Example of validation error:
// This will fail because "query2" is not subscribed
let result = SseReaction::builder("test")
.with_query("query1")
.with_route("query2", some_config)
.build();
assert!(result.is_err());
The SSE Reaction emits two types of events:
Sent whenever subscribed queries produce new results:
{
"queryId": "sensor-data",
"results": [
{
"id": "sensor-1",
"temperature": 85.2,
"timestamp": "2025-12-05T10:30:00Z"
},
{
"id": "sensor-2",
"temperature": 92.7,
"timestamp": "2025-12-05T10:30:01Z"
}
],
"timestamp": 1706742123456
}
Fields:
queryId (string): The ID of the query that produced the resultsresults (array): Array of result objects from the querytimestamp (number): Unix timestamp in milliseconds when the event was generatedSent at regular intervals to keep connections alive. Note: Heartbeat messages are sent to all SSE paths and are not affected by custom templates. They always use the standard format shown below:
{
"type": "heartbeat",
"ts": 1706742123456
}
Fields:
type (string): Always "heartbeat" for heartbeat eventsts (number): Unix timestamp in milliseconds when the heartbeat was sentuse drasi_reaction_sse::SseReaction;
// Create SSE reaction for a single query
let reaction = SseReaction::builder("temperature-monitor")
.with_query("high-temp-sensors")
.with_port(8080)
.build()?;
// The reaction will be available at http://0.0.0.0:8080/events
use drasi_reaction_sse::SseReaction;
let reaction = SseReaction::builder("multi-query-sse")
.with_queries(vec![
"sensor-data".to_string(),
"alert-events".to_string(),
"system-metrics".to_string(),
])
.with_host("localhost")
.with_port(9090)
.with_sse_path("/api/stream")
.with_heartbeat_interval_ms(15000) // 15 second heartbeats
.build()?;
// Available at http://localhost:9090/api/stream
use drasi_reaction_sse::SseReaction;
// For high-volume scenarios, configure a larger priority queue
let reaction = SseReaction::builder("high-volume-sse")
.with_query("rapid-events")
.with_priority_queue_capacity(10000)
.build()?;
use drasi_reaction_sse::{SseReaction, QueryConfig, TemplateSpec};
// Define a default template that applies to all queries
let default_template = QueryConfig {
added: Some(TemplateSpec {
path: None,
template: r#"{
"event": "data_added",
"query": "{{query_name}}",
"data": {{json after}},
"timestamp": {{timestamp}}
}"#.to_string(),
}),
updated: Some(TemplateSpec {
path: None,
template: r#"{
"event": "data_updated",
"query": "{{query_name}}",
"before": {{json before}},
"after": {{json after}},
"timestamp": {{timestamp}}
}"#.to_string(),
}),
deleted: Some(TemplateSpec {
path: None,
template: r#"{
"event": "data_deleted",
"query": "{{query_name}}",
"data": {{json before}},
"timestamp": {{timestamp}}
}"#.to_string(),
}),
};
let reaction = SseReaction::builder("multi-query-sse")
.with_queries(vec![
"sensor-data".to_string(),
"alert-events".to_string(),
"system-metrics".to_string(),
])
.with_default_template(default_template)
.build()?;
// All queries will use the same custom template format
use drasi_reaction_sse::{SseReaction, QueryConfig, TemplateSpec};
// Define custom templates for different operation types
let sensor_config = QueryConfig {
added: Some(TemplateSpec {
path: None,
template: r#"{
"type": "new_sensor",
"id": "{{after.sensor_id}}",
"name": "{{after.name}}",
"initial_temp": {{after.temperature}},
"timestamp": {{timestamp}}
}"#.to_string(),
}),
updated: Some(TemplateSpec {
path: None,
template: r#"{
"type": "sensor_update",
"id": "{{after.sensor_id}}",
"temp_change": {
"from": {{before.temperature}},
"to": {{after.temperature}}
},
"timestamp": {{timestamp}}
}"#.to_string(),
}),
deleted: Some(TemplateSpec {
path: None,
template: r#"{
"type": "sensor_removed",
"id": "{{before.sensor_id}}",
"last_temp": {{before.temperature}},
"timestamp": {{timestamp}}
}"#.to_string(),
}),
};
let reaction = SseReaction::builder("sensor-stream")
.with_query("sensor-readings")
.with_route("sensor-readings", sensor_config)
.with_port(8080)
.build()?;
// Clients will receive formatted events based on operation type
use drasi_lib::DrasiLib;
use drasi_reaction_sse::SseReaction;
let drasi = DrasiLib::new()
.add_query(my_query)
.add_reaction(
SseReaction::builder("web-dashboard")
.with_query("dashboard-data")
.with_port(8080)
.build()?
)
.build()
.await?;
// Connect to SSE endpoint
const eventSource = new EventSource('http://localhost:8080/events');
// Handle query result events
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.type === 'heartbeat') {
console.log('Heartbeat received at', data.ts);
} else {
console.log('Query results from', data.queryId);
console.log('Results:', data.results);
console.log('Timestamp:', data.timestamp);
// Update your UI with the new data
updateDashboard(data.results);
}
};
// Handle connection events
eventSource.onerror = (error) => {
console.error('SSE connection error:', error);
};
eventSource.onopen = () => {
console.log('SSE connection opened');
};
import sseclient
import json
import requests
def stream_events():
response = requests.get('http://localhost:8080/events', stream=True)
client = sseclient.SSEClient(response)
for event in client.events():
data = json.loads(event.data)
if data.get('type') == 'heartbeat':
print(f"Heartbeat at {data['ts']}")
else:
print(f"Query: {data['queryId']}")
print(f"Results: {data['results']}")
print(f"Timestamp: {data['timestamp']}")
if __name__ == '__main__':
stream_events()
The SSE Reaction uses Tokio's broadcast channel to efficiently distribute events to multiple clients:
The SSE server is configured with permissive CORS to allow browser connections from any origin:
*)Two mechanisms ensure connection stability:
The SSE Reaction handles various error conditions gracefully:
Heartbeat interval: Balance between connection stability and bandwidth
Priority queue capacity: Size based on expected query result frequency
Number of queries: Multiple queries share the same SSE connection
Network considerations: SSE uses HTTP/1.1 with chunked encoding
SSE connection immediately closes:
No events received:
Heartbeat messages but no query results:
Client shows lag errors:
drasi-lib: Core Drasi library for reaction frameworkaxum: HTTP server for SSE endpointstower-http: CORS middlewaretokio: Async runtime and broadcast channelstokio-stream: Stream utilities for SSEserde_json: JSON serializationchrono: Timestamp generationlog: Logging frameworkCopyright 2025 The Drasi Authors.
Licensed under the Apache License, Version 2.0. See LICENSE file for details.