drasi-reaction-sse

Crates.iodrasi-reaction-sse
lib.rsdrasi-reaction-sse
version0.2.1
created_at2026-01-15 04:15:17.979938+00
updated_at2026-01-23 06:23:23.38211+00
descriptionSSE reaction plugin for Drasi
homepage
repositoryhttps://github.com/drasi-project/drasi-core
max_upload_size
id2044555
size176,553
maintainers-core (github:drasi-project:maintainers-core)

documentation

README

SSE Reaction

Server-Sent Events (SSE) reaction plugin for Drasi that streams continuous query results to browser clients in real-time.

Overview

The SSE Reaction component exposes Drasi continuous query results to web clients via Server-Sent Events, enabling real-time data streaming over HTTP. SSE provides a simple, unidirectional push-based communication channel from server to clients, making it ideal for scenarios where clients need to receive continuous updates without polling.

Key Capabilities

  • Real-time streaming: Automatically pushes query result changes to connected clients
  • Browser-native: Uses standard SSE protocol supported by all modern browsers via EventSource API
  • Multi-client broadcast: Efficiently broadcasts events to multiple concurrent clients
  • Automatic heartbeats: Keeps connections alive with configurable heartbeat messages
  • CORS enabled: Configured to allow cross-origin requests from any domain
  • Timestamp tracking: All events include millisecond-precision timestamps
  • Priority queue processing: Ensures events are processed in timestamp order

Use Cases

  • Real-time dashboards and monitoring applications
  • Live data visualization and analytics
  • Event-driven notifications in web applications
  • Streaming sensor data to browser clients
  • Live query result updates for continuous queries
  • Push-based notifications for data changes

Configuration

The SSE Reaction can be configured using either the builder pattern (recommended) or the config struct approach.

Builder Pattern (Recommended)

use drasi_reaction_sse::SseReaction;

let reaction = SseReaction::builder("my-sse-reaction")
    .with_host("0.0.0.0")
    .with_port(8080)
    .with_sse_path("/events")
    .with_heartbeat_interval_ms(30000)
    .with_queries(vec!["sensor-data".to_string(), "alerts".to_string()])
    .with_priority_queue_capacity(1000)
    .with_auto_start(true)
    .build()?;

Config Struct Approach

use drasi_reaction_sse::{SseReaction, SseReactionConfig};

let config = SseReactionConfig {
    host: "0.0.0.0".to_string(),
    port: 8080,
    sse_path: "/events".to_string(),
    heartbeat_interval_ms: 30000,
};

let reaction = SseReaction::new(
    "my-sse-reaction",
    vec!["sensor-data".to_string()],
    config
);

Configuration Options

Option Description Type Valid Values Default
id Unique identifier for the reaction String Any valid string Required
host Host address to bind the SSE server String Valid IP address or hostname "0.0.0.0"
port Port number to bind the SSE server u16 1-65535 8080
sse_path HTTP path for SSE endpoint String Valid URL path "/events"
heartbeat_interval_ms Interval between heartbeat messages in milliseconds u64 > 0 30000 (30 seconds)
queries List of query IDs to subscribe to Vec<String> Valid query identifiers []
priority_queue_capacity Custom capacity for priority queue (optional) usize > 0 Auto-configured
auto_start Whether to start automatically when added bool true/false true
routes Query-specific template configurations HashMap<String, QueryConfig> Query-specific configs {}
default_template Default template configuration used when no query-specific route is defined Option<QueryConfig> Template config None

Per-Query Configuration

The SSE Reaction supports per-query configuration, allowing you to customize templates and endpoints for each query. This feature enables fine-grained control over the output format for different queries and operation types.

Default Template Configuration

You can set a default template configuration that applies to all queries when no query-specific route is defined. This is useful when you have multiple queries and want to use the same custom templates for all of them, instead of the built-in default format.

Important: Template validation occurs at creation time. If a template is invalid (malformed Handlebars syntax), the builder will return an error. Additionally, if you define routes for queries that aren't subscribed to, the builder will return an error.

QueryConfig

Defines template specifications for each operation type within a query.

Name Description Type Required
added Template specification for ADD operations (new rows). Option<TemplateSpec> No
updated Template specification for UPDATE operations (modified rows). Option<TemplateSpec> No
deleted Template specification for DELETE operations (removed rows). Option<TemplateSpec> No

TemplateSpec

Specification for SSE event output with custom templates and paths.

