| Crates.io | drasi-lib |
| lib.rs | drasi-lib |
| version | 0.3.2 |
| created_at | 2026-01-14 22:20:07.03899+00 |
| updated_at | 2026-01-23 06:13:44.244115+00 |
| description | Drasi Lib |
| homepage | |
| repository | https://github.com/drasi-project/drasi-core |
| max_upload_size | |
| id | 2044004 |
| size | 789,282 |
DrasiLib is a Rust library for building change-driven solutions that detect and react to data changes with precision.
DrasiLib enables real-time change detection using continuous queries that maintain live result sets. Unlike traditional event-driven systems, you declare what changes matter using Cypher queries, and DrasiLib handles the complexity of tracking state and detecting meaningful transitions.
Sources → Continuous Queries → Reactions
↓ ↓ ↓
Data In Change Detection Actions Out
Key Benefits:
[dependencies]
drasi-lib = { path = "path/to/drasi-lib" }
tokio = { version = "1", features = ["full"] }
DrasiLib can be initialized in two ways:
The builder provides a fluent interface for configuring sources, queries, and reactions.
use drasi_lib::{DrasiLib, Query};
use drasi_source_mock::{MockSource, MockSourceConfig};
use drasi_reaction_log::{LogReaction, LogReactionConfig};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// 1. Create source plugin instance
let source = MockSource::new("sensors", MockSourceConfig {
data_type: "sensor".to_string(),
interval_ms: 1000,
})?;
// 2. Create reaction plugin instance
let reaction = LogReaction::new(
"alerts",
vec!["high-temp".to_string()],
LogReactionConfig::default(),
);
// 3. Build DrasiLib using the builder
let core = DrasiLib::builder()
.with_id("my-app")
.with_source(source) // Ownership transferred
.with_reaction(reaction) // Ownership transferred
.with_query(
Query::cypher("high-temp")
.query("MATCH (s:Sensor) WHERE s.temperature > 75 RETURN s")
.from_source("sensors")
.build()
)
.build()
.await?;
// 4. Start processing
core.start().await?;
// Run until shutdown
tokio::signal::ctrl_c().await?;
core.stop().await?;
Ok(())
}
Create a builder with DrasiLib::builder() and configure using the fluent API.
`with_id(id: impl Into<String>)` — Set a unique identifier for this DrasiLib instance. Used for logging and debugging.
DrasiLib::builder()
.with_id("my-application")
`with_source(source: impl Source + 'static)` — Add a source plugin instance. Ownership is transferred to DrasiLib.
let source = MockSource::new("my-source", config)?;
DrasiLib::builder()
.with_source(source) // source is moved here
You can add multiple sources by chaining:
DrasiLib::builder()
.with_source(source1)
.with_source(source2)
.with_source(source3)
`with_reaction(reaction: impl Reaction + 'static)` — Add a reaction plugin instance. Ownership is transferred to DrasiLib.
let reaction = LogReaction::new("my-reaction", vec!["query1".into()], config);
DrasiLib::builder()
.with_reaction(reaction) // reaction is moved here
`with_query(config: QueryConfig)` — Add a query configuration. Use the Query builder to create configurations:
DrasiLib::builder()
.with_query(
Query::cypher("my-query")
.query("MATCH (n:Person) RETURN n")
.from_source("my-source")
.build()
)
`with_priority_queue_capacity(capacity: usize)` — Set the default priority queue capacity for all components. Controls how many events can be buffered before backpressure is applied.
DrasiLib::builder()
.with_priority_queue_capacity(50000) // Default: 10000
`with_dispatch_buffer_capacity(capacity: usize)` — Set the default dispatch buffer capacity for event routing channels.
DrasiLib::builder()
.with_dispatch_buffer_capacity(5000) // Default: 1000
`add_storage_backend(config: StorageBackendConfig)` — Add a storage backend for persistent query state (RocksDB, Redis/Garnet).
use drasi_lib::StorageBackendConfig;
DrasiLib::builder()
.add_storage_backend(StorageBackendConfig {
id: "rocksdb-backend".to_string(),
spec: StorageBackendSpec::RocksDb { path: "./data".to_string() },
})
`build() -> Result<DrasiLib>` — Build the DrasiLib instance. This validates configuration, creates all components, and initializes the system.
let core = DrasiLib::builder()
.with_id("my-app")
.with_source(source)
.with_query(query)
.build()
.await?;
// Now start processing
core.start().await?;
For scenarios where you need to load configuration from YAML/JSON files or construct configuration programmatically without the builder.
use drasi_lib::{DrasiLibConfig, QueryConfig, RuntimeConfig};
use std::sync::Arc;
// Create configuration directly
let config = DrasiLibConfig {
id: "my-server".to_string(),
priority_queue_capacity: Some(50000),
dispatch_buffer_capacity: Some(5000),
storage_backends: vec![],
queries: vec![
QueryConfig {
id: "my-query".to_string(),
query: "MATCH (n:Person) RETURN n".to_string(),
query_language: QueryLanguage::Cypher,
sources: vec![
SourceSubscriptionConfig {
source_id: "my-source".to_string(),
pipeline: vec![],
}
],
middleware: vec![],
auto_start: true,
joins: None,
enable_bootstrap: true,
bootstrap_buffer_size: 10000,
priority_queue_capacity: None, // Inherits from global
dispatch_buffer_capacity: None, // Inherits from global
dispatch_mode: None,
storage_backend: None,
},
],
};
// Validate the configuration
config.validate()?;
# DrasiLibConfig structure
id: my-server
priority_queue_capacity: 50000 # Optional, default: 10000
dispatch_buffer_capacity: 5000 # Optional, default: 1000
storage_backends: # Optional
- id: rocksdb-backend
spec:
type: rocksdb
path: ./data
queries:
- id: high-temp-alerts
query: |
MATCH (s:Sensor)
WHERE s.temperature > 75
RETURN s.id, s.temperature, s.location
queryLanguage: Cypher # Optional, default: Cypher
sources:
- source_id: sensors
pipeline: [] # Optional middleware pipeline
auto_start: true # Optional, default: true
enableBootstrap: true # Optional, default: true
bootstrapBufferSize: 10000 # Optional, default: 10000
- id: complex-join-query
query: |
MATCH (o:Order)-[:PLACED_BY]->(c:Customer)
WHERE o.status = 'pending'
RETURN o.id, c.email, o.total
sources:
- source_id: orders
- source_id: customers
joins: # Synthetic joins for cross-source queries
- id: PLACED_BY
keys:
- label: Order
property: customer_id
- label: Customer
property: id
use drasi_lib::{DrasiLibConfig, RuntimeConfig};
use std::sync::Arc;
// Load YAML configuration
let yaml_str = std::fs::read_to_string("config.yaml")?;
let config: DrasiLibConfig = serde_yaml::from_str(&yaml_str)?;
// Validate configuration
config.validate()?;
// Convert to RuntimeConfig (applies defaults to queries)
let runtime_config = Arc::new(RuntimeConfig::from(config));
// Note: DrasiLib::new() is internal. For config-based initialization,
// use the builder pattern and add sources/reactions programmatically:
let mut core = DrasiLib::builder()
.with_id(&runtime_config.id);
// Add queries from config
for query_config in &runtime_config.queries {
core = core.with_query(query_config.clone());
}
// Add sources and reactions as plugin instances
// (Sources/reactions must be created from plugin types - they cannot be in YAML)
let source = create_source_from_external_config()?;
let reaction = create_reaction_from_external_config()?;
let core = core
.with_source(source)
.with_reaction(reaction)
.build()
.await?;
core.start().await?;
| Field | Type | Default | Description |
|---|---|---|---|
| `id` | `String` | UUID | Unique identifier for this instance |
| `priority_queue_capacity` | `Option<usize>` | `10000` | Default event queue capacity |
| `dispatch_buffer_capacity` | `Option<usize>` | `1000` | Default channel buffer capacity |
| `storage_backends` | `Vec<StorageBackendConfig>` | `[]` | Storage backend definitions |
| `queries` | `Vec<QueryConfig>` | `[]` | Query configurations |
| Field | Type | Default | Description |
|---|---|---|---|
| `id` | `String` | Required | Unique query identifier |
| `query` | `String` | Required | Cypher or GQL query string |
| `query_language` | `QueryLanguage` | `Cypher` | Query language (Cypher or GQL) |
| `sources` | `Vec<SourceSubscriptionConfig>` | `[]` | Source subscriptions |
| `middleware` | `Vec<SourceMiddlewareConfig>` | `[]` | Middleware configurations |
| `auto_start` | `bool` | `true` | Start automatically with DrasiLib |
| `joins` | `Option<Vec<QueryJoinConfig>>` | `None` | Synthetic join definitions |
| `enable_bootstrap` | `bool` | `true` | Load initial data from sources |
| `bootstrap_buffer_size` | `usize` | `10000` | Bootstrap buffer size |
| `priority_queue_capacity` | `Option<usize>` | Inherited | Override queue capacity |
| `dispatch_buffer_capacity` | `Option<usize>` | Inherited | Override buffer capacity |
| `dispatch_mode` | `Option<DispatchMode>` | `Channel` | Event dispatch mode |
| `storage_backend` | `Option<StorageBackendRef>` | Default | Storage backend reference |
The Query builder creates query configurations with a fluent API.
`Query::cypher(id: impl Into<String>)` — Create a new Cypher query builder:
Query::cypher("my-query")
.query("MATCH (n:Person) WHERE n.age > 21 RETURN n")
.from_source("people-source")
.build()
`Query::gql(id: impl Into<String>)` — Create a new GQL (Graph Query Language) query builder:
Query::gql("my-gql-query")
.query("MATCH (n:Person) RETURN n.name")
.from_source("people-source")
.build()
| Method | Description | Default |
|---|---|---|
| `.query(str)` | Set the query string | Required |
| `.from_source(id)` | Subscribe to a source | Required (at least one) |
| `.from_source_with_pipeline(id, pipeline)` | Subscribe with middleware pipeline | - |
| `.with_middleware(config)` | Add middleware transformation | [] |
| `.auto_start(bool)` | Start automatically with DrasiLib | true |
| `.enable_bootstrap(bool)` | Load initial data from sources | true |
| `.with_bootstrap_buffer_size(size)` | Bootstrap buffer size | 10000 |
| `.with_joins(joins)` | Configure synthetic joins | None |
| `.with_priority_queue_capacity(cap)` | Override queue capacity | Inherited |
| `.with_dispatch_buffer_capacity(cap)` | Override buffer capacity | Inherited |
| `.with_dispatch_mode(mode)` | Set dispatch mode | Channel |
| `.with_storage_backend(ref)` | Use specific storage backend | Default |
use drasi_lib::{Query, DispatchMode};
Query::cypher("complex-query")
.query(r#"
MATCH (o:Order)-[:PLACED_BY]->(c:Customer)
WHERE o.status = 'pending' AND o.total > 1000
RETURN o.id, c.email, o.total
"#)
.from_source("orders")
.from_source("customers")
.auto_start(true)
.enable_bootstrap(true)
.with_priority_queue_capacity(20000)
.with_dispatch_mode(DispatchMode::Channel)
.build()
After building, use these methods to manage the DrasiLib instance:
// Lifecycle
core.start().await?; // Start all auto-start components
core.stop().await?; // Stop all components
// Component control
core.start_source("my-source").await?;
core.stop_source("my-source").await?;
core.start_query("my-query").await?;
core.stop_query("my-query").await?;
core.start_reaction("my-reaction").await?;
core.stop_reaction("my-reaction").await?;
// Add components at runtime
core.add_source(new_source).await?;
core.add_reaction(new_reaction).await?;
core.add_query(query_config).await?;
// Remove components
core.remove_source("my-source").await?;
core.remove_query("my-query").await?;
core.remove_reaction("my-reaction").await?;
// Inspection
let sources = core.list_sources().await?;
let queries = core.list_queries().await?;
let reactions = core.list_reactions().await?;
let status = core.get_source_status("my-source").await?;
let results = core.get_query_results("my-query").await?;
// Check running state
let is_running = core.is_running().await;
// Get current configuration
let config = core.get_current_config().await?;
Sources ingest data from external systems and emit graph elements (nodes and relationships). DrasiLib provides a trait-based plugin architecture.
Important: Sources are plugins that must be instantiated externally and passed to DrasiLib. DrasiLib has no awareness of which source plugins exist.
Available Source Plugins: See components/sources/ for PostgreSQL, HTTP, gRPC, Mock, Platform, and Application sources.
Queries define what changes matter using Cypher. They track three types of results:
- Adding - New rows matching the query
- Updating - Existing rows with changed values
- Removing - Rows that no longer match

Limitation: ORDER BY, TOP, and LIMIT are not supported in continuous queries.
Reactions respond to query results by triggering actions (webhooks, logging, etc.).
Important: Like sources, reactions are plugins that must be instantiated externally.
Available Reaction Plugins: See components/reactions/ for HTTP, gRPC, SSE, Log, Platform, and Profiler reactions.
Bootstrap providers deliver initial data to queries before streaming begins. Any source can use any bootstrap provider, enabling flexible scenarios like "bootstrap from database, stream changes via HTTP."
DrasiLib supports two dispatch modes for event routing:
| Mode | Backpressure | Message Loss | Best For |
|---|---|---|---|
| Channel (default) | Yes | None | Different subscriber speeds, critical data |
| Broadcast | No | Possible | High fanout (10+ subscribers), uniform speeds |
Configure per-query:
use drasi_lib::DispatchMode;
Query::cypher("my-query")
.with_dispatch_mode(DispatchMode::Channel) // Default
// or
.with_dispatch_mode(DispatchMode::Broadcast)
.build()
For detailed information on building plugins:
State store providers allow plugins (Sources, BootstrapProviders, and Reactions) to persist runtime state that survives restarts of DrasiLib.
By default, DrasiLib uses an in-memory state store that does not persist across restarts:
let core = DrasiLib::builder()
.with_id("my-app")
.build() // Uses MemoryStateStoreProvider by default
.await?;
For ACID-compliant persistent storage, use the redb state store provider:
use drasi_state_store_redb::RedbStateStoreProvider;
use std::sync::Arc;
let state_store = RedbStateStoreProvider::new("/data/state.redb")?;
let core = DrasiLib::builder()
.with_id("my-app")
.with_state_store_provider(Arc::new(state_store))
.build()
.await?;
When a Source or Reaction is added to DrasiLib, a runtime context is provided via the initialize() method. The context contains all DrasiLib-provided services, including the state store (if configured):
use drasi_lib::{Source, SourceRuntimeContext};
#[async_trait]
impl Source for MySource {
async fn initialize(&self, context: SourceRuntimeContext) {
// Store the context for later use
self.base.initialize(context).await;
// Access state store from context (may be None if not configured)
if let Some(state_store) = self.base.state_store().await {
// Use state_store for persistent state
}
}
}
For reactions, use ReactionRuntimeContext which also provides access to QueryProvider:
use drasi_lib::{Reaction, ReactionRuntimeContext};
#[async_trait]
impl Reaction for MyReaction {
async fn initialize(&self, context: ReactionRuntimeContext) {
self.base.initialize(context).await;
// Access query provider for subscribing to query results
let query_provider = self.base.query_provider().await;
}
}
See the State Store Provider Guide for full documentation.
Ensure you've added the instance to DrasiLib before referencing it in queries:
let source = MySource::new("my-source", config)?;
let core = DrasiLib::builder()
.with_source(source) // Must add source before queries reference it
.with_query(
Query::cypher("my-query")
.from_source("my-source") // References the source above
.build()
)
.build()
.await?;
dispatch_buffer_capacity and priority_queue_capacity settings
std::env::set_var("RUST_LOG", "drasi_lib=debug");
env_logger::init();
Apache License 2.0