drasi-lib

Crates.iodrasi-lib
lib.rsdrasi-lib
version0.3.2
created_at2026-01-14 22:20:07.03899+00
updated_at2026-01-23 06:13:44.244115+00
descriptionDrasi Lib
homepage
repositoryhttps://github.com/drasi-project/drasi-core
max_upload_size
id2044004
size789,282
maintainers-core (github:drasi-project:maintainers-core)

documentation

README

DrasiLib

License Rust

DrasiLib is a Rust library for building change-driven solutions that detect and react to data changes with precision.

Overview

DrasiLib enables real-time change detection using continuous queries that maintain live result sets. Unlike traditional event-driven systems, you declare what changes matter using Cypher queries, and DrasiLib handles the complexity of tracking state and detecting meaningful transitions.

Sources → Continuous Queries → Reactions
   ↓              ↓                 ↓
Data In    Change Detection    Actions Out

Key Benefits:

  • Declarative: Define change detection with Cypher queries, not custom code
  • Precise: Get before/after states for every change, not just raw events
  • Pluggable: Use existing source/reaction plugins or build your own
  • Scalable: Built-in backpressure, priority queues, and dispatch modes

Getting Started

Installation

[dependencies]
drasi-lib = { path = "path/to/drasi-lib" }
tokio = { version = "1", features = ["full"] }

Initialization Methods

DrasiLib can be initialized in two ways:

  1. Builder Pattern (Recommended) - Fluent API for programmatic configuration
  2. Config Struct - Direct configuration for YAML/JSON loading scenarios

Method 1: Builder Pattern (Recommended)

The builder provides a fluent interface for configuring sources, queries, and reactions.

Basic Example

use drasi_lib::{DrasiLib, Query};
use drasi_source_mock::{MockSource, MockSourceConfig};
use drasi_reaction_log::{LogReaction, LogReactionConfig};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // 1. Create source plugin instance
    let source = MockSource::new("sensors", MockSourceConfig {
        data_type: "sensor".to_string(),
        interval_ms: 1000,
    })?;

    // 2. Create reaction plugin instance
    let reaction = LogReaction::new(
        "alerts",
        vec!["high-temp".to_string()],
        LogReactionConfig::default(),
    );

    // 3. Build DrasiLib using the builder
    let core = DrasiLib::builder()
        .with_id("my-app")
        .with_source(source)      // Ownership transferred
        .with_reaction(reaction)  // Ownership transferred
        .with_query(
            Query::cypher("high-temp")
                .query("MATCH (s:Sensor) WHERE s.temperature > 75 RETURN s")
                .from_source("sensors")
                .build()
        )
        .build()
        .await?;

    // 4. Start processing
    core.start().await?;

    // Run until shutdown
    tokio::signal::ctrl_c().await?;
    core.stop().await?;

    Ok(())
}

DrasiLibBuilder API Reference

Create a builder with DrasiLib::builder() and configure using the fluent API.

with_id(id: impl Into<String>)

Set a unique identifier for this DrasiLib instance. Used for logging and debugging.

DrasiLib::builder()
    .with_id("my-application")