Name Description Type Default Required
path Optional custom path for this template. If provided, events will be sent to this path. Can be absolute (e.g., "/sensors") or relative to base sse_path (e.g., "sensors" becomes "/events/sensors" if base is "/events"). Supports Handlebars templates for dynamic paths. Option<String> None No
template Event data template as a Handlebars template. If empty, uses default JSON format. String Empty No

Template Variables

When using Handlebars templates, the following variables are available:

Variable Description Available Operations
after The new/current state of the data ADD, UPDATE
before The previous state of the data UPDATE, DELETE
query_name The ID of the query that triggered the change ALL
operation The operation type: "ADD", "UPDATE", or "DELETE" ALL
timestamp Unix timestamp in milliseconds ALL

Example: Per-Query Templates

use drasi_reaction_sse::{SseReaction, QueryConfig, TemplateSpec};

let query_config = QueryConfig {
    added: Some(TemplateSpec {
        path: None,
        template: r#"{
            "event": "sensor_added",
            "sensor_id": "{{after.id}}",
            "temperature": {{after.temperature}},
            "timestamp": {{timestamp}}
        }"#.to_string(),
    }),
    updated: Some(TemplateSpec {
        path: None,
        template: r#"{
            "event": "sensor_updated",
            "sensor_id": "{{after.id}}",
            "old_temp": {{before.temperature}},
            "new_temp": {{after.temperature}},
            "timestamp": {{timestamp}}
        }"#.to_string(),
    }),
    deleted: Some(TemplateSpec {
        path: Some("/sensors/deleted".to_string()),
        template: r#"{
            "event": "sensor_removed",
            "sensor_id": "{{before.id}}",
            "timestamp": {{timestamp}}
        }"#.to_string(),
    }),
};

let reaction = SseReaction::builder("sensor-sse")
    .with_query("sensor-data")
    .with_route("sensor-data", query_config)
    .build()?;

Example: Using the json Helper

The json Handlebars helper serializes complex objects:

use drasi_reaction_sse::{TemplateSpec};

let template_spec = TemplateSpec {
    path: None,
    template: r#"{
        "event": "{{operation}}",
        "query": "{{query_name}}",
        "data": {{json after}}
    }"#.to_string(),
};

This ensures the entire after object is properly JSON-serialized.

Query ID Matching

The SSE Reaction supports flexible query ID matching:

  • Exact match: Configuration key "my-query" matches query ID "my-query"
  • Fallback match: If query ID is "source.my-query", it will also match configuration key "my-query"
  • Disambiguation: If multiple queries share the same final segment, use full query IDs as configuration keys to avoid ambiguity

This is useful when queries are namespaced by source or other prefixes.

Default Behavior

If no route configuration and no default template is provided for a query, the SSE Reaction uses the built-in default behavior:

  • Sends all results from a query in a single event
  • Uses the default JSON format: {"queryId": "...", "results": [...], "timestamp": ...}
  • All events are sent to the configured sse_path

This maintains backward compatibility with existing configurations.

Validation

The SSE Reaction performs validation at build time:

  1. Template Validation: All Handlebars templates (in routes and default template) are validated for correct syntax. If a template is invalid, the builder returns an error.

  2. Route Validation: All routes must correspond to subscribed queries. The builder checks that each route matches either:

    • An exact query ID
    • The last segment of a dotted query ID (e.g., route "query1" matches query "source.query1")

    If a route doesn't match any subscribed query, the builder returns an error.

Example of validation error:

// This will fail because "query2" is not subscribed
let result = SseReaction::builder("test")
    .with_query("query1")
    .with_route("query2", some_config)
    .build();

assert!(result.is_err());

Output Schema

The SSE Reaction emits two types of events:

Query Result Event

Sent whenever subscribed queries produce new results:

{
  "queryId": "sensor-data",
  "results": [
    {
      "id": "sensor-1",
      "temperature": 85.2,
      "timestamp": "2025-12-05T10:30:00Z"
    },
    {
      "id": "sensor-2",
      "temperature": 92.7,
      "timestamp": "2025-12-05T10:30:01Z"
    }
  ],
  "timestamp": 1706742123456
}

Fields:

  • queryId (string): The ID of the query that produced the results
  • results (array): Array of result objects from the query
  • timestamp (number): Unix timestamp in milliseconds when the event was generated

Heartbeat Event

Sent at regular intervals to keep connections alive. Note: Heartbeat messages are sent to all SSE paths and are not affected by custom templates. They always use the standard format shown below:

{
  "type": "heartbeat",
  "ts": 1706742123456
}

