| Crates.io | drasi-source-platform |
| lib.rs | drasi-source-platform |
| version | 0.1.2 |
| created_at | 2026-01-14 23:22:12.539478+00 |
| updated_at | 2026-01-23 06:16:11.484184+00 |
| description | Platform source plugin for Drasi |
| homepage | |
| repository | https://github.com/drasi-project/drasi-core |
| max_upload_size | |
| id | 2044161 |
| size | 217,161 |
A Redis Streams-based source plugin for Drasi that consumes CloudEvent-wrapped change events from the Drasi platform infrastructure.
The Platform Source provides integration between external Drasi platform sources and drasi-lib's continuous query engine. It consumes events from Redis Streams using consumer groups, transforming platform SDK event formats into drasi-core's SourceChange format for processing by continuous queries.
External Drasi Platform Source
↓
Redis Stream (CloudEvents)
↓
Platform Source (Consumer Group)
↓
Event Transformation
↓
drasi-lib Queries
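The platform source acts as a bridge between external Drasi platform sources and drasi-lib queries: it reads CloudEvent-wrapped change events from a Redis stream and transforms them into drasi-core's SourceChange format.
Consumer groups enable coordinated consumption across multiple instances.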
The builder pattern provides a type-safe, fluent API for constructing platform sources:
use drasi_source_platform::PlatformSource;
let source = PlatformSource::builder("my-platform-source")
.with_redis_url("redis://localhost:6379")
.with_stream_key("sensor-changes")
.with_consumer_group("drasi-consumers")
.with_consumer_name("consumer-1")
.with_batch_size(50)
.with_block_ms(10000)
.with_dispatch_mode(drasi_lib::channels::DispatchMode::Channel)
.with_dispatch_buffer_capacity(1500)
.with_auto_start(true)
.build()?;
drasi_lib.add_source(source).await?;
For programmatic configuration or deserialization from files:
use drasi_source_platform::{PlatformSource, PlatformSourceConfig};
let config = PlatformSourceConfig {
redis_url: "redis://localhost:6379".to_string(),
stream_key: "sensor-changes".to_string(),
consumer_group: "drasi-consumers".to_string(),
consumer_name: Some("consumer-1".to_string()),
batch_size: 50,
block_ms: 10000,
};
let source = PlatformSource::new("my-platform-source", config)?;
drasi_lib.add_source(source).await?;
For declarative configuration in YAML files:
sources:
- id: platform_source
source_type: platform
auto_start: true
dispatch_mode: channel # or "broadcast"
dispatch_buffer_capacity: 1500
properties:
redis_url: "redis://localhost:6379"
stream_key: "sensor-changes"
consumer_group: "drasi-consumers"
consumer_name: "consumer-1"
batch_size: 50
block_ms: 10000
| Name | Type | Description | Valid Values | Default |
|---|---|---|---|---|
| redis_url | String | Redis connection URL (standard redis:// format) | Valid Redis URL | Required |
| stream_key | String | Redis stream key to consume events from | Any valid stream key | Required |
| consumer_group | String | Consumer group name for coordinated consumption | Any identifier | "drasi-core" |
| consumer_name | Option<String> | Unique consumer name within the group | Any unique ID | Auto-generated from source ID |
| batch_size | usize | Number of events to read per XREADGROUP call | 1-10000 (recommended) | 100 |
| block_ms | u64 | Milliseconds to block waiting for new events | 100-60000 (recommended) | 5000 |
| dispatch_mode | DispatchMode | Event dispatch strategy | Channel, Broadcast | Channel |
| dispatch_buffer_capacity | usize | Buffer size for dispatch channels | Any positive integer | 1000 |
| auto_start | bool | Whether to start automatically when added to DrasiLib | true, false | true |
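The defaults and recommended ranges in the table can be captured in a small stand-alone sketch. `SketchConfig` and `check_recommended` are hypothetical names used only for illustration. They are not the crate's `PlatformSourceConfig` or its validation logic, and the crate may not enforce these ranges at all.

```rust
/// Hypothetical mirror of the tunable settings from the table above.
/// This is NOT the crate's `PlatformSourceConfig`; it only illustrates the
/// documented defaults and recommended ranges.
#[derive(Debug, Clone, PartialEq)]
pub struct SketchConfig {
    pub consumer_group: String,
    pub batch_size: usize,
    pub block_ms: u64,
    pub dispatch_buffer_capacity: usize,
}

impl Default for SketchConfig {
    /// Defaults copied from the "Default" column of the table.
    fn default() -> Self {
        Self {
            consumer_group: "drasi-core".to_string(),
            batch_size: 100,
            block_ms: 5000,
            dispatch_buffer_capacity: 1000,
        }
    }
}

/// Returns one warning per setting that falls outside the documented range.
pub fn check_recommended(cfg: &SketchConfig) -> Vec<String> {
    let mut warnings = Vec::new();
    if !(1..=10_000).contains(&cfg.batch_size) {
        warnings.push(format!("batch_size {} is outside 1-10000", cfg.batch_size));
    }
    if !(100..=60_000).contains(&cfg.block_ms) {
        warnings.push(format!("block_ms {} is outside 100-60000", cfg.block_ms));
    }
    if cfg.dispatch_buffer_capacity == 0 {
        warnings.push("dispatch_buffer_capacity must be a positive integer".to_string());
    }
    warnings
}
```

Calling `check_recommended(&SketchConfig::default())` yields an empty list, since every default sits inside its recommended range.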
Standard Redis connection string formats are supported:
- `redis://localhost:6379` - Local Redis without authentication
- `redis://:password@host:6379` - Redis with password authentication
- `redis://user:password@host:6379` - Redis with username and password
- `rediss://host:6379` - Redis with TLS encryption
The consumer name should be unique within a consumer group:
- Use `${HOSTNAME}` or the pod name for automatic uniqueness
The batch size (batch_size) controls the throughput vs. latency tradeoff.
The block timeout (block_ms) controls responsiveness vs. CPU usage.
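The connection URL formats listed earlier differ only in scheme and in the credentials placed before the host. The following is a minimal sketch of telling them apart with plain string handling. `UrlTraits` and `inspect_redis_url` are hypothetical helpers, not part of this crate or of any Redis client, and a real application would normally rely on its client library's URL parsing.

```rust
/// What a connection URL implies, judged only from its text.
#[derive(Debug, PartialEq)]
pub struct UrlTraits {
    pub tls: bool,
    pub has_username: bool,
    pub has_password: bool,
}

/// Inspects a `redis://` or `rediss://` URL; returns `None` for other schemes.
/// Illustrative only: it does not handle percent-encoding or unusual passwords.
pub fn inspect_redis_url(url: &str) -> Option<UrlTraits> {
    let (tls, rest) = if let Some(r) = url.strip_prefix("rediss://") {
        (true, r)
    } else if let Some(r) = url.strip_prefix("redis://") {
        (false, r)
    } else {
        return None;
    };
    // The authority part ends at the first '/'; credentials precede the last '@'.
    let authority = rest.split('/').next().unwrap_or("");
    let (has_username, has_password) = match authority.rsplit_once('@') {
        Some((creds, _host)) => match creds.split_once(':') {
            Some((user, pass)) => (!user.is_empty(), !pass.is_empty()),
            None => (!creds.is_empty(), false),
        },
        None => (false, false),
    };
    Some(UrlTraits { tls, has_username, has_password })
}
```

For `redis://:password@host:6379` the sketch reports a password without a username, and for `rediss://host:6379` it reports TLS with no credentials.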
All events are wrapped in CloudEvent format with the following structure:
{
"id": "5095316c-f4b6-43db-9887-f2730cf1dc2b",
"source": "hello-world-reactivator",
"type": "com.dapr.event.sent",
"specversion": "1.0",
"datacontenttype": "application/json",
"time": "2025-10-03T14:58:12Z",
"pubsubname": "drasi-pubsub",
"topic": "hello-world-change",
"data": [ /* array of change events */ ]
}
The data array contains one or more change events. Each event has an op field ("i" = insert, "u" = update, "d" = delete) and a payload object describing the change.
Node insert:
{
"op": "i",
"payload": {
"after": {
"id": "user-123",
"labels": ["User"],
"properties": {
"name": "Alice",
"email": "alice@example.com",
"age": 30
}
},
"source": {
"db": "mydb",
"table": "node",
"ts_ns": 1699900000000000000
}
}
}
Node update:
{
"op": "u",
"payload": {
"after": {
"id": "user-123",
"labels": ["User", "Premium"],
"properties": {
"name": "Alice Updated",
"age": 31,
"premium": true
}
},
"source": {
"db": "mydb",
"table": "node",
"ts_ns": 1699900001000000000
}
}
}
Node delete:
{
"op": "d",
"payload": {
"before": {
"id": "user-123",
"labels": ["User"],
"properties": {}
},
"source": {
"db": "mydb",
"table": "node",
"ts_ns": 1699900002000000000
}
}
}
Relation insert:
{
"op": "i",
"payload": {
"after": {
"id": "follows-1",
"labels": ["FOLLOWS"],
"startId": "user-123",
"endId": "user-456",
"properties": {
"since": "2024-01-01"
}
},
"source": {
"db": "mydb",
"table": "rel",
"ts_ns": 1699900003000000000
}
}
}
Relation update:
{
"op": "u",
"payload": {
"after": {
"id": "follows-1",
"labels": ["FOLLOWS"],
"startId": "user-123",
"endId": "user-456",
"properties": {
"since": "2024-01-01",
"strength": 0.8
}
},
"source": {
"db": "mydb",
"table": "rel",
"ts_ns": 1699900004000000000
}
}
}
Relation delete:
{
"op": "d",
"payload": {
"before": {
"id": "follows-1",
"labels": ["FOLLOWS"],
"startId": "user-123",
"endId": "user-456",
"properties": {}
},
"source": {
"db": "mydb",
"table": "rel",
"ts_ns": 1699900005000000000
}
}
}
| Field | Type | Required | Description |
|---|---|---|---|
| op | String | Yes | Operation: "i" (insert), "u" (update), "d" (delete) |
| payload.after | Object | Yes (for i/u) | Element state after change |
| payload.before | Object | Yes (for d) | Element state before deletion |
| payload.source.db | String | Yes | Database name (use "Drasi" for control events) |
| payload.source.table | String | Yes | Element type: "node", "rel", or "relation" |
| payload.source.ts_ns | u64 | Yes | Timestamp in nanoseconds (used as effective_from) |
| id | String | Yes | Unique element identifier |
| labels | Array<String> | Yes | Element labels (at least one required) |
| properties | Object | Yes | Element properties (can be empty) |
| startId | String | Yes (relations) | Outgoing node ID for relations |
| endId | String | Yes (relations) | Incoming node ID for relations |
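The required-field rules in the table can be expressed as a small check. This is a sketch under stated assumptions: `RawElement` and `validate_element` are hypothetical names, the struct is a simplified stand-in for the `after`/`before` JSON object (properties are omitted), and treating an empty `id` as missing is an assumption rather than documented behavior.

```rust
/// Hypothetical, simplified view of the `after`/`before` object of a change event.
#[derive(Debug, Default)]
pub struct RawElement {
    pub id: String,
    pub labels: Vec<String>,
    pub start_id: Option<String>, // JSON field `startId`
    pub end_id: Option<String>,   // JSON field `endId`
}

/// Applies the required-field rules for a given `payload.source.table` value.
pub fn validate_element(table: &str, el: &RawElement) -> Result<(), String> {
    // Assumption: an empty id is treated the same as a missing one.
    if el.id.is_empty() {
        return Err("id is required".to_string());
    }
    if el.labels.is_empty() {
        return Err("at least one label is required".to_string());
    }
    match table {
        "node" => Ok(()),
        // Relations additionally need both endpoints.
        "rel" | "relation" => {
            if el.start_id.is_none() || el.end_id.is_none() {
                Err("relations require startId and endId".to_string())
            } else {
                Ok(())
            }
        }
        other => Err(format!("unknown table type: {}", other)),
    }
}
```

A node with an `id` and one label passes for table `"node"`, while the same element fails for `"rel"` because it has no endpoints.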
Control events coordinate query subscriptions and are identified by payload.source.db = "Drasi" (case-insensitive).
{
"op": "i",
"payload": {
"after": {
"queryId": "query1",
"queryNodeId": "default",
"nodeLabels": ["Person", "Employee"],
"relLabels": ["KNOWS", "WORKS_FOR"]
},
"source": {
"db": "Drasi",
"table": "SourceSubscription",
"ts_ns": 1000000000
}
}
}
| Field | Type | Required | Description |
|---|---|---|---|
| queryId | String | Yes | Unique query identifier |
| queryNodeId | String | Yes | Query node identifier |
| nodeLabels | Array<String> | No | Node labels query is interested in (defaults to empty) |
| relLabels | Array<String> | No | Relation labels query is interested in (defaults to empty) |
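Two rules from this section lend themselves to a short sketch: control events are recognized by a case-insensitive match on `payload.source.db`, and the optional label lists default to empty. `is_control_event` and `SubscriptionSketch` are hypothetical names for illustration and do not reflect the crate's internal types.

```rust
/// Returns true when `payload.source.db` marks an event as a control event.
/// The comparison ignores ASCII case, so "Drasi", "drasi" and "DRASI" all match.
pub fn is_control_event(source_db: &str) -> bool {
    source_db.eq_ignore_ascii_case("Drasi")
}

/// Hypothetical holder for the fields of a SourceSubscription payload.
#[derive(Debug, PartialEq)]
pub struct SubscriptionSketch {
    pub query_id: String,
    pub query_node_id: String,
    pub node_labels: Vec<String>,
    pub rel_labels: Vec<String>,
}

impl SubscriptionSketch {
    /// Builds a subscription; absent label lists fall back to empty vectors.
    pub fn new(
        query_id: &str,
        query_node_id: &str,
        node_labels: Option<Vec<String>>,
        rel_labels: Option<Vec<String>>,
    ) -> Self {
        Self {
            query_id: query_id.to_string(),
            query_node_id: query_node_id.to_string(),
            node_labels: node_labels.unwrap_or_default(),
            rel_labels: rel_labels.unwrap_or_default(),
        }
    }
}
```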
Operations:
- "i": Insert subscription (query subscribes to source)
- "u": Update subscription (query changes label filters)
- "d": Delete subscription (query unsubscribes from source)
Behavior:
| Platform Event | drasi-core SourceChange |
|---|---|
| op: "i" | SourceChange::Insert |
| op: "u" | SourceChange::Update |
| op: "d" | SourceChange::Delete |
| payload.source.table: "node" | Element::Node |
| payload.source.table: "rel" or "relation" | Element::Relation |
| startId | out_node (ElementReference) |
| endId | in_node (ElementReference) |
| payload.source.ts_ns | effective_from (nanoseconds) |
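The first five rows of the mapping table amount to two lookups, one on `op` and one on `payload.source.table`. The sketch below uses stand-in enums, `ChangeKind` and `ElementKind`, rather than drasi-core's actual `SourceChange` and `Element` types, whose constructors are not shown in this document. How the real source reports unknown values is likewise an assumption.

```rust
/// Stand-in for the SourceChange variant selected by `op`.
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum ChangeKind {
    Insert,
    Update,
    Delete,
}

/// Stand-in for the Element variant selected by `payload.source.table`.
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum ElementKind {
    Node,
    Relation,
}

/// Maps the platform event's `op` and `table` strings per the table above.
pub fn classify(op: &str, table: &str) -> Result<(ChangeKind, ElementKind), String> {
    let change = match op {
        "i" => ChangeKind::Insert,
        "u" => ChangeKind::Update,
        "d" => ChangeKind::Delete,
        other => return Err(format!("unsupported op: {}", other)),
    };
    let element = match table {
        "node" => ElementKind::Node,
        // Both spellings denote a relation.
        "rel" | "relation" => ElementKind::Relation,
        other => return Err(format!("unsupported table: {}", other)),
    };
    Ok((change, element))
}
```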
All JSON property types are supported and converted to ElementValue:
{
"properties": {
"string_prop": "hello",
"int_prop": 42,
"float_prop": 3.14,
"bool_prop": true,
"null_prop": null,
"array_prop": [1, 2, 3],
"object_prop": { "nested": "value" }
}
}
use drasi_source_platform::PlatformSource;
use std::sync::Arc;
// Create platform source
let source = PlatformSource::builder("sensor-source")
.with_redis_url("redis://localhost:6379")
.with_stream_key("sensor-changes")
.with_consumer_group("drasi-core")
.with_batch_size(50)
.build()?;
// Add to drasi-lib
drasi_lib.add_source(Arc::new(source)).await?;
use drasi_source_platform::PlatformSource;
use std::env;
use std::sync::Arc;
// Use hostname for unique consumer name
let consumer_name = env::var("HOSTNAME")
.unwrap_or_else(|_| "consumer-1".to_string());
let source = PlatformSource::builder("platform-source")
.with_redis_url("redis://redis.default.svc.cluster.local:6379")
.with_stream_key("events:changes")
.with_consumer_group("drasi-core-group")
.with_consumer_name(consumer_name)
.with_batch_size(100)
.build()?;
drasi_lib.add_source(Arc::new(source)).await?;
use drasi_source_platform::PlatformSource;
use drasi_lib::bootstrap::InMemoryBootstrapProvider;
use std::sync::Arc;
let bootstrap_provider = InMemoryBootstrapProvider::new();
let source = PlatformSource::builder("platform-source")
.with_redis_url("redis://localhost:6379")
.with_stream_key("sensor-changes")
.with_bootstrap_provider(bootstrap_provider)
.build()?;
drasi_lib.add_source(Arc::new(source)).await?;
use drasi_source_platform::PlatformSource;
use drasi_lib::channels::DispatchMode;
use std::sync::Arc;
let source = PlatformSource::builder("platform-source")
.with_redis_url("redis://localhost:6379")
.with_stream_key("sensor-changes")
.with_dispatch_mode(DispatchMode::Broadcast)
.with_dispatch_buffer_capacity(2000)
.build()?;
drasi_lib.add_source(Arc::new(source)).await?;
From external sources, publish events using XADD:
redis-cli XADD sensor-changes * \
data '{
"data": [{
"op": "i",
"payload": {
"after": {
"id": "sensor-1",
"labels": ["Sensor"],
"properties": {
"temperature": 75.5,
"location": "Building A"
}
},
"source": {
"db": "sensors",
"table": "node",
"ts_ns": 1699900000000000000
}
}
}],
"id": "event-1",
"source": "sensor-source",
"type": "com.dapr.event.sent"
}'
The data array can contain multiple events for batch processing:
redis-cli XADD sensor-changes * \
data '{
"data": [
{
"op": "i",
"payload": {
"after": {
"id": "sensor-1",
"labels": ["Sensor"],
"properties": {"temperature": 75.5}
},
"source": {"db": "sensors", "table": "node", "ts_ns": 1000000000}
}
},
{
"op": "i",
"payload": {
"after": {
"id": "sensor-2",
"labels": ["Sensor"],
"properties": {"temperature": 72.0}
},
"source": {"db": "sensors", "table": "node", "ts_ns": 1000000001}
}
}
],
"id": "batch-1",
"source": "sensor-source",
"type": "com.dapr.event.sent"
}'
use drasi_source_platform::PlatformSource;
#[tokio::test]
// The test returns a Result so that `?` can be used inside it.
async fn test_event_consumption() -> Result<(), Box<dyn std::error::Error>> {
    let source = PlatformSource::builder("test-source")
        .with_redis_url("redis://localhost:6379")
        .with_stream_key("test-stream")
        .build()?;
    // Create test subscription
    let mut receiver = source.test_subscribe_async().await;
    // Start source
    source.start().await?;
    // Publish test event to Redis
    // ... (use redis-cli or Redis client)
    // Receive and verify event
    let event = receiver.recv().await?;
    // ... assertions
    Ok(())
}
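When start() is called:
- The source begins consuming from the configured stream and its status becomes `Running`
During normal operation:
- Events are read through the consumer group using the `>` ID (new messages)
- Each event is transformed into `SourceChange` format
When stop() is called:
- Consumption stops and the status becomes `Stopped`
Note: Consumer group position is preserved in Redis. Restarting will resume from the last acknowledged position.
Performance depends mainly on two settings:
- The number of events read per call is set by the `batch_size` parameter
- The dispatch buffer size is set by `dispatch_buffer_capacity`
High Throughput: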
.with_batch_size(500)
.with_block_ms(10000)
.with_dispatch_buffer_capacity(5000)
Low Latency:
.with_batch_size(10)
.with_block_ms(1000)
.with_dispatch_buffer_capacity(500)
Balanced:
.with_batch_size(100)
.with_block_ms(5000)
.with_dispatch_buffer_capacity(1000)
Symptom: "Failed to connect to Redis" errors
Solutions:
- Verify `redis_url` is correct and accessible
- Test connectivity: `redis-cli ping`
Symptom: BUSYGROUP error or duplicate processing
Solutions:
- Ensure `consumer_name` is unique per instance
- Use `${HOSTNAME}` or pod name
- List consumers: `redis-cli XINFO CONSUMERS stream_key group_name`
- Remove a stale consumer: `redis-cli XGROUP DELCONSUMER stream_key group_name consumer_name`
Symptom: Events not appearing in queries
Solutions:
- Verify `stream_key` matches external source's stream
- Check the consumer group: `redis-cli XINFO GROUPS stream_key`
- Check the stream length: `redis-cli XLEN stream_key`
Symptom: Need to reprocess all events from stream beginning
Solutions:
- Delete the consumer group: `redis-cli XGROUP DESTROY stream_key group_name`
- Recreate it from the beginning: `redis-cli XGROUP CREATE stream_key group_name 0 MKSTREAM`
- Alternatively, set `always_create_consumer_group: true` and `start_id: "0"` in internal config
Warning: Deleting a consumer group affects all consumers in the group.
Symptom: High latency or low throughput
Solutions:
- Increase `batch_size` for better throughput (100-500)
- Decrease `block_ms` for lower latency (1000-3000)
- Check Redis statistics: `redis-cli INFO stats`
- Check consumer group lag: `redis-cli XINFO GROUPS stream_key`
Symptom: "Transformation error" or "Failed to parse JSON" in logs
Solutions:
- Verify the required event fields: `op`, `payload`, `source`, `table`, `ts_ns`
- Verify `id`, `labels`, `properties` are present
- For relations, verify `startId` and `endId` are present
- Inspect a raw event: `redis-cli XRANGE stream_key - + COUNT 1`
Symptom: High memory usage or OOM errors
Solutions:
- Reduce `batch_size` to process fewer events at once
- Reduce `dispatch_buffer_capacity` to limit buffering
- Check process memory: `ps aux | grep drasi`
- Check Redis memory: `redis-cli INFO memory`
Consumer Group Metrics:
redis-cli XINFO GROUPS stream_key
# Check 'lag' field - number of unprocessed messages
# Check 'pending' field - messages delivered but not acknowledged
Consumer Metrics:
redis-cli XINFO CONSUMERS stream_key group_name
# Check 'pending' field per consumer
# Check 'idle' field - time since last activity
Stream Metrics:
redis-cli XLEN stream_key # Total messages in stream
redis-cli XINFO STREAM stream_key # Stream details
While the source is running, status() returns ComponentStatus::Running.
The platform source uses structured logging. Enable debug logging for troubleshooting:
RUST_LOG=drasi_source_platform=debug cargo run
Create consumer group:
redis-cli XGROUP CREATE stream_key group_name 0 MKSTREAM
# 0 = start from beginning
# $ = start from end
# > = only new messages (used internally)
Read events:
redis-cli XREADGROUP GROUP group_name consumer_name COUNT 10 BLOCK 5000 STREAMS stream_key >
Acknowledge events:
redis-cli XACK stream_key group_name event_id1 event_id2 ...
View pending messages:
redis-cli XPENDING stream_key group_name
Delete consumer:
redis-cli XGROUP DELCONSUMER stream_key group_name consumer_name
Delete consumer group:
redis-cli XGROUP DESTROY stream_key group_name
Redis stream IDs have the format {timestamp_ms}-{sequence} (for example, 1699900000000-0).
use drasi_lib::{DrasiLib, Query};
use drasi_source_platform::PlatformSource;
use std::sync::Arc;
// Create drasi-lib instance
let mut drasi = DrasiLib::new();
// Add platform source
let source = PlatformSource::builder("platform-source")
.with_redis_url("redis://localhost:6379")
.with_stream_key("events:changes")
.build()?;
drasi.add_source(Arc::new(source)).await?;
// Add query
let query = Query::cypher("monitor-users")
.query("MATCH (u:User) WHERE u.age > 18 RETURN u")
.from_source("platform-source")
.build();
drasi.add_query(query).await?;
// Start all components
drasi.start().await?;
use drasi_server::DrasiServerBuilder;
use drasi_source_platform::PlatformSource;
use std::sync::Arc;
let source = PlatformSource::builder("platform-source")
.with_redis_url("redis://localhost:6379")
.with_stream_key("events:changes")
.build()?;
let server = DrasiServerBuilder::new()
.with_id("my-server")
.with_host_port("0.0.0.0", 8080)
.with_source(source)
.build()
.await?;
server.run().await?;
The platform source captures and propagates timing metadata, including the source timestamp taken from payload.source.ts_ns. This enables end-to-end latency analysis across the entire pipeline.
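As an illustration of the latency analysis mentioned here, the sketch below computes the elapsed time between an event's `ts_ns` and a later instant. It assumes `ts_ns` holds nanoseconds since the Unix epoch, which matches the magnitude of the example values in this document but is not stated explicitly. `latency_between` and `now_ns` are hypothetical helpers, not part of the crate.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Elapsed time between an event's `ts_ns` and a later instant, both given as
/// nanoseconds since the Unix epoch (an assumption about `ts_ns`).
/// Saturates to zero if the event appears to come from the future.
pub fn latency_between(ts_ns: u64, now_ns: u64) -> Duration {
    Duration::from_nanos(now_ns.saturating_sub(ts_ns))
}

/// Current wall-clock time as nanoseconds since the Unix epoch.
/// Falls back to 0 if the system clock is set before the epoch.
pub fn now_ns() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_nanos() as u64)
        .unwrap_or(0)
}
```

A consumer could call `latency_between(event_ts_ns, now_ns())` when a query result is produced. Clock skew between the producing and consuming hosts limits how precise such a measurement can be.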
The platform source efficiently processes batches of events:
{
"data": [
{"op": "i", "payload": {...}},
{"op": "u", "payload": {...}},
{"op": "i", "payload": {...}}
]
}
All events in the data array are transformed and dispatched sequentially.
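Consumer group position is persisted in Redis, so a restarted source resumes from the last acknowledged position.
Two dispatch modes are available: Channel Mode (default) and Broadcast Mode.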