| Crates.io | drasi-source-application |
| lib.rs | drasi-source-application |
| version | 0.1.2 |
| created_at | 2026-01-15 00:50:17.074034+00 |
| updated_at | 2026-01-23 06:17:20.347191+00 |
| description | Application source plugin for Drasi |
| homepage | |
| repository | https://github.com/drasi-project/drasi-core |
| max_upload_size | |
| id | 2044267 |
| size | 137,907 |
The Application Source is a programmatic data injection plugin for Drasi that enables direct, in-process delivery of graph data changes from Rust applications. Unlike network-based sources (HTTP, gRPC) or database-connected sources (PostgreSQL), the Application Source provides a native Rust API through a clonable handle pattern, allowing any part of your application to send graph events directly into Drasi's continuous query processing pipeline.
Ideal for:
Not suitable for:
The Application Source is typically created programmatically using the constructor pattern:
use drasi_source_application::{ApplicationSource, ApplicationSourceConfig};
use std::collections::HashMap;
// Create minimal configuration
let config = ApplicationSourceConfig {
properties: HashMap::new(),
};
// Create source and handle
let (source, handle) = ApplicationSource::new("my-source", config)?;
// Configure bootstrap provider (optional)
source.set_bootstrap_provider(Box::new(my_bootstrap_provider)).await;
pub struct ApplicationSourceConfig {
/// Application-specific properties (flexible key-value map)
pub properties: HashMap<String, serde_json::Value>,
}
| Name | Description | Data Type | Valid Values | Default |
|---|---|---|---|---|
| properties | Custom application-specific properties passed through to Source::properties() | HashMap<String, serde_json::Value> | Any JSON-serializable key-value pairs | {} (empty map) |
| auto_start | Whether to start automatically when added to DrasiLib | bool | true, false | true |
Note: The Application Source has minimal configuration requirements since it operates entirely in-process. The properties field is primarily for metadata and custom application logic.
Auto-Start Behavior: When auto_start=true (default), the source starts immediately if added to a running DrasiLib instance. If added before drasi.start() is called, it starts when the DrasiLib starts. When auto_start=false, the source must be started manually via drasi.start_source("source-id").
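The auto-start rules above can be summarised as a small decision function. The sketch below is illustrative only: `StartAction` and `on_add` are hypothetical names that restate the documented behaviour and are not part of the DrasiLib API.

```rust
/// What happens to a source at the moment it is added to a DrasiLib instance.
/// Hypothetical type for illustration; not part of drasi-lib.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum StartAction {
    /// The source starts immediately because DrasiLib is already running.
    StartNow,
    /// The source starts later, when DrasiLib itself is started.
    StartWithLib,
    /// The source stays stopped until it is started manually.
    WaitForManualStart,
}

/// Restates the documented auto-start rules as a pure function.
pub fn on_add(auto_start: bool, lib_running: bool) -> StartAction {
    match (auto_start, lib_running) {
        (true, true) => StartAction::StartNow,
        (true, false) => StartAction::StartWithLib,
        // With auto_start disabled the running state of DrasiLib is irrelevant.
        (false, _) => StartAction::WaitForManualStart,
    }
}
```

For example, `on_add(true, false)` yields `StartAction::StartWithLib`, which corresponds to adding the source before drasi.start() is called.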
The Application Source accepts graph data changes through the ApplicationSourceHandle API. Events follow Drasi's graph data model:
Node Structure:
Element::Node {
metadata: ElementMetadata {
reference: ElementReference {
source_id: Arc<str>, // Automatically set from handle
element_id: Arc<str>, // Provided by application
},
labels: Arc<[Arc<str>]>, // Node labels (e.g., ["Person", "Employee"])
effective_from: u64, // Timestamp (nanoseconds since epoch, auto-generated)
},
properties: ElementPropertyMap, // Key-value property map
}
Relationship Structure:
Element::Relation {
metadata: ElementMetadata {
reference: ElementReference {
source_id: Arc<str>,
element_id: Arc<str>,
},
labels: Arc<[Arc<str>]>, // Relation types (e.g., ["KNOWS", "WORKS_WITH"])
effective_from: u64,
},
in_node: ElementReference, // Target/end node
out_node: ElementReference, // Source/start node
properties: ElementPropertyMap,
}
Properties are built using PropertyMapBuilder and support the following types:
| Type | Rust Type | Builder Method | Example |
|---|---|---|---|
| String | Arc<str> | .with_string(key, value) | .with_string("name", "Alice") |
| Integer | i64 | .with_integer(key, value) | .with_integer("age", 30) |
| Float | f64 | .with_float(key, value) | .with_float("score", 95.5) |
| Boolean | bool | .with_bool(key, value) | .with_bool("active", true) |
| Null | - | .with_null(key) | .with_null("optional_field") |
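To make the builder pattern in the table concrete, here is a minimal, self-contained sketch of how such a builder could be structured. `PropertyValue` and `SketchPropertyMapBuilder` are hypothetical stand-ins written for this illustration; the real PropertyMapBuilder and ElementPropertyMap may be implemented quite differently.

```rust
use std::collections::BTreeMap;
use std::sync::Arc;

/// Hypothetical value type covering the five kinds listed in the table.
#[derive(Debug, Clone, PartialEq)]
pub enum PropertyValue {
    String(Arc<str>),
    Integer(i64),
    Float(f64),
    Bool(bool),
    Null,
}

/// Hypothetical builder that mirrors the documented method names.
#[derive(Debug, Default)]
pub struct SketchPropertyMapBuilder {
    properties: BTreeMap<String, PropertyValue>,
}

impl SketchPropertyMapBuilder {
    pub fn new() -> Self {
        Self::default()
    }

    /// Each `with_*` method consumes the builder and returns it, so calls chain.
    pub fn with_string(mut self, key: &str, value: impl Into<String>) -> Self {
        let value: String = value.into();
        self.properties
            .insert(key.to_string(), PropertyValue::String(Arc::from(value)));
        self
    }

    pub fn with_integer(mut self, key: &str, value: i64) -> Self {
        self.properties
            .insert(key.to_string(), PropertyValue::Integer(value));
        self
    }

    pub fn with_float(mut self, key: &str, value: f64) -> Self {
        self.properties
            .insert(key.to_string(), PropertyValue::Float(value));
        self
    }

    pub fn with_bool(mut self, key: &str, value: bool) -> Self {
        self.properties
            .insert(key.to_string(), PropertyValue::Bool(value));
        self
    }

    pub fn with_null(mut self, key: &str) -> Self {
        self.properties.insert(key.to_string(), PropertyValue::Null);
        self
    }

    /// Finishes the chain and hands back the collected properties.
    pub fn build(self) -> BTreeMap<String, PropertyValue> {
        self.properties
    }
}
```

Because every method takes `self` by value, a later call with the same key simply overwrites the earlier entry in this sketch; whether the real builder behaves the same way is not stated in this document.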
The Application Source processes three types of graph changes:
Insert: Add a new node or relationship
SourceChange::Insert { element: Element }
Update: Modify an existing node or relationship (full replacement)
SourceChange::Update { element: Element }
Delete: Remove a node or relationship
SourceChange::Delete { metadata: ElementMetadata }
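The practical consequence of "full replacement" on update is easiest to see against a toy store. The following sketch uses simplified, hypothetical types (`SketchElement`, `SketchChange`, `SketchStore`) rather than the real drasi_core models, and only models the semantics described above.

```rust
use std::collections::HashMap;

/// Simplified element: an id plus string properties.
/// Hypothetical stand-in for drasi_core's Element.
#[derive(Debug, Clone, PartialEq)]
pub struct SketchElement {
    pub id: String,
    pub properties: HashMap<String, String>,
}

/// Simplified mirror of the three change kinds.
#[derive(Debug, Clone)]
pub enum SketchChange {
    Insert { element: SketchElement },
    Update { element: SketchElement },
    Delete { id: String },
}

/// In-memory store keyed by element id.
pub type SketchStore = HashMap<String, SketchElement>;

/// Applies one change to the store.
pub fn apply(store: &mut SketchStore, change: SketchChange) {
    match change {
        // Insert and Update both store the whole element, so an Update
        // drops any property that the new element does not carry.
        SketchChange::Insert { element } | SketchChange::Update { element } => {
            store.insert(element.id.clone(), element);
        }
        SketchChange::Delete { id } => {
            store.remove(&id);
        }
    }
}

/// Convenience constructor for building elements from string pairs.
pub fn element(id: &str, props: &[(&str, &str)]) -> SketchElement {
    SketchElement {
        id: id.to_string(),
        properties: props
            .iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect(),
    }
}
```

In this model, inserting "person-1" with `name` and `email` and then updating it with only `name` leaves the stored element without an `email` property. This is why the update examples in this document resend every property, not just the changed ones.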
use drasi_source_application::{
ApplicationSource, ApplicationSourceConfig, ApplicationSourceHandle, PropertyMapBuilder
};
use std::collections::HashMap;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Create the source
let config = ApplicationSourceConfig {
properties: HashMap::new(),
};
let (source, handle) = ApplicationSource::new("app-source", config)?;
// Start the source (required before sending events)
source.start().await?;
// Use the handle to send events...
Ok(())
}
// Insert a person node
let person_props = PropertyMapBuilder::new()
.with_string("name", "Alice Johnson")
.with_integer("age", 30)
.with_string("email", "alice@example.com")
.with_bool("active", true)
.build();
handle.send_node_insert(
"person-1", // Element ID
vec!["Person", "Employee"], // Labels
person_props // Properties
).await?;
// Insert a company node
let company_props = PropertyMapBuilder::new()
.with_string("name", "Acme Corp")
.with_string("industry", "Technology")
.with_integer("founded", 2010)
.build();
handle.send_node_insert("company-1", vec!["Company"], company_props).await?;
// Create an employment relationship
let employment_props = PropertyMapBuilder::new()
.with_string("role", "Software Engineer")
.with_string("start_date", "2020-01-15")
.with_integer("salary", 120000)
.build();
handle.send_relation_insert(
"employment-1", // Relation ID
vec!["WORKS_FOR"], // Relation type
employment_props, // Properties
"person-1", // Start node (source)
"company-1" // End node (target)
).await?;
// Create a social relationship (assumes a "person-2" node has already been inserted)
let knows_props = PropertyMapBuilder::new()
.with_string("since", "2018")
.with_integer("closeness", 8)
.build();
handle.send_relation_insert(
"knows-1",
vec!["KNOWS"],
knows_props,
"person-1",
"person-2"
).await?;
// Update existing node (full replacement)
let updated_props = PropertyMapBuilder::new()
.with_string("name", "Alice Johnson-Smith") // Changed name
.with_integer("age", 31) // Birthday!
.with_string("email", "alice@example.com")
.with_bool("active", true)
.with_string("department", "Engineering") // New property
.build();
handle.send_node_update(
"person-1",
vec!["Person", "Employee", "Manager"], // Can update labels
updated_props
).await?;
// Delete a node
handle.send_delete("person-1", vec!["Person", "Employee"]).await?;
// Delete a relationship
handle.send_delete("knows-1", vec!["KNOWS"]).await?;
use drasi_core::models::{Element, ElementMetadata, ElementReference, SourceChange};
use std::sync::Arc;
let mut changes = Vec::new();
// Build multiple changes
for i in 1..=5 {
let props = PropertyMapBuilder::new()
.with_string("name", format!("Person {}", i))
.with_integer("id", i)
.build();
let element = Element::Node {
metadata: ElementMetadata {
reference: ElementReference {
source_id: Arc::from(handle.source_id()),
element_id: Arc::from(format!("person-{}", i).as_str()),
},
labels: Arc::from(vec![Arc::from("Person")]),
effective_from: chrono::Utc::now().timestamp_nanos_opt().unwrap() as u64,
},
properties: props,
};
changes.push(SourceChange::Insert { element });
}
// Send all changes in sequence
handle.send_batch(changes).await?;
use tokio::task;
// Clone handle for concurrent access
let handle1 = handle.clone();
let handle2 = handle.clone();
let task1 = task::spawn(async move {
let props = PropertyMapBuilder::new()
.with_string("name", "Thread 1 Node")
.build();
handle1.send_node_insert("node-1", vec!["Test"], props).await
});
let task2 = task::spawn(async move {
let props = PropertyMapBuilder::new()
.with_string("name", "Thread 2 Node")
.build();
handle2.send_node_insert("node-2", vec!["Test"], props).await
});
// Wait for both tasks
task1.await??;
task2.await??;
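use drasi_lib::DrasiLib;
use drasi_source_application::{ApplicationSource, ApplicationSourceConfig, PropertyMapBuilder};
use std::collections::HashMap;
use std::sync::Arc;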
// Create Drasi instance
let drasi = DrasiLib::new("my-app").await?;
// Create application source
let config = ApplicationSourceConfig { properties: HashMap::new() };
let (source, handle) = ApplicationSource::new("events", config)?;
// Add source to Drasi
drasi.add_source(Arc::new(source)).await?;
// Define a query that uses the source
let query = drasi.create_query("high-temp-sensors")
.cypher("MATCH (s:Sensor) WHERE s.temperature > 75 RETURN s")
.from_source("events")
.build()
.await?;
drasi.add_query(query).await?;
// Now send events via handle
let props = PropertyMapBuilder::new()
.with_string("id", "sensor-1")
.with_integer("temperature", 80)
.build();
handle.send_node_insert("sensor-1", vec!["Sensor"], props).await?;
The Application Source supports pluggable bootstrap providers via the BootstrapProvider trait. Any bootstrap provider implementation can be used with this source.
use drasi_source_application::{ApplicationSource, ApplicationSourceConfig};
use drasi_bootstrap_application::ApplicationBootstrapProvider;
use std::collections::HashMap;
// Create source
let config = ApplicationSourceConfig { properties: HashMap::new() };
let (source, handle) = ApplicationSource::new("my-source", config)?;
// Configure bootstrap provider
let bootstrap_provider = ApplicationBootstrapProvider::new();
source.set_bootstrap_provider(Box::new(bootstrap_provider)).await;
- ApplicationBootstrapProvider (drasi-bootstrap-application) - Replays stored insert events from shared state
- ScriptFileBootstrapProvider (drasi-bootstrap-scriptfile) - Loads initial data from JSONL files
- NoopBootstrapProvider (drasi-bootstrap-noop) - Skips bootstrap entirely
- Custom implementations of the BootstrapProvider trait

When a query subscribes with enable_bootstrap: true:
If no bootstrap provider is configured, the query is informed that bootstrap is not available.
The Application Source uses Tokio's bounded mpsc channel for event passing:
// Internal channel (default capacity: 1000 events)
let (tx, rx) = mpsc::channel(1000);
Backpressure: If 1000 events are queued without processing, subsequent .send() calls will await until capacity is available.
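The effect of a bounded channel can be observed without Tokio. The sketch below uses the standard library's `std::sync::mpsc::sync_channel` as a stand-in for Tokio's bounded mpsc channel, and `try_send` so that a full buffer is reported instead of blocking. The function name is hypothetical, and the real source's internals may differ.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Fills a bounded channel of the given capacity without draining it and
/// returns how many sends were accepted before the channel reported "full".
pub fn sends_accepted_before_full(capacity: usize) -> usize {
    // Keep the receiver alive (but never read from it) so the channel stays open.
    let (tx, _rx) = sync_channel::<u32>(capacity);
    let mut accepted = 0;
    loop {
        match tx.try_send(0) {
            Ok(()) => accepted += 1,
            // A blocking send, or an awaited send on a Tokio channel,
            // would wait at this point until the consumer frees a slot.
            Err(TrySendError::Full(_)) => return accepted,
            // The receiver is gone; nothing more can be delivered.
            Err(TrySendError::Disconnected(_)) => return accepted,
        }
    }
}
```

With a capacity of 1000 and no consumer, exactly 1000 sends are accepted in this model; the next one is where an awaited send would pause.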
ApplicationSourceHandle implements Clone and is safe to share across threads:
let handle2 = handle.clone();
let handle3 = handle.clone();
// All handles send to the same source
tokio::spawn(async move { handle2.send_node_insert(...).await });
tokio::spawn(async move { handle3.send_node_insert(...).await });
All send methods are async and must be awaited:
// Correct
handle.send_node_insert("node-1", vec!["Person"], props).await?;
// Incorrect (won't compile)
handle.send_node_insert("node-1", vec!["Person"], props)?;
When ApplicationSource::stop() is called, subsequent send calls fail with an error such as:
"Failed to send event: channel closed"
Best Practice: Handle channel closure errors gracefully in long-running applications.
// Channel closed (source stopped)
let result = handle.send_node_insert("node-1", vec!["Test"], props).await;
match result {
Ok(_) => println!("Event sent successfully"),
Err(e) if e.to_string().contains("channel closed") => {
eprintln!("Source is not running or has been stopped");
}
Err(e) => eprintln!("Unexpected error: {}", e),
}
The source automatically generates timestamps using chrono::Utc::now(). In rare cases where timestamp generation fails, it falls back to millisecond precision:
// Automatic fallback (handled internally)
let effective_from = crate::time::get_current_timestamp_nanos()
.unwrap_or_else(|e| {
log::warn!("Failed to get timestamp: {}, using fallback", e);
(chrono::Utc::now().timestamp_millis() as u64) * 1_000_000
});
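The unit handling in that fallback is worth spelling out: a millisecond value multiplied by 1,000,000 is still expressed in nanoseconds, but its last six digits are always zero. The sketch below reproduces the idea with the standard library clock only; the function names are hypothetical and this is not the crate's internal code.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Converts a millisecond timestamp to nanoseconds, as the fallback path does.
pub fn millis_to_nanos(millis: u64) -> u64 {
    millis.saturating_mul(1_000_000)
}

/// Nanoseconds since the Unix epoch, or None if the clock is set before the
/// epoch or the value does not fit in a u64.
pub fn now_nanos() -> Option<u64> {
    let elapsed = SystemTime::now().duration_since(UNIX_EPOCH).ok()?;
    let nanos = elapsed.as_nanos();
    if nanos <= u64::MAX as u128 {
        Some(nanos as u64)
    } else {
        None
    }
}

/// Nanosecond timestamp with a millisecond-precision fallback.
pub fn effective_from_now() -> u64 {
    now_nanos().unwrap_or_else(|| {
        let millis = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_millis() as u64)
            .unwrap_or(0);
        millis_to_nanos(millis)
    })
}
```

Either path yields a value in the same unit, so consumers of effective_from do not need to know which one was taken.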
use drasi_source_application::{ApplicationSource, ApplicationSourceConfig, PropertyMapBuilder};
use std::collections::HashMap;
#[tokio::test]
async fn test_basic_insert() {
let config = ApplicationSourceConfig { properties: HashMap::new() };
let (source, handle) = ApplicationSource::new("test-source", config)
.expect("Failed to create source");
// Start the source
source.start().await.unwrap();
// Send an event
let props = PropertyMapBuilder::new()
.with_string("name", "Test User")
.build();
let result = handle.send_node_insert("user-1", vec!["User"], props).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_insert_update_delete_sequence() {
let config = ApplicationSourceConfig { properties: HashMap::new() };
let (source, handle) = ApplicationSource::new("test-source", config).unwrap();
source.start().await.unwrap();
// Insert
let props1 = PropertyMapBuilder::new()
.with_string("status", "pending")
.build();
handle.send_node_insert("task-1", vec!["Task"], props1).await.unwrap();
// Update
let props2 = PropertyMapBuilder::new()
.with_string("status", "completed")
.build();
handle.send_node_update("task-1", vec!["Task"], props2).await.unwrap();
// Delete
handle.send_delete("task-1", vec!["Task"]).await.unwrap();
}
#[tokio::test]
async fn test_send_after_stop() {
let config = ApplicationSourceConfig { properties: HashMap::new() };
let (source, handle) = ApplicationSource::new("test-source", config).unwrap();
source.start().await.unwrap();
// Stop the source
source.stop().await.unwrap();
// Attempt to send should fail
let props = PropertyMapBuilder::new().build();
let result = handle.send_node_insert("node-1", vec!["Test"], props).await;
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("channel closed"));
}
Default capacity: 1000 events. For high-throughput scenarios:
// Currently requires source code modification
// Future enhancement: configurable channel size
let (app_tx, app_rx) = mpsc::channel(10000); // Larger buffer
Individual sends (typical):
for event in events {
handle.send_node_insert(...).await?;
}
Batch sends (better for large volumes):
let changes: Vec<SourceChange> = events.iter().map(|e| { /* ... */ }).collect();
handle.send_batch(changes).await?;
Memory usage depends on the configured bootstrap provider:
- ApplicationBootstrapProvider stores events in memory
- ScriptFileBootstrapProvider reads from disk on demand

| Feature | Application | HTTP | gRPC | PostgreSQL |
|---|---|---|---|---|
| Access | Rust API | REST | Protocol Buffers | Database CDC |
| Performance | Highest (in-process) | Moderate | Moderate | Database-dependent |
| Language Support | Rust only | Any | Any | Any (via database) |
| Network | No | Yes | Yes | Yes |
| Bootstrap | Pluggable provider | Pluggable provider | Pluggable provider | Pluggable provider |
| Use Case | Embedded, testing | Cross-language | Microservices | DB integration |
| Setup Complexity | Minimal | Moderate | Moderate | High |
The source automatically adds profiling metadata to events for performance analysis:
let mut profiling = drasi_lib::profiling::ProfilingMetadata::new();
profiling.source_send_ns = Some(drasi_lib::profiling::timestamp_ns());
This metadata flows through the query pipeline and enables end-to-end latency tracking.
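As an illustration of how a send timestamp enables latency tracking, the sketch below computes elapsed time from a recorded `source_send_ns` value. `SketchProfiling` is a hypothetical, reduced mirror that carries only the one field named above; the real ProfilingMetadata type and the way downstream components read it are not shown in this document.

```rust
/// Hypothetical, reduced mirror of the profiling metadata:
/// only the send timestamp mentioned in the text.
#[derive(Debug, Default, Clone, Copy)]
pub struct SketchProfiling {
    pub source_send_ns: Option<u64>,
}

/// Nanoseconds elapsed between the recorded source send and `now_ns`.
/// Returns None if no send time was recorded or if `now_ns` is earlier
/// than the send time (for example after a clock adjustment).
pub fn latency_since_send_ns(profiling: &SketchProfiling, now_ns: u64) -> Option<u64> {
    profiling
        .source_send_ns
        .and_then(|sent| now_ns.checked_sub(sent))
}
```

A later pipeline stage could call a function like this with its own current timestamp to report how long an event has been in flight.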
The source emits lifecycle events via the component event channel:
Monitor these events for observability in production systems.
After source creation, get additional handles:
let (source, handle1) = ApplicationSource::new("my-source", config)?;
let handle2 = source.get_handle();
let handle3 = source.get_handle();
All handles (including the original) connect to the same source instance.
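The "many handles, one source" relationship is the same fan-in shape as cloned channel senders feeding a single receiver. The sketch below demonstrates that shape with `std::sync::mpsc` and OS threads instead of Tokio tasks; `fan_in` is a hypothetical helper, not part of the crate.

```rust
use std::sync::mpsc;
use std::thread;

/// Spawns `handles` worker threads, each holding its own clone of the sender,
/// and returns every message the single receiver collected, sorted.
pub fn fan_in(handles: usize) -> Vec<String> {
    let (tx, rx) = mpsc::channel::<String>();

    let workers: Vec<_> = (0..handles)
        .map(|i| {
            // Each worker gets an independent clone that targets the same receiver.
            let tx = tx.clone();
            thread::spawn(move || {
                tx.send(format!("node-{}", i))
                    .expect("receiver is still alive");
            })
        })
        .collect();

    // Drop the original sender so the receive loop ends once all clones are gone.
    drop(tx);
    for worker in workers {
        worker.join().expect("worker thread panicked");
    }

    // Arrival order depends on scheduling, so sort for a stable result.
    let mut received: Vec<String> = rx.iter().collect();
    received.sort();
    received
}
```

Every message arrives at the one receiver regardless of which clone sent it, which matches the behaviour described for handles above.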
| ApplicationSource Method | Description |
|---|---|
| new(id, config) | Create source and handle |
| get_handle() | Get an additional handle |
| set_bootstrap_provider(provider) | Configure bootstrap provider |
| start() | Start event processing |
| stop() | Stop event processing |

| ApplicationSourceHandle Method | Description |
|---|---|
| send(change) | Send raw SourceChange event |
| send_node_insert(id, labels, props) | Insert a node |
| send_node_update(id, labels, props) | Update a node |
| send_delete(id, labels) | Delete a node or relation |
| send_relation_insert(id, labels, props, start, end) | Insert a relationship |
| send_batch(changes) | Send multiple changes |
| source_id() | Get source identifier |

| PropertyMapBuilder Method | Description |
|---|---|
| new() | Create new builder |
| with_string(key, value) | Add string property |
| with_integer(key, value) | Add integer property (i64) |
| with_float(key, value) | Add float property (f64) |
| with_bool(key, value) | Add boolean property |
| with_null(key) | Add null property |
| build() | Build final property map |

- /Users/allenjones/dev/agentofreality/drasi/drasi-server/drasi-core/components/sources/application/src/lib.rs
- /Users/allenjones/dev/agentofreality/drasi/drasi-server/drasi-core/components/sources/application/src/tests.rs
- drasi-core/models/ for graph data type definitions
- drasi-lib/bootstrap/ for bootstrap provider architecture