Fields:

  • type (string): Always "heartbeat" for heartbeat events
  • ts (number): Unix timestamp in milliseconds when the heartbeat was sent

Usage Examples

Basic Usage with Single Query

use drasi_reaction_sse::SseReaction;

// Create SSE reaction for a single query
let reaction = SseReaction::builder("temperature-monitor")
    .with_query("high-temp-sensors")
    .with_port(8080)
    .build()?;

// The reaction will be available at http://0.0.0.0:8080/events

Multiple Queries with Custom Configuration

use drasi_reaction_sse::SseReaction;

let reaction = SseReaction::builder("multi-query-sse")
    .with_queries(vec![
        "sensor-data".to_string(),
        "alert-events".to_string(),
        "system-metrics".to_string(),
    ])
    .with_host("localhost")
    .with_port(9090)
    .with_sse_path("/api/stream")
    .with_heartbeat_interval_ms(15000)  // 15 second heartbeats
    .build()?;

// Available at http://localhost:9090/api/stream

Custom Priority Queue Capacity

use drasi_reaction_sse::SseReaction;

// For high-volume scenarios, configure a larger priority queue
let reaction = SseReaction::builder("high-volume-sse")
    .with_query("rapid-events")
    .with_priority_queue_capacity(10000)
    .build()?;

Default Template for Multiple Queries

use drasi_reaction_sse::{SseReaction, QueryConfig, TemplateSpec};

// Define a default template that applies to all queries
let default_template = QueryConfig {
    added: Some(TemplateSpec {
        path: None,
        template: r#"{
            "event": "data_added",
            "query": "{{query_name}}",
            "data": {{json after}},
            "timestamp": {{timestamp}}
        }"#.to_string(),
    }),
    updated: Some(TemplateSpec {
        path: None,
        template: r#"{
            "event": "data_updated",
            "query": "{{query_name}}",
            "before": {{json before}},
            "after": {{json after}},
            "timestamp": {{timestamp}}
        }"#.to_string(),
    }),
    deleted: Some(TemplateSpec {
        path: None,
        template: r#"{
            "event": "data_deleted",
            "query": "{{query_name}}",
            "data": {{json before}},
            "timestamp": {{timestamp}}
        }"#.to_string(),
    }),
};

let reaction = SseReaction::builder("multi-query-sse")
    .with_queries(vec![
        "sensor-data".to_string(),
        "alert-events".to_string(),
        "system-metrics".to_string(),
    ])
    .with_default_template(default_template)
    .build()?;

// All queries will use the same custom template format

Per-Query Custom Templates

use drasi_reaction_sse::{SseReaction, QueryConfig, TemplateSpec};

// Define custom templates for different operation types
let sensor_config = QueryConfig {
    added: Some(TemplateSpec {
        path: None,
        template: r#"{
            "type": "new_sensor",
            "id": "{{after.sensor_id}}",
            "name": "{{after.name}}",
            "initial_temp": {{after.temperature}},
            "timestamp": {{timestamp}}
        }"#.to_string(),
    }),
    updated: Some(TemplateSpec {
        path: None,
        template: r#"{
            "type": "sensor_update",
            "id": "{{after.sensor_id}}",
            "temp_change": {
                "from": {{before.temperature}},
                "to": {{after.temperature}}
            },
            "timestamp": {{timestamp}}
        }"#.to_string(),
    }),
    deleted: Some(TemplateSpec {
        path: None,
        template: r#"{
            "type": "sensor_removed",
            "id": "{{before.sensor_id}}",
            "last_temp": {{before.temperature}},
            "timestamp": {{timestamp}}
        }"#.to_string(),
    }),
};

let reaction = SseReaction::builder("sensor-stream")
    .with_query("sensor-readings")
    .with_route("sensor-readings", sensor_config)
    .with_port(8080)
    .build()?;

// Clients will receive formatted events based on operation type

Integration with DrasiLib

use drasi_lib::DrasiLib;
use drasi_reaction_sse::SseReaction;

let drasi = DrasiLib::new()
    .add_query(my_query)
    .add_reaction(
        SseReaction::builder("web-dashboard")
            .with_query("dashboard-data")
            .with_port(8080)
            .build()?
    )
    .build()
    .await?;

Client-Side JavaScript Example

// Connect to SSE endpoint
const eventSource = new EventSource('http://localhost:8080/events');