with_source(source: impl Source + 'static)

Add a source plugin instance. Ownership is transferred to DrasiLib.

let source = MockSource::new("my-source", config)?;
DrasiLib::builder()
    .with_source(source)  // source is moved here

You can add multiple sources by chaining:

DrasiLib::builder()
    .with_source(source1)
    .with_source(source2)
    .with_source(source3)

with_reaction(reaction: impl Reaction + 'static)

Add a reaction plugin instance. Ownership is transferred to DrasiLib.

let reaction = LogReaction::new("my-reaction", vec!["query1".into()], config);
DrasiLib::builder()
    .with_reaction(reaction)  // reaction is moved here

with_query(config: QueryConfig)

Add a query configuration. Use the Query builder to create configurations:

DrasiLib::builder()
    .with_query(
        Query::cypher("my-query")
            .query("MATCH (n:Person) RETURN n")
            .from_source("my-source")
            .build()
    )

with_priority_queue_capacity(capacity: usize)

Set the default priority queue capacity for all components. Controls how many events can be buffered before backpressure is applied.

DrasiLib::builder()
    .with_priority_queue_capacity(50000)  // Default: 10000

with_dispatch_buffer_capacity(capacity: usize)

Set the default dispatch buffer capacity for event routing channels.

DrasiLib::builder()
    .with_dispatch_buffer_capacity(5000)  // Default: 1000

add_storage_backend(config: StorageBackendConfig)

Add a storage backend for persistent query state (RocksDB, Redis/Garnet).

use drasi_lib::StorageBackendConfig;

DrasiLib::builder()
    .add_storage_backend(StorageBackendConfig {
        id: "rocksdb-backend".to_string(),
        spec: StorageBackendSpec::RocksDb { path: "./data".to_string() },
    })

build() -> Result<DrasiLib>

Build the DrasiLib instance. This validates configuration, creates all components, and initializes the system.

let core = DrasiLib::builder()
    .with_id("my-app")
    .with_source(source)
    .with_query(query)
    .build()
    .await?;

// Now start processing
core.start().await?;

Method 2: Config Struct Initialization

For scenarios where you need to load configuration from YAML/JSON files or construct configuration programmatically without the builder.

DrasiLibConfig Structure

use drasi_lib::{DrasiLibConfig, QueryConfig, RuntimeConfig};
use std::sync::Arc;

// Create configuration directly
let config = DrasiLibConfig {
    id: "my-server".to_string(),
    priority_queue_capacity: Some(50000),
    dispatch_buffer_capacity: Some(5000),
    storage_backends: vec![],
    queries: vec![
        QueryConfig {
            id: "my-query".to_string(),
            query: "MATCH (n:Person) RETURN n".to_string(),
            query_language: QueryLanguage::Cypher,
            sources: vec![
                SourceSubscriptionConfig {
                    source_id: "my-source".to_string(),
                    pipeline: vec![],
                }
            ],
            middleware: vec![],
            auto_start: true,
            joins: None,
            enable_bootstrap: true,
            bootstrap_buffer_size: 10000,
            priority_queue_capacity: None,  // Inherits from global
            dispatch_buffer_capacity: None, // Inherits from global
            dispatch_mode: None,
            storage_backend: None,
        },
    ],
};

// Validate the configuration
config.validate()?;

YAML Configuration Format

# DrasiLibConfig structure
id: my-server
priority_queue_capacity: 50000      # Optional, default: 10000
dispatch_buffer_capacity: 5000      # Optional, default: 1000

storage_backends:                    # Optional
  - id: rocksdb-backend
    spec:
      type: rocksdb
      path: ./data

queries:
  - id: high-temp-alerts
    query: |
      MATCH (s:Sensor)
      WHERE s.temperature > 75
      RETURN s.id, s.temperature, s.location
    queryLanguage: Cypher           # Optional, default: Cypher
    sources:
      - source_id: sensors
        pipeline: []                # Optional middleware pipeline
    auto_start: true                # Optional, default: true
    enableBootstrap: true           # Optional, default: true
    bootstrapBufferSize: 10000      # Optional, default: 10000

  - id: complex-join-query
    query: |
      MATCH (o:Order)-[:PLACED_BY]->(c:Customer)
      WHERE o.status = 'pending'
      RETURN o.id, c.email, o.total
    sources:
      - source_id: orders
      - source_id: customers
    joins:                          # Synthetic joins for cross-source queries
      - id: PLACED_BY
        keys:
          - label: Order
            property: customer_id
          - label: Customer
            property: id

Loading from YAML and Initializing

use drasi_lib::{DrasiLibConfig, RuntimeConfig};
use std::sync::Arc;

// Load YAML configuration
let yaml_str = std::fs::read_to_string("config.yaml")?;
let config: DrasiLibConfig = serde_yaml::from_str(&yaml_str)?;

// Validate configuration
config.validate()?;

// Convert to RuntimeConfig (applies defaults to queries)
let runtime_config = Arc::new(RuntimeConfig::from(config));

// Note: DrasiLib::new() is internal. For config-based initialization,
// use the builder pattern and add sources/reactions programmatically:

let mut core = DrasiLib::builder()
    .with_id(&runtime_config.id);

// Add queries from config
for query_config in &runtime_config.queries {
    core = core.with_query(query_config.clone());
}

// Add sources and reactions as plugin instances
// (Sources/reactions must be created from plugin types - they cannot be in YAML)
let source = create_source_from_external_config()?;
let reaction = create_reaction_from_external_config()?;

let core = core
    .with_source(source)
    .with_reaction(reaction)
    .build()
    .await?;

core.start().await?;

Configuration Fields Reference

Field Type Default Description
id String UUID Unique identifier for this instance
priority_queue_capacity Option<usize> 10000 Default event queue capacity
dispatch_buffer_capacity Option<usize> 1000 Default channel buffer capacity
storage_backends Vec<StorageBackendConfig> [] Storage backend definitions
queries Vec<QueryConfig> [] Query configurations

QueryConfig Fields Reference

Field Type Default Description
id String Required Unique query identifier
query String Required Cypher or GQL query string
query_language QueryLanguage Cypher Query language (Cypher or GQL)
sources Vec<SourceSubscriptionConfig> [] Source subscriptions
middleware Vec<SourceMiddlewareConfig> [] Middleware configurations
auto_start bool true Start automatically with DrasiLib
joins Option<Vec<QueryJoinConfig>> None Synthetic join definitions
enable_bootstrap bool true Load initial data from sources
bootstrap_buffer_size usize 10000 Bootstrap buffer size
priority_queue_capacity Option<usize> Inherited Override queue capacity
dispatch_buffer_capacity Option<usize> Inherited Override buffer capacity
dispatch_mode Option<DispatchMode> Channel Event dispatch mode
storage_backend Option<StorageBackendRef> Default Storage backend reference

Query Builder

The Query builder creates query configurations with a fluent API.

Query::cypher(id: impl Into<String>)

Create a new Cypher query builder:

Query::cypher("my-query")
    .query("MATCH (n:Person) WHERE n.age > 21 RETURN n")
    .from_source("people-source")
    .build()

Query::gql(id: impl Into<String>)

Create a new GQL (GraphQL) query builder:

Query::gql("my-gql-query")
    .query("MATCH (n:Person) RETURN n.name")
    .from_source("people-source")
    .build()

Query Builder Methods

Method Description Default
.query(str) Set the query string Required
.from_source(id) Subscribe to a source Required (at least one)
.from_source_with_pipeline(id, pipeline) Subscribe with middleware pipeline -
.with_middleware(config) Add middleware transformation []
.auto_start(bool) Start automatically with DrasiLib true
.enable_bootstrap(bool) Load initial data from sources true
.with_bootstrap_buffer_size(size) Bootstrap buffer size 10000
.with_joins(joins) Configure synthetic joins None
.with_priority_queue_capacity(cap) Override queue capacity Inherited
.with_dispatch_buffer_capacity(cap) Override buffer capacity Inherited
.with_dispatch_mode(mode) Set dispatch mode Channel
.with_storage_backend(ref) Use specific storage backend Default

Complete Query Example

use drasi_lib::{Query, DispatchMode};

Query::cypher("complex-query")
    .query(r#"
        MATCH (o:Order)-[:PLACED_BY]->(c:Customer)
        WHERE o.status = 'pending' AND o.total > 1000
        RETURN o.id, c.email, o.total
    "#)
    .from_source("orders")
    .from_source("customers")
    .auto_start(true)
    .enable_bootstrap(true)
    .with_priority_queue_capacity(20000)
    .with_dispatch_mode(DispatchMode::Channel)
    .build()

Runtime Management

After building, use these methods to manage the DrasiLib instance:

// Lifecycle
core.start().await?;           // Start all auto-start components
core.stop().await?;            // Stop all components

// Component control
core.start_source("my-source").await?;
core.stop_source("my-source").await?;
core.start_query("my-query").await?;
core.stop_query("my-query").await?;
core.start_reaction("my-reaction").await?;
core.stop_reaction("my-reaction").await?;

// Add components at runtime
core.add_source(new_source).await?;
core.add_reaction(new_reaction).await?;
core.add_query(query_config).await?;

// Remove components
core.remove_source("my-source").await?;
core.remove_query("my-query").await?;
core.remove_reaction("my-reaction").await?;

// Inspection
let sources = core.list_sources().await?;
let queries = core.list_queries().await?;
let reactions = core.list_reactions().await?;
let status = core.get_source_status("my-source").await?;
let results = core.get_query_results("my-query").await?;

// Check running state
let is_running = core.is_running().await;

// Get current configuration
let config = core.get_current_config().await?;

Core Concepts

Sources

Sources ingest data from external systems and emit graph elements (nodes and relationships). DrasiLib provides a trait-based plugin architecture.

Important: Sources are plugins that must be instantiated externally and passed to DrasiLib. DrasiLib has no awareness of which source plugins exist.

Available Source Plugins: See components/sources/ for PostgreSQL, HTTP, gRPC, Mock, Platform, and Application sources.

Queries

Queries define what changes matter using Cypher. They track three types of results:

  • Adding - New rows matching the query
  • Updating - Existing rows with changed values
  • Removing - Rows that no longer match

Limitation: ORDER BY, TOP, and LIMIT are not supported in continuous queries.

Reactions

Reactions respond to query results by triggering actions (webhooks, logging, etc.).

Important: Like sources, reactions are plugins that must be instantiated externally.

Available Reaction Plugins: See components/reactions/ for HTTP, gRPC, SSE, Log, Platform, and Profiler reactions.

Bootstrap Providers

Bootstrap providers deliver initial data to queries before streaming begins. Any source can use any bootstrap provider, enabling flexible scenarios like "bootstrap from database, stream changes via HTTP."


Dispatch Modes

DrasiLib supports two dispatch modes for event routing:

Mode Backpressure Message Loss Best For
Channel (default) Yes None Different subscriber speeds, critical data
Broadcast No Possible High fanout (10+ subscribers), uniform speeds

Configure per-query:

use drasi_lib::DispatchMode;

Query::cypher("my-query")
    .with_dispatch_mode(DispatchMode::Channel)  // Default
    // or
    .with_dispatch_mode(DispatchMode::Broadcast)
    .build()

Developer Guides

For detailed information on building plugins:


State Store Providers

State store providers allow plugins (Sources, BootstrapProviders, and Reactions) to persist runtime state that survives restarts of DrasiLib.

Default (Memory) Provider

By default, DrasiLib uses an in-memory state store that does not persist across restarts:

let core = DrasiLib::builder()
    .with_id("my-app")
    .build()  // Uses MemoryStateStoreProvider by default
    .await?;

Redb Provider

For ACID-compliant persistent storage, use the redb state store provider:

use drasi_state_store_redb::RedbStateStoreProvider;
use std::sync::Arc;

let state_store = RedbStateStoreProvider::new("/data/state.redb")?;
let core = DrasiLib::builder()
    .with_id("my-app")
    .with_state_store_provider(Arc::new(state_store))
    .build()
    .await?;

Using State Store in Plugins

When a Source or Reaction is added to DrasiLib, a runtime context is provided via the initialize() method. The context contains all DrasiLib-provided services, including the state store (if configured):

use drasi_lib::{Source, SourceRuntimeContext};

#[async_trait]
impl Source for MySource {
    async fn initialize(&self, context: SourceRuntimeContext) {
        // Store the context for later use
        self.base.initialize(context).await;

        // Access state store from context (may be None if not configured)
        if let Some(state_store) = self.base.state_store().await {
            // Use state_store for persistent state
        }
    }
}

For reactions, use ReactionRuntimeContext which also provides access to QueryProvider:

use drasi_lib::{Reaction, ReactionRuntimeContext};

#[async_trait]
impl Reaction for MyReaction {
    async fn initialize(&self, context: ReactionRuntimeContext) {
        self.base.initialize(context).await;

        // Access query provider for subscribing to query results
        let query_provider = self.base.query_provider().await;
    }
}

See the State Store Provider Guide for full documentation.


Troubleshooting

"Source/Reaction not found"

Ensure you've added the instance to DrasiLib before referencing it in queries:

let source = MySource::new("my-source", config)?;
let core = DrasiLib::builder()
    .with_source(source)  // Must add source before queries reference it
    .with_query(
        Query::cypher("my-query")
            .from_source("my-source")  // References the source above
            .build()
    )
    .build()
    .await?;

Events not being received

  • Broadcast mode: May lose events if receivers are slow
  • Channel mode: Provides backpressure but may block senders
  • Check dispatch_buffer_capacity and priority_queue_capacity settings

Debug Logging

std::env::set_var("RUST_LOG", "drasi_lib=debug");
env_logger::init();

License

Apache License 2.0

Related Projects

Commit count: 60

cargo fmt