| Crates.io | drasi-source-grpc |
| lib.rs | drasi-source-grpc |
| version | 0.1.2 |
| created_at | 2026-01-14 23:27:54.492794+00 |
| updated_at | 2026-01-23 06:15:36.896918+00 |
| description | gRPC source plugin for Drasi |
| homepage | |
| repository | https://github.com/drasi-project/drasi-core |
| max_upload_size | |
| id | 2044166 |
| size | 143,724 |
The gRPC Source provides a high-performance gRPC service endpoint for submitting data change events to Drasi. It exposes a Protocol Buffer-based API that supports both single event submission and streaming for high-throughput scenarios.
The gRPC Source is a server-side component that implements the drasi.v1.SourceService gRPC service. External systems can push data changes (insert, update, delete operations) to Drasi using efficient binary serialization and bidirectional streaming via HTTP/2.
The builder pattern provides the most ergonomic and type-safe way to construct a gRPC source:
use drasi_source_grpc::GrpcSource;
use drasi_lib::channels::DispatchMode;
// Basic configuration
let source = GrpcSource::builder("my-grpc-source")
.with_host("0.0.0.0")
.with_port(50051)
.build()?;
// Advanced configuration with custom dispatch settings
let source = GrpcSource::builder("production-grpc")
.with_host("0.0.0.0")
.with_port(50051)
.with_timeout_ms(10000)
.with_dispatch_mode(DispatchMode::Channel)
.with_dispatch_buffer_capacity(2000)
.with_auto_start(true)
.build()?;
// With bootstrap provider
let source = GrpcSource::builder("bootstrapped-grpc")
.with_host("127.0.0.1")
.with_port(50052)
.with_bootstrap_provider(my_bootstrap_provider)
.build()?;
For programmatic configuration or deserialization from external sources:
use drasi_source_grpc::{GrpcSource, GrpcSourceConfig};
use drasi_lib::channels::DispatchMode;
// Using config struct
let config = GrpcSourceConfig {
host: "0.0.0.0".to_string(),
port: 50051,
endpoint: None,
timeout_ms: 5000,
};
let source = GrpcSource::new("my-grpc-source", config)?;
// Alternatively, with custom dispatch settings
// (use this in place of GrpcSource::new above, which takes ownership of `config`)
let source = GrpcSource::with_dispatch(
"my-grpc-source",
config,
Some(DispatchMode::Channel),
Some(1500)
)?;
use drasi_lib::DrasiLib;
use drasi_source_grpc::GrpcSource;
let drasi = DrasiLib::builder()
.with_id("my-drasi-instance")
.build()
.await?;
let source = GrpcSource::builder("events-grpc")
.with_host("0.0.0.0")
.with_port(50051)
.build()?;
drasi.add_source(source).await?;
drasi.start_source("events-grpc").await?;
| Name | Description | Data Type | Valid Values | Default |
|---|---|---|---|---|
| id | Unique identifier for the source instance | String | Any non-empty string | (Required) |
| host | Host address to bind the gRPC server to | String | Valid hostname or IP address (e.g., "0.0.0.0", "127.0.0.1") | "0.0.0.0" |
| port | Port number for the gRPC server | u16 | 1-65535 | 50051 |
| endpoint | Optional custom service endpoint path | Option<String> | Any valid path string | None |
| timeout_ms | Request timeout in milliseconds | u64 | Positive integer (milliseconds) | 5000 |
| dispatch_mode | Event dispatch strategy | Option<DispatchMode> | Channel (isolated, backpressure) or Broadcast (shared, no backpressure) | Channel |
| dispatch_buffer_capacity | Buffer size for dispatch channel | Option<usize> | Positive integer | 1000 |
| bootstrap_provider | Provider for initial data snapshots | Option<Box<dyn BootstrapProvider>> | Any type implementing BootstrapProvider | None |
| auto_start | Whether to start automatically when added to DrasiLib | bool | true, false | true |
- auto_start: When auto_start=true (default), the source starts immediately if added to a running DrasiLib instance. When auto_start=false, start manually with drasi.start_source("source-id").
- host: Use "0.0.0.0" to accept connections from any network interface, or "127.0.0.1" for localhost only.
- Channel: Each subscriber gets an isolated channel with backpressure (prevents message loss).
- Broadcast: Single shared channel across subscribers (faster but may drop messages under load).

The gRPC Source accepts events in Protocol Buffer format as defined in proto/drasi/v1/source.proto and proto/drasi/v1/common.proto.
message SourceChange {
ChangeType type = 1; // INSERT, UPDATE, or DELETE
oneof change {
Element element = 2; // For INSERT/UPDATE operations
ElementMetadata metadata = 3; // For DELETE operations
}
google.protobuf.Timestamp timestamp = 4;
string source_id = 5;
}
enum ChangeType {
CHANGE_TYPE_UNSPECIFIED = 0;
CHANGE_TYPE_INSERT = 1;
CHANGE_TYPE_UPDATE = 2;
CHANGE_TYPE_DELETE = 3;
}
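The comments on the `change` oneof in `SourceChange` imply a pairing: INSERT and UPDATE carry an `Element`, while DELETE carries only `ElementMetadata`. A client may want to check that pairing before sending. The following is a minimal sketch using plain Rust types that mirror the schema; `Change` and `check_pairing` are hypothetical names and are not part of the generated protobuf code or of the crate's own validation logic.

```rust
/// Mirrors the `ChangeType` enum from the schema above.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ChangeType {
    Unspecified,
    Insert,
    Update,
    Delete,
}

/// Hypothetical stand-in for the `change` oneof: either a full element
/// or metadata only. Payload contents are omitted for brevity.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Change {
    Element,
    Metadata,
}

/// Returns Ok(()) when the payload kind matches the schema comments:
/// INSERT/UPDATE carry an element, DELETE carries metadata only.
fn check_pairing(kind: ChangeType, change: Option<Change>) -> Result<(), String> {
    match (kind, change) {
        (ChangeType::Insert, Some(Change::Element))
        | (ChangeType::Update, Some(Change::Element)) => Ok(()),
        (ChangeType::Delete, Some(Change::Metadata)) => Ok(()),
        (ChangeType::Unspecified, _) => Err("change type is unspecified".to_string()),
        (_, None) => Err("change payload is missing".to_string()),
        (k, Some(c)) => Err(format!("{:?} does not expect a {:?} payload", k, c)),
    }
}

fn main() {
    let cases = [
        (ChangeType::Insert, Some(Change::Element)),
        (ChangeType::Update, Some(Change::Metadata)),
        (ChangeType::Delete, Some(Change::Metadata)),
        (ChangeType::Unspecified, None),
    ];
    for (kind, change) in cases.iter() {
        println!("{:?} + {:?} -> {:?}", kind, change, check_pairing(*kind, *change));
    }
}
```

Whether the server rejects mismatched pairs, and with which message, is not stated here, so the error strings in this sketch are illustrative only.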
message Element {
oneof element {
Node node = 1;
Relation relation = 2;
}
}
message Node {
ElementMetadata metadata = 1;
google.protobuf.Struct properties = 2;
}
Fields:
- metadata: Required metadata containing reference, labels, and effective timestamp
- properties: Key-value properties using google.protobuf.Struct (flexible schema)

message Relation {
ElementMetadata metadata = 1;
ElementReference in_node = 2; // Target node (relationship points to)
ElementReference out_node = 3; // Source node (relationship comes from)
google.protobuf.Struct properties = 4;
}
Fields:
- metadata: Required metadata for the relation
- in_node: Reference to the target node (where the arrow points)
- out_node: Reference to the source node (where the arrow originates)
- properties: Key-value properties of the relationship

Direction semantics: (out_node)-[relation]->(in_node)
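Because `in_node` is the target and `out_node` is the origin, it is easy to swap the two by accident. The sketch below renders a relation in the `(out_node)-[relation]->(in_node)` form so the direction can be checked by eye. The structs are reduced, hand-written mirrors of the messages above (metadata and properties are collapsed into a single `label` field), and `render` is a hypothetical helper.

```rust
/// Plain-Rust mirror of `ElementReference` from the schema above.
struct ElementReference {
    source_id: String,
    element_id: String,
}

/// Reduced mirror of `Relation`: metadata and properties are replaced
/// by a single label to keep the sketch short.
struct Relation {
    label: String,
    in_node: ElementReference,  // target: where the arrow points
    out_node: ElementReference, // origin: where the arrow starts
}

/// Renders the relation as `(out_node)-[:LABEL]->(in_node)`.
fn render(rel: &Relation) -> String {
    format!(
        "({})-[:{}]->({})",
        rel.out_node.element_id, rel.label, rel.in_node.element_id
    )
}

fn main() {
    let follows = Relation {
        label: "FOLLOWS".to_string(),
        out_node: ElementReference {
            source_id: "my-grpc-source".to_string(),
            element_id: "user_001".to_string(),
        },
        in_node: ElementReference {
            source_id: "my-grpc-source".to_string(),
            element_id: "user_002".to_string(),
        },
    };
    println!("source: {}", follows.out_node.source_id);
    println!("target source: {}", follows.in_node.source_id);
    // prints "(user_001)-[:FOLLOWS]->(user_002)"
    println!("{}", render(&follows));
}
```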
message ElementMetadata {
ElementReference reference = 1;
repeated string labels = 2;
uint64 effective_from = 3; // Unix timestamp in nanoseconds
}
Fields:
- reference: Unique identifier (source_id + element_id)
- labels: Classification labels for pattern matching (e.g., ["User", "Customer"])
- effective_from: Timestamp in nanoseconds since Unix epoch

message ElementReference {
string source_id = 1;
string element_id = 2;
}
Fields:
- source_id: Identifies which source owns this element
- element_id: Unique identifier within the source

The gRPC Source implements the drasi.v1.SourceService:
Submit a single event.
rpc SubmitEvent(SubmitEventRequest) returns (SubmitEventResponse);
message SubmitEventRequest {
SourceChange event = 1;
}
message SubmitEventResponse {
bool success = 1;
string message = 2;
string error = 3;
string event_id = 4; // UUID assigned to the event
}
Stream multiple events for bulk processing.
rpc StreamEvents(stream SourceChange) returns (stream StreamEventResponse);
message StreamEventResponse {
bool success = 1;
string message = 2;
string error = 3;
uint64 events_processed = 4;
}
Behavior:
- Accepts a stream of SourceChange messages

Request initial data snapshot (extensible for future use).
rpc RequestBootstrap(BootstrapRequest) returns (stream BootstrapResponse);
message BootstrapRequest {
string query_id = 1;
repeated string node_labels = 2;
repeated string relation_labels = 3;
}
message BootstrapResponse {
repeated Element elements = 1;
uint32 total_count = 2;
}
Current behavior: Returns empty stream (placeholder for future implementation)
Check service health status.
rpc HealthCheck(google.protobuf.Empty) returns (HealthCheckResponse);
message HealthCheckResponse {
enum Status {
STATUS_UNSPECIFIED = 0;
STATUS_HEALTHY = 1;
STATUS_DEGRADED = 2;
STATUS_UNHEALTHY = 3;
}
Status status = 1;
string message = 2;
string version = 3; // Package version
}
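On the wire, the `Status` enum in `HealthCheckResponse` is a plain integer, so a client usually decodes it before deciding what to do. The sketch below decodes the numbers from the schema above and applies one possible client-side policy. `status_from_wire` and `should_send` are hypothetical names, and treating a degraded service as still usable is an assumption of this sketch, not documented behavior.

```rust
/// Mirrors `HealthCheckResponse.Status` from the schema above.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Status {
    Unspecified,
    Healthy,
    Degraded,
    Unhealthy,
}

/// Decodes the numeric wire value using the field numbers in the schema.
/// Unknown numbers fall back to `Unspecified`.
fn status_from_wire(raw: i32) -> Status {
    match raw {
        1 => Status::Healthy,
        2 => Status::Degraded,
        3 => Status::Unhealthy,
        _ => Status::Unspecified,
    }
}

/// Hypothetical client policy: keep sending while the service reports
/// healthy or degraded, and stop otherwise.
fn should_send(status: Status) -> bool {
    matches!(status, Status::Healthy | Status::Degraded)
}

fn main() {
    for raw in 0..5 {
        let status = status_from_wire(raw);
        println!("{} -> {:?}, send: {}", raw, status, should_send(status));
    }
}
```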
The google.protobuf.Struct supports the following value types:
| Protobuf Type | Drasi ElementValue | Notes |
|---|---|---|
| null_value | Null | Null/missing value |
| bool_value | Bool | Boolean true/false |
| number_value (integer) | Integer | Whole numbers without fractional part |
| number_value (float) | Float | Numbers with fractional part |
| string_value | String | UTF-8 text |
| list_value | String (JSON) | Arrays converted to JSON string |
| struct_value | String (JSON) | Objects converted to JSON string |
Note: Complex types (lists, structs) are serialized as JSON strings for storage in Drasi's graph model.
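The mapping in the table can be sketched with plain Rust types. `ProtoValue` and `ElementValue` below are hand-written stand-ins for `google.protobuf.Value` and Drasi's element value type, and `convert` and `to_json` are hypothetical helpers; the crate's real conversion code may differ, for example in how it escapes strings or handles very large numbers.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for `google.protobuf.Value`.
#[derive(Debug, Clone, PartialEq)]
enum ProtoValue {
    Null,
    Bool(bool),
    Number(f64),
    Text(String),
    List(Vec<ProtoValue>),
    Struct(BTreeMap<String, ProtoValue>),
}

/// Hypothetical stand-in for the Drasi value type, limited to the
/// variants named in the table above.
#[derive(Debug, Clone, PartialEq)]
enum ElementValue {
    Null,
    Bool(bool),
    Integer(i64),
    Float(f64),
    String(String),
}

/// Minimal JSON rendering. Only `"` and `\` are escaped in strings, and
/// non-finite numbers are not treated specially.
fn to_json(v: &ProtoValue) -> String {
    match v {
        ProtoValue::Null => "null".to_string(),
        ProtoValue::Bool(b) => b.to_string(),
        ProtoValue::Number(n) if n.fract() == 0.0 => format!("{}", *n as i64),
        ProtoValue::Number(n) => n.to_string(),
        ProtoValue::Text(s) => {
            format!("\"{}\"", s.replace('\\', "\\\\").replace('"', "\\\""))
        }
        ProtoValue::List(items) => {
            let parts: Vec<String> = items.iter().map(to_json).collect();
            format!("[{}]", parts.join(","))
        }
        ProtoValue::Struct(fields) => {
            let parts: Vec<String> = fields
                .iter()
                .map(|(k, v)| format!("{}:{}", to_json(&ProtoValue::Text(k.clone())), to_json(v)))
                .collect();
            format!("{{{}}}", parts.join(","))
        }
    }
}

/// Applies the table: whole numbers become Integer, other numbers Float,
/// and lists/structs are flattened into a JSON string.
fn convert(v: &ProtoValue) -> ElementValue {
    match v {
        ProtoValue::Null => ElementValue::Null,
        ProtoValue::Bool(b) => ElementValue::Bool(*b),
        ProtoValue::Number(n) if n.fract() == 0.0 => ElementValue::Integer(*n as i64),
        ProtoValue::Number(n) => ElementValue::Float(*n),
        ProtoValue::Text(s) => ElementValue::String(s.clone()),
        ProtoValue::List(_) | ProtoValue::Struct(_) => ElementValue::String(to_json(v)),
    }
}

fn main() {
    let tags = ProtoValue::List(vec![
        ProtoValue::Text("a".to_string()),
        ProtoValue::Number(1.0),
    ]);
    println!("{:?}", convert(&ProtoValue::Number(30.0)));
    println!("{:?}", convert(&ProtoValue::Number(0.85)));
    println!("{:?}", convert(&tags));
}
```

One practical consequence of this mapping is that a query cannot address individual list items or nested fields directly, since they arrive as a single string property.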
use drasi_source_grpc::GrpcSource;
use drasi_lib::DrasiLib;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Create Drasi instance
let drasi = DrasiLib::builder()
.with_id("my-instance")
.build()
.await?;
// Create and start gRPC source
let source = GrpcSource::builder("events")
.with_host("0.0.0.0")
.with_port(50051)
.build()?;
drasi.add_source(source).await?;
drasi.start_source("events").await?;
println!("gRPC source listening on 0.0.0.0:50051");
// Keep server running
tokio::signal::ctrl_c().await?;
Ok(())
}
import grpc
from google.protobuf import struct_pb2
import drasi.v1.source_pb2 as source_pb2
import drasi.v1.source_pb2_grpc as source_pb2_grpc
import drasi.v1.common_pb2 as common_pb2
import time
# Connect to gRPC source
channel = grpc.insecure_channel('localhost:50051')
stub = source_pb2_grpc.SourceServiceStub(channel)
# Create properties
props = struct_pb2.Struct()
props['name'] = 'Alice'
props['email'] = 'alice@example.com'
props['age'] = 30
props['active'] = True
# Create metadata
metadata = common_pb2.ElementMetadata(
reference=common_pb2.ElementReference(
source_id="my-grpc-source",
element_id="user_001"
),
labels=["User", "Customer"],
effective_from=time.time_ns()  # Integer nanoseconds since the Unix epoch (avoids float rounding)
)
# Create node
node = common_pb2.Node(
metadata=metadata,
properties=props
)
# Create source change
change = common_pb2.SourceChange(
type=common_pb2.CHANGE_TYPE_INSERT,
element=common_pb2.Element(node=node)
)
# Submit event
request = source_pb2.SubmitEventRequest(event=change)
response = stub.SubmitEvent(request)
print(f"Success: {response.success}")
print(f"Message: {response.message}")
print(f"Event ID: {response.event_id}")
import grpc
from google.protobuf import struct_pb2
import drasi.v1.source_pb2_grpc as source_pb2_grpc
import drasi.v1.common_pb2 as common_pb2
import time
channel = grpc.insecure_channel('localhost:50051')
stub = source_pb2_grpc.SourceServiceStub(channel)
def event_generator():
    """Generate 1000 user nodes"""
    for i in range(1000):
        props = struct_pb2.Struct()
        props['name'] = f'User {i}'
        props['index'] = i
        props['active'] = i % 2 == 0
        metadata = common_pb2.ElementMetadata(
            reference=common_pb2.ElementReference(
                source_id="my-grpc-source",
                element_id=f"user_{i:04d}"
            ),
            labels=["User"],
            effective_from=time.time_ns()  # Integer nanoseconds since the Unix epoch
        )
        node = common_pb2.Node(metadata=metadata, properties=props)
        yield common_pb2.SourceChange(
            type=common_pb2.CHANGE_TYPE_INSERT,
            element=common_pb2.Element(node=node)
        )

# Stream events
responses = stub.StreamEvents(event_generator())
for response in responses:
    print(f"Processed: {response.events_processed} events")
    if not response.success:
        print(f"Error: {response.error}")
print("Stream completed")
package main

import (
	"context"
	"log"
	"time"

	pb "your-module/drasi/v1"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/protobuf/types/known/structpb"
)

func main() {
	// Connect to the gRPC source over plaintext (no TLS)
	conn, err := grpc.Dial("localhost:50051",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("Failed to connect: %v", err)
	}
	defer conn.Close()
	client := pb.NewSourceServiceClient(conn)

	// Create relation properties
	properties, err := structpb.NewStruct(map[string]interface{}{
		"since":    "2024-01-01",
		"strength": 0.85,
	})
	if err != nil {
		log.Fatalf("Failed to build properties: %v", err)
	}

	// Create relation (Alice FOLLOWS Bob)
	event := &pb.SourceChange{
		Type: pb.ChangeType_CHANGE_TYPE_INSERT,
		Change: &pb.SourceChange_Element{
			Element: &pb.Element{
				Element: &pb.Element_Relation{
					Relation: &pb.Relation{
						Metadata: &pb.ElementMetadata{
							Reference: &pb.ElementReference{
								SourceId:  "my-grpc-source",
								ElementId: "follows_001",
							},
							Labels:        []string{"FOLLOWS"},
							EffectiveFrom: uint64(time.Now().UnixNano()),
						},
						// out_node is where the arrow originates
						OutNode: &pb.ElementReference{
							SourceId:  "my-grpc-source",
							ElementId: "user_001", // Alice
						},
						// in_node is where the arrow points
						InNode: &pb.ElementReference{
							SourceId:  "my-grpc-source",
							ElementId: "user_002", // Bob
						},
						Properties: properties,
					},
				},
			},
		},
	}

	// Submit event
	resp, err := client.SubmitEvent(context.Background(), &pb.SubmitEventRequest{
		Event: event,
	})
	if err != nil {
		log.Fatalf("Failed to submit: %v", err)
	}
	log.Printf("Success: %v, Message: %s", resp.Success, resp.Message)
}
use tonic::transport::Channel;
use prost_types::{Struct, Value};
use std::collections::HashMap;
pub mod drasi {
pub mod v1 {
tonic::include_proto!("drasi.v1");
}
}
use drasi::v1::{
source_service_client::SourceServiceClient,
SubmitEventRequest, SourceChange, Element, Node, ElementMetadata,
ElementReference, ChangeType,
};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = SourceServiceClient::connect("http://localhost:50051").await?;
// Update example
let mut update_props = HashMap::new();
update_props.insert(
"email".to_string(),
Value {
kind: Some(prost_types::value::Kind::StringValue(
"alice.new@example.com".to_string()
)),
},
);
update_props.insert(
"verified".to_string(),
Value {
kind: Some(prost_types::value::Kind::BoolValue(true)),
},
);
let update_event = SourceChange {
r#type: ChangeType::Update as i32,
change: Some(drasi::v1::source_change::Change::Element(Element {
element: Some(drasi::v1::element::Element::Node(Node {
metadata: Some(ElementMetadata {
reference: Some(ElementReference {
source_id: "my-grpc-source".to_string(),
element_id: "user_001".to_string(),
}),
labels: vec!["User".to_string(), "Verified".to_string()],
effective_from: chrono::Utc::now().timestamp_nanos() as u64,
}),
properties: Some(Struct { fields: update_props }),
})),
})),
timestamp: None,
source_id: String::new(),
};
let response = client.submit_event(SubmitEventRequest {
event: Some(update_event),
}).await?;
println!("Update: {}", response.into_inner().message);
// Delete example
let delete_event = SourceChange {
r#type: ChangeType::Delete as i32,
change: Some(drasi::v1::source_change::Change::Metadata(ElementMetadata {
reference: Some(ElementReference {
source_id: "my-grpc-source".to_string(),
element_id: "user_999".to_string(),
}),
labels: vec!["User".to_string()],
effective_from: chrono::Utc::now().timestamp_nanos() as u64,
})),
timestamp: None,
source_id: String::new(),
};
let response = client.submit_event(SubmitEventRequest {
event: Some(delete_event),
}).await?;
println!("Delete: {}", response.into_inner().message);
Ok(())
}
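The client above fills `effective_from` from `chrono`. If that dependency is not wanted, the same nanosecond value can be derived from the standard library. This is a minimal sketch; `unix_nanos` is a hypothetical helper name, and returning `None` for out-of-range clocks is a choice made here rather than something the schema requires.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Nanoseconds since the Unix epoch as a `u64`, or `None` if the time is
/// before the epoch or the value does not fit in 64 bits.
fn unix_nanos(t: SystemTime) -> Option<u64> {
    let elapsed = t.duration_since(UNIX_EPOCH).ok()?;
    let nanos = elapsed.as_nanos(); // u128
    if nanos <= u64::MAX as u128 {
        Some(nanos as u64)
    } else {
        None
    }
}

fn main() {
    match unix_nanos(SystemTime::now()) {
        Some(ns) => println!("effective_from = {}", ns),
        None => println!("system clock is outside the representable range"),
    }
}
```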
# Install grpcurl
go install github.com/fullstorydev/grpcurl/cmd/grpcurl@latest
# List available services
grpcurl -plaintext localhost:50051 list
# Health check
grpcurl -plaintext localhost:50051 drasi.v1.SourceService/HealthCheck
# Submit a node insert event
grpcurl -plaintext -d '{
"event": {
"type": "CHANGE_TYPE_INSERT",
"element": {
"node": {
"metadata": {
"reference": {
"sourceId": "my-grpc-source",
"elementId": "test_001"
},
"labels": ["TestNode"],
"effectiveFrom": "1234567890000000000"
},
"properties": {
"name": "Test Item",
"value": 42
}
}
}
}
}' localhost:50051 drasi.v1.SourceService/SubmitEvent
Events submitted through the gRPC source flow into Drasi's continuous query engine where they can be matched against Cypher patterns.
// Submit a node with labels ["User", "Premium"]
// This will match Cypher patterns like:
// MATCH (u:User) ...
// MATCH (p:Premium) ...
// MATCH (u:User:Premium) ...
// Submit a node with property active=true
// This will be filtered by Cypher WHERE clauses:
// MATCH (u:User) WHERE u.active = true
// MATCH (u:User) WHERE u.age > 18
// Submit a relation (user_001)-[:FOLLOWS]->(user_002)
// This enables Cypher patterns:
// MATCH (a:User)-[:FOLLOWS]->(b:User)
// MATCH (a)-[f:FOLLOWS]->(b) WHERE f.strength > 0.8
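The label examples above follow one rule: a node matches a pattern when it carries every label the pattern names. The sketch below states that rule as a function. It only illustrates the matching described in the comments and is not how Drasi's query engine is implemented; `labels_match` is a hypothetical name.

```rust
/// True when every label required by the pattern is present on the node.
/// An empty pattern (as in `MATCH (n)`) matches any node.
fn labels_match(node_labels: &[&str], pattern_labels: &[&str]) -> bool {
    pattern_labels.iter().all(|p| node_labels.contains(p))
}

fn main() {
    let node = ["User", "Premium"];
    println!("(u:User)         -> {}", labels_match(&node, &["User"]));
    println!("(p:Premium)      -> {}", labels_match(&node, &["Premium"]));
    println!("(u:User:Premium) -> {}", labels_match(&node, &["User", "Premium"]));
    println!("(a:Admin)        -> {}", labels_match(&node, &["Admin"]));
}
```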
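Channel Mode (Default): Each subscriber gets an isolated channel with backpressure, which prevents message loss.
Broadcast Mode: A single channel is shared across subscribers, which is faster but may drop messages under load.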
let source = GrpcSource::builder("events")
.with_dispatch_mode(DispatchMode::Broadcast)
.with_dispatch_buffer_capacity(5000)
.build()?;
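The practical difference between the two dispatch modes is what happens when the buffer is full. The sketch below models both policies on a small bounded buffer: a backpressure push refuses the new item so the producer has to wait, while a lossy push evicts the oldest item to make room. This is an illustration only. `Buffer` and `Outcome` are hypothetical types, and evicting the oldest item is an assumption of this sketch; this README says only that Broadcast mode may drop messages under load.

```rust
use std::collections::VecDeque;

#[derive(Debug, PartialEq)]
enum Outcome {
    Accepted,
    Rejected,     // buffer full: the producer must retry or wait
    Evicted(u64), // buffer full: the oldest item was dropped
}

struct Buffer {
    capacity: usize,
    items: VecDeque<u64>,
}

impl Buffer {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be positive");
        Buffer { capacity, items: VecDeque::with_capacity(capacity) }
    }

    /// Channel-like policy: never lose data, push back on the producer.
    fn push_backpressure(&mut self, item: u64) -> Outcome {
        if self.items.len() >= self.capacity {
            return Outcome::Rejected;
        }
        self.items.push_back(item);
        Outcome::Accepted
    }

    /// Broadcast-like policy (assumed): never block, drop the oldest item.
    fn push_lossy(&mut self, item: u64) -> Outcome {
        let evicted = if self.items.len() >= self.capacity {
            self.items.pop_front()
        } else {
            None
        };
        self.items.push_back(item);
        match evicted {
            Some(old) => Outcome::Evicted(old),
            None => Outcome::Accepted,
        }
    }
}

fn main() {
    let mut channel = Buffer::new(2);
    let mut broadcast = Buffer::new(2);
    for event in 1..4 {
        println!(
            "event {}: channel {:?}, broadcast {:?}",
            event,
            channel.push_backpressure(event),
            broadcast.push_lossy(event)
        );
    }
}
```

Under either policy, a larger `dispatch_buffer_capacity` postpones the point at which the full-buffer behavior starts to apply.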
Enable initial data snapshots for queries:
use drasi_lib::bootstrap::BootstrapProvider;
struct MyBootstrapProvider {
// ... your implementation
}
impl BootstrapProvider for MyBootstrapProvider {
// ... implement trait methods
}
let source = GrpcSource::builder("events")
.with_bootstrap_provider(MyBootstrapProvider::new())
.build()?;
The gRPC source includes built-in profiling metadata:
// Automatically tracked:
// - source_send_ns: Timestamp when event enters the source
// - Propagated through the query pipeline
// - Available in reactions for end-to-end latency measurement
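Given a `source_send_ns` value and a later timestamp taken where the result is observed, end-to-end latency is a subtraction. The sketch below does that with plain integers. How a reaction reads `source_send_ns` is not described in this README, so the functions take the two timestamps as arguments; `latency_ns` and `as_millis` are hypothetical helper names.

```rust
/// End-to-end latency in nanoseconds. Returns `None` when the observed
/// time is earlier than the send time, which can happen with clock skew
/// between machines.
fn latency_ns(source_send_ns: u64, observed_ns: u64) -> Option<u64> {
    observed_ns.checked_sub(source_send_ns)
}

/// Converts nanoseconds to milliseconds for display.
fn as_millis(ns: u64) -> f64 {
    ns as f64 / 1_000_000.0
}

fn main() {
    let sent = 1_700_000_000_000_000_000u64;
    let observed = sent + 2_500_000; // 2.5 ms later
    match latency_ns(sent, observed) {
        Some(ns) => println!("latency: {} ms", as_millis(ns)),
        None => println!("observed timestamp precedes send timestamp"),
    }
}
```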
Validation Errors:
Response contains:
SubmitEventResponse {
success: false,
message: "Invalid event data",
error: "Validation error: Node element missing required 'metadata' field",
event_id: ""
}
Streaming behavior:
// The source handles shutdown signals gracefully:
// 1. Stops accepting new connections
// 2. Completes in-flight requests
// 3. Closes channels
// 4. Releases resources
drasi.stop_source("events").await?;
# Install tools
pip install grpcio-tools
# Generate code
python -m grpc_tools.protoc \
-I./proto \
--python_out=./generated \
--grpc_python_out=./generated \
proto/drasi/v1/source.proto \
proto/drasi/v1/common.proto
# Install tools
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
# Generate code
protoc \
--go_out=./generated \
--go-grpc_out=./generated \
--go_opt=paths=source_relative \
--go-grpc_opt=paths=source_relative \
-I ./proto \
proto/drasi/v1/source.proto \
proto/drasi/v1/common.proto
Add to your build.rs:
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::configure()
.build_server(false)
.build_client(true)
.compile(
&[
"proto/drasi/v1/source.proto",
"proto/drasi/v1/common.proto",
],
&["proto"],
)?;
Ok(())
}
# Install tools
npm install -g grpc-tools grpc_tools_node_protoc_ts
# Generate code
grpc_tools_node_protoc \
--js_out=import_style=commonjs,binary:./generated \
--grpc_out=grpc_js:./generated \
--plugin=protoc-gen-grpc=`which grpc_tools_node_protoc_plugin` \
-I ./proto \
proto/drasi/v1/source.proto \
proto/drasi/v1/common.proto
Tune dispatch_buffer_capacity based on throughput.

| Scenario | Events/sec | Notes |
|---|---|---|
| Unary RPC (single event) | 500-1000 | Round-trip per event |
| Streaming (batched) | 10,000-50,000 | Depends on event size |
| Streaming (small events) | 100,000+ | Minimal properties |
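To make the rates in the table concrete, the sketch below estimates how long a backlog would take to submit at a sustained rate. The backlog of one million events is an arbitrary example, the rates are taken from the table, and `seconds_needed` is a hypothetical helper that assumes a rate greater than zero.

```rust
/// Seconds needed to submit `events` at a sustained `rate_per_sec`.
/// Assumes `rate_per_sec` is greater than zero.
fn seconds_needed(events: u64, rate_per_sec: u64) -> f64 {
    events as f64 / rate_per_sec as f64
}

fn main() {
    let backlog = 1_000_000u64; // example backlog size
    let scenarios = [
        ("Unary RPC, low end", 500u64),
        ("Unary RPC, high end", 1_000),
        ("Streaming, low end", 10_000),
        ("Streaming, high end", 50_000),
    ];
    for (name, rate) in scenarios.iter() {
        println!("{}: {} s", name, seconds_needed(backlog, *rate));
    }
}
```

At these rates the same backlog takes between 1,000 and 2,000 seconds over unary calls and between 20 and 100 seconds over a stream, which is the main reason to prefer `StreamEvents` for bulk loads.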

| Feature | gRPC Source | HTTP Source |
|---|---|---|
| Protocol | HTTP/2 + Protocol Buffers | HTTP/1.1 + JSON |
| Type Safety | Strongly typed (protobuf) | JSON schema validation |
| Performance | Higher throughput, lower latency | Good for moderate loads |
| Streaming | Native bidirectional streaming | Batch POST endpoint |
| Message Size | Smaller (binary encoding) | Larger (text JSON) |
| Client Generation | Auto-generated from .proto files | Manual or OpenAPI codegen |
| Browser Support | Limited (requires grpc-web proxy) | Native browser support |
| Debugging | Requires gRPC tools (grpcurl, Postman) | Standard HTTP tools (curl) |
| Best For | High-volume, microservices, IoT | Web apps, simple integration |
The gRPC source currently runs without authentication or encryption. For production deployments:
// Future enhancement: TLS configuration
// Configure server TLS certificates
// Enforce encrypted connections
// Future enhancement: Authentication interceptors
// Implement token-based auth
// Add API key validation
Error: Connection refused
Solutions:
- Ensure the source is started: drasi.start_source("my-grpc").await?
- Check that the port is not already in use: lsof -i :50051

SubmitEventResponse { success: false, error: "Validation error: Node element missing required 'metadata' field" }
Solutions:
Symptoms: High latency, timeouts
Solutions:
- Increase dispatch_buffer_capacity

[my-grpc-source] Failed to dispatch (no subscribers): No subscribers available
This is normal: Events are dropped if no queries are subscribed. Add a query:
let query = Query::cypher("my-query")
.query("MATCH (n) RETURN n")
.from_source("my-grpc-source")
.build();
drasi.add_query(query).await?;
- src/lib.rs: Main source implementation and gRPC service
- src/config.rs: Configuration structs and validation
- src/tests.rs: Unit tests
- build.rs: Protobuf compilation
- proto/drasi/v1/: Protobuf schema definitions

# Run unit tests
cargo test -p drasi-source-grpc
# Run with logging
RUST_LOG=debug cargo test -p drasi-source-grpc -- --nocapture
# Test specific module
cargo test -p drasi-source-grpc proto_conversion
When modifying the gRPC source:
- Update the protobuf definitions in proto/drasi/v1/
- Regenerate code with cargo build (runs build.rs)
- Run cargo clippy and cargo fmt

Licensed under the Apache License, Version 2.0. See the LICENSE file for details.