// Handle query result events
eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data);

  if (data.type === 'heartbeat') {
    console.log('Heartbeat received at', data.ts);
  } else {
    console.log('Query results from', data.queryId);
    console.log('Results:', data.results);
    console.log('Timestamp:', data.timestamp);

    // Update your UI with the new data
    updateDashboard(data.results);
  }
};

// Handle connection events
eventSource.onerror = (error) => {
  console.error('SSE connection error:', error);
};

eventSource.onopen = () => {
  console.log('SSE connection opened');
};

Python Client Example

import sseclient
import json
import requests

def stream_events():
    response = requests.get('http://localhost:8080/events', stream=True)
    client = sseclient.SSEClient(response)

    for event in client.events():
        data = json.loads(event.data)

        if data.get('type') == 'heartbeat':
            print(f"Heartbeat at {data['ts']}")
        else:
            print(f"Query: {data['queryId']}")
            print(f"Results: {data['results']}")
            print(f"Timestamp: {data['timestamp']}")

if __name__ == '__main__':
    stream_events()

Architecture Details

Event Processing Flow

  1. Query Subscription: Upon start, the reaction subscribes to all configured queries
  2. Priority Queue: Query results are queued in timestamp order to maintain event ordering
  3. Broadcasting: Results are broadcast to all connected SSE clients via a Tokio broadcast channel
  4. Heartbeats: A separate task sends periodic heartbeat messages to keep connections alive
  5. HTTP Server: Axum HTTP server handles SSE connections with CORS enabled

Multi-Client Broadcast

The SSE Reaction uses Tokio's broadcast channel to efficiently distribute events to multiple clients:

  • Channel capacity: 1024 messages
  • Late subscribers receive new events only (no replay)
  • Slow clients may experience message lag if they fall too far behind
  • Client disconnections are handled automatically

CORS Configuration

The SSE server is configured with permissive CORS to allow browser connections from any origin:

  • Allowed origins: Any (*)
  • Allowed methods: GET, OPTIONS
  • Allowed headers: Any

Connection Keep-Alive

Two mechanisms ensure connection stability:

  1. Heartbeat messages: Sent at configured intervals (default 30 seconds)
  2. SSE keep-alive: Axum's built-in keep-alive with 30-second intervals

Error Handling

The SSE Reaction handles various error conditions gracefully:

  • Port binding failures: Logged but don't prevent startup
  • Client disconnections: Automatically cleaned up
  • Broadcast channel full: Old messages are dropped (lagging clients)
  • No connected clients: Messages are dropped without error
  • Slow clients: May receive lag errors if they fall too far behind

Performance Considerations

Scalability

  • Client count: Tested with dozens of concurrent clients
  • Message throughput: Handles high-frequency query results via priority queue
  • Memory usage: Broadcast channel has fixed 1024 message capacity
  • CPU usage: Minimal overhead for broadcasting

Best Practices

  1. Heartbeat interval: Balance between connection stability and bandwidth

    • Too short: Unnecessary bandwidth usage
    • Too long: Connections may timeout
  2. Priority queue capacity: Size based on expected query result frequency

    • High-frequency queries: Increase capacity (e.g., 10000)
    • Low-frequency queries: Default is sufficient
  3. Number of queries: Multiple queries share the same SSE connection

    • Clients receive all results from all subscribed queries
    • Filter on client side if needed
  4. Network considerations: SSE uses HTTP/1.1 with chunked encoding

    • Works through most firewalls and proxies
    • Browser limits: ~6 concurrent SSE connections per domain

Troubleshooting

Common Issues

SSE connection immediately closes:

  • Check that the server is running and the port is accessible
  • Verify firewall rules allow inbound connections
  • Check browser console for CORS errors

No events received:

  • Verify queries are producing results
  • Check query subscriptions are correct
  • Review server logs for processing errors

Heartbeat messages but no query results:

  • Confirm queries are configured and running
  • Check query IDs match between reaction and actual queries
  • Verify queries are producing output

Client shows lag errors:

  • Increase broadcast channel capacity
  • Reduce query result frequency
  • Consider multiple SSE reactions for different query groups

Dependencies

  • drasi-lib: Core Drasi library for reaction framework
  • axum: HTTP server for SSE endpoints
  • tower-http: CORS middleware
  • tokio: Async runtime and broadcast channels
  • tokio-stream: Stream utilities for SSE
  • serde_json: JSON serialization
  • chrono: Timestamp generation
  • log: Logging framework

License

Copyright 2025 The Drasi Authors.

Licensed under the Apache License, Version 2.0. See LICENSE file for details.

Commit count: 60

cargo fmt