| Crates.io | drasi-source-mock |
| lib.rs | drasi-source-mock |
| version | 0.1.2 |
| created_at | 2026-01-14 23:26:59.14013+00 |
| updated_at | 2026-01-23 06:17:52.013748+00 |
| description | Mock source plugin for Drasi |
| homepage | |
| repository | https://github.com/drasi-project/drasi-core |
| max_upload_size | |
| id | 2044163 |
| size | 124,505 |
The Mock Source is a synthetic data generator plugin designed for testing, development, and demonstration purposes within the Drasi data processing platform. It generates continuous streams of graph node data without requiring any external systems or databases, making it ideal for rapid prototyping, testing query logic, and demonstrating Drasi capabilities.
Testing and Development
Demonstrations and Presentations
Load Testing
Integration Testing
The builder pattern provides a fluent API for constructing MockSource instances. Because build() returns a Result, an invalid configuration is reported when the source is built:
use drasi_source_mock::MockSource;
use drasi_lib::channels::DispatchMode;

// Basic construction with defaults
let source = MockSource::builder("my-source")
    .build()?;

// Full configuration with all options
let source = MockSource::builder("sensor-source")
    .with_data_type("sensor")
    .with_interval_ms(1000)
    .with_dispatch_mode(DispatchMode::Channel)
    .with_dispatch_buffer_capacity(2000)
    .with_bootstrap_provider(my_bootstrap_provider)
    .with_auto_start(true)
    .build()?;

// Counter source for testing
let counter = MockSource::builder("counter")
    .with_data_type("counter")
    .with_interval_ms(500)
    .build()?;
For programmatic or configuration-file-driven scenarios:
use drasi_lib::channels::DispatchMode;
use drasi_source_mock::{MockSource, MockSourceConfig};

// Using MockSourceConfig
let config = MockSourceConfig {
    data_type: "sensor".to_string(),
    interval_ms: 1000,
};
// Clone so `config` can be reused below (assumes MockSourceConfig implements Clone)
let source = MockSource::new("sensor-source", config.clone())?;

// With custom dispatch settings
let source = MockSource::with_dispatch(
    "sensor-source",
    config,
    Some(DispatchMode::Channel),
    Some(2000),
)?;
| Name | Description | Data Type | Valid Values | Default |
|---|---|---|---|---|
| id | Unique identifier for the source instance | String | Any non-empty string | (Required) |
| data_type | Type of synthetic data to generate | String | "counter", "sensor", "generic" | "generic" |
| interval_ms | Interval between data generation events in milliseconds | u64 | Any positive integer (minimum 1) | 5000 |
| dispatch_mode | Event dispatch mode for subscribers | DispatchMode | Channel (isolated with backpressure), Broadcast (shared, no backpressure) | Channel |
| dispatch_buffer_capacity | Buffer size for dispatch channels | usize | Any positive integer | 1000 |
| bootstrap_provider | Bootstrap provider for initial data delivery | Box<dyn BootstrapProvider> | Any bootstrap provider implementation | None |
| auto_start | Whether to start automatically when added to DrasiLib | bool | true, false | true |
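The defaults in the table can be pictured as a plain struct with a Default implementation. This is only an illustrative sketch: MockSettings and fast_counter are hypothetical names, not types or functions exported by drasi-source-mock, and the dispatch mode and bootstrap provider are left out to keep it dependency-free.

```rust
/// Hypothetical mirror of the documented options; NOT the crate's real type.
#[derive(Debug, Clone, PartialEq)]
pub struct MockSettings {
    pub data_type: String,
    pub interval_ms: u64,
    pub dispatch_buffer_capacity: usize,
    pub auto_start: bool,
}

impl Default for MockSettings {
    /// Default values copied from the configuration table.
    fn default() -> Self {
        Self {
            data_type: "generic".to_string(),
            interval_ms: 5000,
            dispatch_buffer_capacity: 1000,
            auto_start: true,
        }
    }
}

/// Overrides two fields and keeps every other documented default.
pub fn fast_counter() -> MockSettings {
    MockSettings {
        data_type: "counter".to_string(),
        interval_ms: 500,
        ..MockSettings::default()
    }
}
```

Calling fast_counter() yields a counter configuration with a 500 ms interval while the buffer capacity stays at 1000 and auto_start stays true.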
Configuration Validation:
The MockSourceConfig::validate() method checks:
- data_type is one of the valid types: "counter", "sensor", or "generic"
- interval_ms is greater than 0 (non-zero interval required)

The MockSource generates data internally and does not consume external input. However, it produces graph nodes with the following schemas based on the configured data_type:

Counter Mode (data_type: "counter")
Generated Node Schema:
Label: Counter
Element ID Format: counter_{sequence}
Properties:
- value: Integer (sequential counter starting from 1)
- timestamp: String (RFC3339 formatted UTC timestamp)
Example Node:
{
  "id": "counter_42",
  "labels": ["Counter"],
  "properties": {
    "value": 42,
    "timestamp": "2025-12-05T10:30:45.123456789Z"
  }
}
Characteristics:

Sensor Mode (data_type: "sensor")
Generated Node Schema:
Label: SensorReading
Element ID Format: reading_{sensor_id}_{sequence} (the ID embeds only the numeric part of sensor_id, e.g. reading_2_42 for sensor_2)
Properties:
- sensor_id: String (randomly selected: "sensor_0" to "sensor_4")
- temperature: Float (random value between 20.0 and 30.0)
- humidity: Float (random value between 40.0 and 60.0)
- timestamp: String (RFC3339 formatted UTC timestamp)
Example Node:
{
  "id": "reading_2_42",
  "labels": ["SensorReading"],
  "properties": {
    "sensor_id": "sensor_2",
    "temperature": 24.73,
    "humidity": 52.18,
    "timestamp": "2025-12-05T10:30:45.123456789Z"
  }
}
Characteristics:

Generic Mode (data_type: "generic")
Generated Node Schema:
Label: Generic
Element ID Format: generic_{sequence}
Properties:
- value: Integer (random 32-bit signed integer)
- message: String (fixed: "Generic mock data")
- timestamp: String (RFC3339 formatted UTC timestamp)
Example Node:
{
  "id": "generic_42",
  "labels": ["Generic"],
  "properties": {
    "value": -1847392047,
    "message": "Generic mock data",
    "timestamp": "2025-12-05T10:30:45.123456789Z"
  }
}
Characteristics:

use drasi_source_mock::MockSource;
use drasi_lib::Source;
use std::sync::Arc;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Create a counter source
    let counter_source = Arc::new(
        MockSource::builder("counter-source")
            .with_data_type("counter")
            .with_interval_ms(1000)
            .build()?
    );

    // Start the source
    counter_source.start().await?;

    // Let it run for a while
    tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;

    // Stop the source
    counter_source.stop().await?;
    Ok(())
}

use drasi_source_mock::MockSource;
use drasi_lib::DrasiLib;
use std::sync::Arc;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Create Drasi instance
    let mut drasi = DrasiLib::builder("my-app")
        .build()
        .await?;

    // Create and add mock source
    let sensor_source = Arc::new(
        MockSource::builder("sensors")
            .with_data_type("sensor")
            .with_interval_ms(2000)
            .build()?
    );
    drasi.add_source(sensor_source).await?;

    // Add a continuous query
    let query = drasi_lib::Query::cypher("high-temp")
        .query("MATCH (s:SensorReading) WHERE s.temperature > 25.0 RETURN s")
        .from_source("sensors")
        .build();
    drasi.add_query(query).await?;

    // Run the system
    drasi.run().await?;
    Ok(())
}

use drasi_source_mock::{MockSource, MockSourceConfig};
use drasi_lib::channels::SourceEvent;
use drasi_lib::Source;

#[tokio::test]
async fn test_sensor_data_generation() {
    // Create sensor source
    let config = MockSourceConfig {
        data_type: "sensor".to_string(),
        interval_ms: 100,
    };
    let source = MockSource::new("test-sensor", config).unwrap();

    // Subscribe to receive events
    let mut rx = source.test_subscribe();

    // Start generating data
    source.start().await.unwrap();

    // Collect some events
    let mut events = Vec::new();
    for _ in 0..5 {
        if let Ok(event) = rx.recv().await {
            events.push(event);
        }
    }

    // Stop the source
    source.stop().await.unwrap();

    // Verify we received events
    assert_eq!(events.len(), 5);
}

use drasi_source_mock::MockSource;
use drasi_core::models::{
    Element, ElementMetadata, ElementPropertyMap,
    ElementReference, SourceChange, ElementValue
};
use std::sync::Arc;

#[tokio::test]
async fn test_event_injection() {
    let source = MockSource::builder("test-source")
        .with_data_type("counter")
        .build()
        .unwrap();
    let mut rx = source.test_subscribe();

    // Manually inject a custom event
    let element_id = "custom_1";
    let reference = ElementReference::new("test-source", element_id);
    let mut properties = ElementPropertyMap::new();
    properties.insert("value", ElementValue::Integer(999));
    let metadata = ElementMetadata {
        reference,
        labels: Arc::from(vec![Arc::from("Custom")]),
        effective_from: 1234567890,
    };
    let element = Element::Node { metadata, properties };
    let change = SourceChange::Insert { element };

    // Inject the event
    source.inject_event(change).await.unwrap();

    // Receive and verify
    let event = rx.recv().await.unwrap();
    // Verify event contains custom data
}

use drasi_source_mock::MockSource;
use drasi_lib::bootstrap::{BootstrapProvider, BootstrapRequest};
use async_trait::async_trait;

// Custom bootstrap provider
struct MyBootstrapProvider;

#[async_trait]
impl BootstrapProvider for MyBootstrapProvider {
    async fn bootstrap(
        &self,
        request: BootstrapRequest,
    ) -> anyhow::Result<Vec<drasi_core::models::SourceChange>> {
        // Return initial data based on request labels
        Ok(vec![/* initial changes */])
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let source = MockSource::builder("sensor-source")
        .with_data_type("sensor")
        .with_interval_ms(1000)
        .with_bootstrap_provider(MyBootstrapProvider)
        .build()?;

    // Bootstrap will be called when queries subscribe
    source.start().await?;
    Ok(())
}

use drasi_source_mock::MockSource;
use drasi_lib::channels::DispatchMode;

// Channel mode - isolated channels per subscriber with backpressure
let channel_source = MockSource::builder("channel-source")
    .with_data_type("sensor")
    .with_dispatch_mode(DispatchMode::Channel)
    .with_dispatch_buffer_capacity(5000)
    .build()?;

// Broadcast mode - shared channel, no backpressure
// Events may be dropped if subscribers can't keep up
let broadcast_source = MockSource::builder("broadcast-source")
    .with_data_type("counter")
    .with_dispatch_mode(DispatchMode::Broadcast)
    .with_dispatch_buffer_capacity(100)
    .build()?;
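
The difference between "backpressure" and "events may be dropped" can be illustrated with the standard library alone. The sketch below is a rough analogy built on std::sync::mpsc::sync_channel, not Drasi's dispatcher; both function names are hypothetical. A blocking send makes the producer wait for a slow reader, while a non-blocking try_send loses events once the buffer is full.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};
use std::thread;

/// Producer blocks on a full buffer, so every event is eventually delivered.
pub fn offer_with_backpressure(capacity: usize, events: u32) -> usize {
    let (tx, rx) = sync_channel::<u32>(capacity);
    let producer = thread::spawn(move || {
        for seq in 1..=events {
            // `send` waits while the buffer is full: this is the backpressure.
            if tx.send(seq).is_err() {
                break;
            }
        }
        // `tx` is dropped here, which ends the receiver's iterator.
    });
    let delivered = rx.iter().count();
    producer.join().expect("producer thread panicked");
    delivered
}

/// Producer never waits. Nothing is read until all sends are attempted,
/// which models a subscriber that cannot keep up.
/// Returns (delivered, dropped).
pub fn offer_without_backpressure(capacity: usize, events: u32) -> (usize, usize) {
    let (tx, rx) = sync_channel::<u32>(capacity);
    let mut dropped = 0;
    for seq in 1..=events {
        match tx.try_send(seq) {
            Ok(()) => {}
            Err(TrySendError::Full(_)) => dropped += 1,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    drop(tx); // close the channel so the iterator below terminates
    (rx.iter().count(), dropped)
}
```

With a capacity of 2 and 5 events, the blocking variant delivers all 5, while the non-blocking variant delivers 2 and drops 3.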
The MockSource runs an internal Tokio task that:
- tokio::time::interval for precise timing
- data_type configuration

Generation Loop:
Start → Set Interval → Tick → Check Status → Generate Data →
Dispatch Event → Increment Sequence → Tick → ...
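
The loop above can be approximated without any async runtime. The following sketch swaps the Tokio task and tokio::time::interval for a plain thread and thread::sleep, and the events are bare sequence numbers; spawn_generator is a hypothetical name. It is meant only to show the order of the steps, not the crate's implementation. Note that thread::sleep does not correct for drift the way an interval timer can.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Starts a generator thread. Returns the "running" flag used to stop it,
/// the receiving end of the event channel, and a handle that yields the
/// number of events dispatched.
pub fn spawn_generator(interval: Duration) -> (Arc<AtomicBool>, Receiver<u64>, JoinHandle<u64>) {
    let running = Arc::new(AtomicBool::new(true));
    let flag = Arc::clone(&running);
    let (tx, rx) = channel::<u64>();

    let handle = thread::spawn(move || {
        let mut sequence: u64 = 1; // the counter starts from 1
        loop {
            thread::sleep(interval); // tick
            if !flag.load(Ordering::SeqCst) {
                break; // check status: stop requested
            }
            let event = sequence; // generate data (here just the sequence number)
            if tx.send(event).is_err() {
                break; // dispatch failed: receiver is gone
            }
            sequence += 1; // increment sequence
        }
        sequence - 1
    });

    (running, rx, handle)
}
```

A caller reads events from the receiver, then stores false into the flag and joins the handle to stop the loop.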
All generated nodes have predictable element IDs:
- Counter: counter_{sequence} (e.g., counter_1, counter_2, ...)
- Sensor: reading_{sensor_id}_{sequence} (e.g., reading_2_42)
- Generic: generic_{sequence} (e.g., generic_1, generic_2, ...)

The sequence number increments with each generation cycle, providing traceability and ordering.
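
Because the IDs are predictable, tests can build or parse them directly. A minimal sketch follows; the helper names are hypothetical and not part of the crate, and it assumes the sensor ID embeds the numeric sensor index, as in the reading_2_42 example.

```rust
/// Builds the documented ID for a counter node.
pub fn counter_id(sequence: u64) -> String {
    format!("counter_{sequence}")
}

/// Builds the documented ID for a sensor reading; `sensor_index` is 0..=4.
pub fn reading_id(sensor_index: u32, sequence: u64) -> String {
    format!("reading_{sensor_index}_{sequence}")
}

/// Builds the documented ID for a generic node.
pub fn generic_id(sequence: u64) -> String {
    format!("generic_{sequence}")
}

/// Recovers the trailing sequence number from any of the three ID shapes.
/// Returns None when the last `_`-separated segment is not a number.
pub fn sequence_of(element_id: &str) -> Option<u64> {
    element_id.rsplit('_').next()?.parse().ok()
}
```

For example, sequence_of("reading_2_42") gives Some(42), which is handy when asserting that events arrive in generation order.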
All modes include an RFC3339 formatted timestamp:
2025-12-05T10:30:45.123456789Z
Timestamp Strategy:
- chrono::Utc::now().to_rfc3339() for timestamp property
- SystemTime::duration_since(UNIX_EPOCH)

Sensor Mode:
- temperature: 20.0 + rand::random::<f64>() * 10.0 → [20.0, 30.0)
- humidity: 40.0 + rand::random::<f64>() * 20.0 → [40.0, 60.0)
- sensor_id: rand::random::<u32>() % 5 → {0, 1, 2, 3, 4}

Generic Mode:
- value: rand::random::<i32>() → Full i32 range

Note: Values come from the rand crate's default RNG and are intended as test data only; do not rely on them for security-sensitive purposes.
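
The range arithmetic can be checked in isolation by separating the random draw from the scaling. In this sketch the random inputs are passed in as parameters, so no rand dependency is needed; the function names are hypothetical.

```rust
/// Maps a unit sample `u` in [0.0, 1.0) to a temperature in [20.0, 30.0).
pub fn temperature_from_unit(u: f64) -> f64 {
    20.0 + u * 10.0
}

/// Maps a unit sample `u` in [0.0, 1.0) to a humidity in [40.0, 60.0).
pub fn humidity_from_unit(u: f64) -> f64 {
    40.0 + u * 20.0
}

/// Reduces any raw u32 to a sensor index in {0, 1, 2, 3, 4}.
pub fn sensor_index_from_raw(raw: u32) -> u32 {
    raw % 5
}
```

Since the unit sample never reaches 1.0, the upper bounds 30.0 and 60.0 are exclusive, which matters for queries that test equality against them.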
Status Transitions:
Stopped → Starting → Running → Stopping → Stopped
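
The transition cycle can be modeled as a small state machine. The enum below is a hypothetical stand-in written for illustration; the real status type referenced elsewhere in this document is ComponentStatus, whose exact variants are not shown here.

```rust
/// Hypothetical stand-in for the source lifecycle status.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status {
    Stopped,
    Starting,
    Running,
    Stopping,
}

impl Status {
    /// The only state that may follow `self` in the documented cycle.
    pub fn next(self) -> Status {
        match self {
            Status::Stopped => Status::Starting,
            Status::Starting => Status::Running,
            Status::Running => Status::Stopping,
            Status::Stopping => Status::Stopped,
        }
    }

    /// True when moving from `self` to `target` follows the cycle.
    pub fn can_transition_to(self, target: Status) -> bool {
        self.next() == target
    }
}
```

Under this model a source cannot jump straight from Stopped to Running; it passes through Starting first.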
Start Process:
Stop Process:
When queries subscribe with bootstrap enabled:
Default Behavior: If no bootstrap provider is configured, bootstrap completes immediately with no initial data.
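
The default behavior amounts to treating a missing provider as an empty result. A minimal sketch of that idea follows, using a simplified, synchronous, hypothetical trait (InitialData) rather than the real async BootstrapProvider, and strings in place of SourceChange values.

```rust
/// Hypothetical, simplified stand-in for a bootstrap provider.
pub trait InitialData {
    fn initial_changes(&self) -> Vec<String>;
}

/// A provider that returns a fixed list of element IDs.
pub struct FixedData(pub Vec<String>);

impl InitialData for FixedData {
    fn initial_changes(&self) -> Vec<String> {
        self.0.clone()
    }
}

/// With no provider configured, bootstrap completes with no initial data.
pub fn bootstrap(provider: Option<&dyn InitialData>) -> Vec<String> {
    match provider {
        Some(p) => p.initial_changes(),
        None => Vec::new(),
    }
}
```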
test_subscribe()
Creates a test subscription that receives all generated events:
let source = MockSource::builder("test").build()?;
let mut rx = source.test_subscribe();
source.start().await?;
// Receive events
while let Ok(event) = rx.recv().await {
    // Process event
}
Returns: Box<dyn ChangeReceiver<SourceEventWrapper>>
inject_event()
Manually injects a custom event for testing:
let source = MockSource::builder("test").build()?;
let mut rx = source.test_subscribe();
let change = SourceChange::Insert { element };
source.inject_event(change).await?;
// Event will be received by subscribers
let event = rx.recv().await?;
Use Cases:
| Interval Setting | Events/Second | Typical Use Case |
|---|---|---|
| 10-50ms | 20-100 | High-volume load testing |
| 100-500ms | 2-10 | Rapid testing, development |
| 1000-3000ms | 0.33-1 | Demos, presentations |
| 5000-10000ms | 0.1-0.2 | Slow background generation |
Default: 5000ms (0.2 events/second)
Testing: 100-1000ms intervals
Demonstrations: 1000-3000ms intervals
Load Testing: 10-100ms intervals
Warning: Very short intervals (<10ms) can saturate CPU and memory depending on query complexity and reaction processing time.
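
The Events/Second column is simply 1000 divided by the interval in milliseconds. A small helper makes this easy to check when choosing an interval; events_per_second is a hypothetical name, not part of the crate.

```rust
/// Converts a generation interval in milliseconds to events per second.
/// Returns None for 0, which the source rejects as an invalid interval.
pub fn events_per_second(interval_ms: u64) -> Option<f64> {
    if interval_ms == 0 {
        None
    } else {
        Some(1000.0 / interval_ms as f64)
    }
}
```

For the default of 5000 ms this gives 0.2 events per second, and for 10 ms it gives 100.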
Channel Mode (Default):
Broadcast Mode:
(rand crate)

Symptoms:
Checks:
- source.start().await?
- source.status().await should be ComponentStatus::Running

Debug Commands:
// Check status
let status = source.status().await;
println!("Source status: {:?}", status);
// Check properties
let props = source.properties();
println!("Data type: {:?}", props.get("data_type"));
println!("Interval: {:?}", props.get("interval_ms"));
Symptoms:
Checks:
- data_type spelling is exact (case-sensitive)
- Valid values: "counter", "sensor", "generic"
- Default: "generic"

Verification:
let props = source.properties();
assert_eq!(
    props.get("data_type"),
    Some(&serde_json::Value::String("sensor".to_string()))
);
Symptoms:
Common Causes:
Wrong Label:
// Wrong: Looking for wrong label
MATCH (n:Sensor) RETURN n // Should be SensorReading
// Correct:
MATCH (n:SensorReading) RETURN n
Wrong Property Names:
// Wrong: Property name typo
MATCH (s:SensorReading) WHERE s.temp > 25 RETURN s // Should be temperature
// Correct:
MATCH (s:SensorReading) WHERE s.temperature > 25 RETURN s
Impossible Filter:
// Wrong: Temperature range is 20-30, so this filters out everything
MATCH (s:SensorReading) WHERE s.temperature > 100 RETURN s
// Correct: Use a threshold inside the generated range
MATCH (s:SensorReading) WHERE s.temperature > 25 RETURN s
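
Whether a "temperature > threshold" filter can ever match follows directly from the generated range [20.0, 30.0). The sketch below encodes that reasoning; the constants and function names are hypothetical and only restate the documented range.

```rust
/// Lower bound (inclusive) of generated temperatures.
pub const TEMPERATURE_MIN: f64 = 20.0;
/// Upper bound (exclusive) of generated temperatures.
pub const TEMPERATURE_MAX_EXCLUSIVE: f64 = 30.0;

/// True when `temperature > threshold` can match at least one generated value.
pub fn greater_than_can_match(threshold: f64) -> bool {
    threshold < TEMPERATURE_MAX_EXCLUSIVE
}

/// True when `temperature > threshold` matches every generated value.
pub fn greater_than_always_matches(threshold: f64) -> bool {
    threshold < TEMPERATURE_MIN
}
```

A threshold of 100 fails the first check, so the query returns nothing; a threshold of 25 passes the first check but not the second, so it matches only some of the readings.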
Invalid Data Type:
Error: Validation error: data_type 'sensors' is not valid.
Valid options are: counter, sensor, generic
Solution: Use exact spelling: "counter", "sensor", or "generic"
Zero Interval:
Error: Validation error: interval_ms cannot be 0.
Please specify a positive interval in milliseconds (minimum 1)
Solution: Use positive interval (minimum 1ms)
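
The two validation errors above correspond to two simple checks. The following is a sketch of that logic under stated assumptions: validate here is a free function written for illustration, not the crate's MockSourceConfig::validate(), and the error strings only imitate the messages shown.

```rust
/// The data types accepted by the mock source (matching is case-sensitive).
pub const VALID_DATA_TYPES: [&str; 3] = ["counter", "sensor", "generic"];

/// Illustrative re-implementation of the two documented checks.
pub fn validate(data_type: &str, interval_ms: u64) -> Result<(), String> {
    if !VALID_DATA_TYPES.contains(&data_type) {
        return Err(format!(
            "data_type '{data_type}' is not valid. Valid options are: {}",
            VALID_DATA_TYPES.join(", ")
        ));
    }
    if interval_ms == 0 {
        return Err("interval_ms cannot be 0. Use a positive interval (minimum 1)".to_string());
    }
    Ok(())
}
```

Note that "Sensor" and "sensors" are both rejected, since only the exact lowercase names are accepted.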
Symptoms:
Common Causes:
Solutions:
use drasi_lib::channels::DispatchMode;
use drasi_source_mock::MockSource;

// Increase interval for lower CPU usage
let source = MockSource::builder("sensor")
    .with_interval_ms(1000) // Was: 10ms
    .build()?;

// Use broadcast mode to reduce dispatch overhead
let source = MockSource::builder("sensor")
    .with_dispatch_mode(DispatchMode::Broadcast)
    .build()?;
- /Users/allenjones/dev/agentofreality/drasi/drasi-server/drasi-core/lib/src/sources/README.md
- /Users/allenjones/dev/agentofreality/drasi/drasi-server/drasi-core/lib/src/bootstrap/README.md
- /Users/allenjones/dev/agentofreality/drasi/drasi-server/drasi-core/lib/src/channels/README.md
- /Users/allenjones/dev/agentofreality/drasi/drasi-server/drasi-core/lib/src/sources/base.rs

See the test suite in src/tests.rs for comprehensive usage examples including: