| Crates.io | drasi-source-http |
| lib.rs | drasi-source-http |
| version | 0.1.2 |
| created_at | 2026-01-14 23:21:10.663372+00 |
| updated_at | 2026-01-23 06:14:58.528204+00 |
| description | HTTP source plugin for Drasi |
| homepage | |
| repository | https://github.com/drasi-project/drasi-core |
| max_upload_size | |
| id | 2044157 |
| size | 193,617 |
A Drasi source plugin that exposes HTTP endpoints for receiving data change events. It provides both single-event and batch submission modes with adaptive batching for optimized throughput.
The HTTP Source is a plugin for the Drasi continuous query system that allows applications to submit graph data changes (nodes and relations) via REST API endpoints.
The builder pattern provides a fluent, type-safe API for constructing HTTP sources:
use drasi_source_http::HttpSource;
// Basic HTTP source
let source = HttpSource::builder("my-source")
.with_host("0.0.0.0")
.with_port(8080)
.with_auto_start(true)
.build()?;
// With adaptive batching tuning
let source = HttpSource::builder("high-throughput-source")
.with_host("0.0.0.0")
.with_port(9000)
.with_adaptive_max_batch_size(2000)
.with_adaptive_min_batch_size(50)
.with_adaptive_max_wait_ms(200)
.with_adaptive_enabled(true)
.build()?;
// With custom dispatch settings
let source = HttpSource::builder("custom-source")
.with_host("localhost")
.with_port(8080)
.with_dispatch_mode(DispatchMode::Channel)
.with_dispatch_buffer_capacity(5000)
.build()?;
// With bootstrap provider
let source = HttpSource::builder("bootstrapped-source")
.with_host("0.0.0.0")
.with_port(8080)
.with_bootstrap_provider(postgres_provider)
.build()?;
Alternatively, use HttpSourceConfig directly:
use drasi_source_http::{HttpSource, HttpSourceConfig};
let config = HttpSourceConfig {
host: "0.0.0.0".to_string(),
port: 8080,
endpoint: None,
timeout_ms: 10000,
adaptive_max_batch_size: Some(1000),
adaptive_min_batch_size: Some(10),
adaptive_max_wait_ms: Some(100),
adaptive_min_wait_ms: Some(1),
adaptive_window_secs: Some(5),
adaptive_enabled: Some(true),
};
let source = HttpSource::new("my-source", config)?;
When using DrasiServer, configure HTTP sources via YAML:
sources:
  - id: "my-http-source"
    source_type: "http"
    auto_start: true
    host: "0.0.0.0"
    port: 8080
    endpoint: "/events"
    timeout_ms: 30000
    adaptive_enabled: true
    adaptive_max_batch_size: 1000
    adaptive_min_batch_size: 10
    adaptive_max_wait_ms: 100
    adaptive_min_wait_ms: 1
    adaptive_window_secs: 5
| Name | Description | Data Type | Valid Values | Default |
|---|---|---|---|---|
| host | HTTP server host address to bind to | String | Any valid hostname or IP | Required |
| port | HTTP server port number | u16 | 1-65535 | 8080 |
| endpoint | Optional custom endpoint path | Option | Any valid path | None |
| timeout_ms | Request timeout in milliseconds | u64 | Any positive integer | 10000 |
| auto_start | Whether to start automatically when added to DrasiLib | bool | true, false | true |
| Name | Description | Data Type | Valid Values | Default |
|---|---|---|---|---|
| adaptive_enabled | Enable/disable adaptive batching | Option | true, false | true |
| adaptive_max_batch_size | Maximum events per batch | Option | Any positive integer | 1000 |
| adaptive_min_batch_size | Minimum events per batch | Option | Any positive integer | 10 |
| adaptive_max_wait_ms | Maximum wait time before dispatching (ms) | Option | Any positive integer | 100 |
| adaptive_min_wait_ms | Minimum wait time between batches (ms) | Option | Any positive integer | 1 |
| adaptive_window_secs | Throughput measurement window (seconds) | Option | Any positive integer | 5 |
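The defaults listed above can be pictured as a resolution step over optional fields. The following is a standalone sketch, not the crate's code: `AdaptiveOptions`, `ResolvedAdaptive`, and `resolve` are hypothetical names, and the integer widths are assumptions because the table does not state them.

```rust
/// Hypothetical mirror of the optional adaptive batching fields.
/// Field names follow the table; the integer types are assumptions.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct AdaptiveOptions {
    pub adaptive_enabled: Option<bool>,
    pub adaptive_max_batch_size: Option<usize>,
    pub adaptive_min_batch_size: Option<usize>,
    pub adaptive_max_wait_ms: Option<u64>,
    pub adaptive_min_wait_ms: Option<u64>,
    pub adaptive_window_secs: Option<u64>,
}

/// Fully resolved settings with no optional fields left.
#[derive(Debug, Clone, PartialEq)]
pub struct ResolvedAdaptive {
    pub enabled: bool,
    pub max_batch_size: usize,
    pub min_batch_size: usize,
    pub max_wait_ms: u64,
    pub min_wait_ms: u64,
    pub window_secs: u64,
}

impl AdaptiveOptions {
    /// Fill every unset field with the default from the table.
    pub fn resolve(&self) -> ResolvedAdaptive {
        ResolvedAdaptive {
            enabled: self.adaptive_enabled.unwrap_or(true),
            max_batch_size: self.adaptive_max_batch_size.unwrap_or(1000),
            min_batch_size: self.adaptive_min_batch_size.unwrap_or(10),
            max_wait_ms: self.adaptive_max_wait_ms.unwrap_or(100),
            min_wait_ms: self.adaptive_min_wait_ms.unwrap_or(1),
            window_secs: self.adaptive_window_secs.unwrap_or(5),
        }
    }
}
```

Setting only `adaptive_max_batch_size: Some(2000)` in this sketch leaves the other five values at their table defaults.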
| Name | Description | Data Type | Valid Values | Default |
|---|---|---|---|---|
| dispatch_mode | Event routing mode | DispatchMode | Channel, Broadcast | Channel |
| dispatch_buffer_capacity | Buffer size for dispatch channel | usize | Any positive integer | 1000 |
| Name | Description | Data Type | Valid Values | Default |
|---|---|---|---|---|
| bootstrap_provider | Bootstrap provider for initial data | Box | Any provider implementation | None |
The HTTP source accepts JSON data in the HttpSourceChange format. All events use a tagged union structure with an operation field.
{
"operation": "insert",
"element": {
"type": "node",
"id": "user-123",
"labels": ["User", "Customer"],
"properties": {
"name": "Alice",
"email": "alice@example.com",
"age": 30,
"active": true
}
},
"timestamp": 1699900000000000000
}
{
"operation": "insert",
"element": {
"type": "relation",
"id": "follows-1",
"labels": ["FOLLOWS"],
"from": "user-123",
"to": "user-456",
"properties": {
"since": "2024-01-01",
"weight": 1.0
}
}
}
{
"operation": "update",
"element": {
"type": "node",
"id": "user-123",
"labels": ["User", "Premium"],
"properties": {
"name": "Alice Updated",
"membership": "premium"
}
},
"timestamp": 1699900001000000000
}
{
"operation": "delete",
"id": "user-123",
"labels": ["User"],
"timestamp": 1699900002000000000
}
{
"events": [
{
"operation": "insert",
"element": {
"type": "node",
"id": "1",
"labels": ["Test"],
"properties": {}
}
},
{
"operation": "insert",
"element": {
"type": "node",
"id": "2",
"labels": ["Test"],
"properties": {}
}
}
]
}
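As a rough illustration of how a client might assemble one of these payloads, the sketch below renders a delete event using only the standard library. The helpers `escape_json`, `now_nanos`, and `delete_event_json` are hypothetical and not part of the crate; real code would more likely use a JSON library such as `serde_json`. Treating `timestamp` as nanoseconds since the Unix epoch is an assumption inferred from the magnitude of the example values, and treating it as optional follows the examples that omit it.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Escape the characters that must not appear raw inside a JSON string.
fn escape_json(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}

/// Current time as nanoseconds since the Unix epoch (assumed timestamp unit).
pub fn now_nanos() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is before the Unix epoch")
        .as_nanos() as u64
}

/// Render a delete event; the `timestamp` field is omitted when `None`.
pub fn delete_event_json(id: &str, labels: &[&str], timestamp: Option<u64>) -> String {
    let labels: Vec<String> = labels
        .iter()
        .map(|l| format!("\"{}\"", escape_json(l)))
        .collect();
    let mut json = format!(
        "{{\"operation\":\"delete\",\"id\":\"{}\",\"labels\":[{}]",
        escape_json(id),
        labels.join(",")
    );
    if let Some(ts) = timestamp {
        json.push_str(&format!(",\"timestamp\":{}", ts));
    }
    json.push('}');
    json
}
```

Calling `delete_event_json("user-123", &["User"], Some(now_nanos()))` yields a body with the same fields as the delete example above.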
use drasi_source_http::HttpSource;
use drasi_lib::Source;
// Create and start the source
let source = HttpSource::builder("my-source")
.with_host("0.0.0.0")
.with_port(8080)
.build()?;
source.start().await?;
// Check status
assert_eq!(source.status().await, ComponentStatus::Running);
# Submit single node
curl -X POST http://localhost:8080/sources/my-source/events \
-H "Content-Type: application/json" \
-d '{
"operation": "insert",
"element": {
"type": "node",
"id": "user-1",
"labels": ["User"],
"properties": {"name": "Alice"}
}
}'
# Submit batch of events
curl -X POST http://localhost:8080/sources/my-source/events/batch \
-H "Content-Type: application/json" \
-d '{
"events": [
{
"operation": "insert",
"element": {
"type": "node",
"id": "user-1",
"labels": ["User"],
"properties": {"name": "Alice"}
}
},
{
"operation": "insert",
"element": {
"type": "node",
"id": "user-2",
"labels": ["User"],
"properties": {"name": "Bob"}
}
}
]
}'
# Health check
curl http://localhost:8080/health
import requests
import json
# Submit a node
event = {
"operation": "insert",
"element": {
"type": "node",
"id": "sensor-42",
"labels": ["Sensor", "IoT"],
"properties": {
"temperature": 72.5,
"location": "Building A",
"active": True
}
}
}
response = requests.post(
"http://localhost:8080/sources/my-source/events",
json=event
)
print(response.json())
# {"success": true, "message": "All 1 events processed successfully"}
const axios = require('axios');
async function submitEvent() {
const event = {
operation: 'insert',
element: {
type: 'node',
id: 'product-123',
labels: ['Product'],
properties: {
name: 'Widget',
price: 29.99,
inStock: true
}
}
};
const response = await axios.post(
'http://localhost:8080/sources/my-source/events',
event
);
console.log(response.data);
}
use drasi_source_http::HttpSource;
use drasi_lib::{DrasiLib, Query};
#[tokio::main]
async fn main() -> Result<()> {
// Create HTTP source
let http_source = HttpSource::builder("http-source")
.with_host("0.0.0.0")
.with_port(8080)
.with_adaptive_enabled(true)
.build()?;
// Create continuous query
let query = Query::cypher("active-users")
.query("MATCH (u:User) WHERE u.active = true RETURN u.name")
.from_source("http-source")
.build();
// Initialize Drasi
let drasi = DrasiLib::new()
.with_source(http_source)
.with_query(query)
.build()
.await?;
// Start processing
drasi.start().await?;
// Now submit events via HTTP POST to localhost:8080
// Query results will update automatically as events arrive
Ok(())
}
The HTTP source exposes the following endpoints:
| Method | Path | Description |
|---|---|---|
| POST | /sources/{source_id}/events | Submit a single event |
| POST | /sources/{source_id}/events/batch | Submit multiple events |
| GET | /health | Health check (returns service status and features) |
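To make the path layout concrete, the sketch below builds the two submission paths and checks a request path against a configured source ID. All function names are hypothetical and say nothing about how the crate routes requests internally. The mismatch message copies the wording of the documented "Source name mismatch" error, while the "Unrecognized path" message is invented for this example.

```rust
/// Path for submitting a single event to `source_id`.
pub fn event_path(source_id: &str) -> String {
    format!("/sources/{}/events", source_id)
}

/// Path for submitting a batch of events to `source_id`.
pub fn batch_path(source_id: &str) -> String {
    format!("/sources/{}/events/batch", source_id)
}

/// Extract the source ID from a submission path, if the path has that shape.
pub fn source_id_from_path(path: &str) -> Option<&str> {
    let rest = path.strip_prefix("/sources/")?;
    let (id, tail) = rest.split_once('/')?;
    match tail {
        "events" | "events/batch" if !id.is_empty() => Some(id),
        _ => None,
    }
}

/// Check that a request path targets the expected source.
pub fn check_request_path(path: &str, expected_id: &str) -> Result<(), String> {
    match source_id_from_path(path) {
        Some(id) if id == expected_id => Ok(()),
        Some(id) => Err(format!("Expected '{}', got '{}'", expected_id, id)),
        // Hypothetical message; the source does not document this case.
        None => Err(format!("Unrecognized path '{}'", path)),
    }
}
```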
Success:
{
"success": true,
"message": "All 2 events processed successfully",
"error": null
}
Partial success (batch):
{
"success": true,
"message": "Processed 8 events successfully, 2 failed",
"error": "Invalid element type"
}
Error:
{
"success": false,
"message": "All 1 events failed",
"error": "Source name mismatch"
}
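The three response shapes above can be read as a summary over per-event outcomes. The sketch below is one way to produce matching messages; `EventResponse` and `summarize` are hypothetical names. Two behaviours are assumptions rather than documented facts: `success` is true whenever at least one event succeeded, and the `error` field carries the last error seen.

```rust
/// Hypothetical response type mirroring the JSON fields shown above.
#[derive(Debug, Clone, PartialEq)]
pub struct EventResponse {
    pub success: bool,
    pub message: String,
    pub error: Option<String>,
}

/// Summarize per-event outcomes into a single response.
pub fn summarize(results: &[Result<(), String>]) -> EventResponse {
    let total = results.len();
    let failed = results.iter().filter(|r| r.is_err()).count();
    let ok = total - failed;

    // Assumption: report the last error; the source does not say which one is kept.
    let error = results
        .iter()
        .rev()
        .find_map(|r| r.as_ref().err().cloned());

    let message = if failed == 0 {
        format!("All {} events processed successfully", total)
    } else if ok == 0 {
        format!("All {} events failed", total)
    } else {
        format!("Processed {} events successfully, {} failed", ok, failed)
    };

    EventResponse {
        // Assumption: partial success still counts as success.
        success: failed == 0 || ok > 0,
        message,
        error,
    }
}
```

A client that must treat partial failures as errors would need to inspect `error` as well as `success`.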
The HTTP source includes adaptive batching that automatically adjusts batch size and wait time based on measured throughput:
| Level | Messages/Second | Batch Size | Wait Time |
|---|---|---|---|
| Idle | < 1 | Minimum | Minimum (1ms) |
| Low | 1-100 | Small (2x min) | 1ms |
| Medium | 100-1,000 | Moderate (25% of max) | 10ms |
| High | 1,000-10,000 | Large (50% of max) | 25ms |
| Burst | > 10,000 | Maximum | 50ms |
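The table can be expressed as a small classification function. This is an illustrative sketch, not the crate's algorithm: `ThroughputLevel`, `classify`, and `batch_params` are hypothetical names. Because the table's ranges share boundary values, the sketch assumes 100 and 1,000 belong to the higher level and 10,000 belongs to High. Keeping the batch size within the configured minimum and maximum is also an assumption, and the interaction with `adaptive_max_wait_ms` is not modeled.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ThroughputLevel {
    Idle,
    Low,
    Medium,
    High,
    Burst,
}

/// Map a measured message rate to a throughput level.
pub fn classify(msgs_per_sec: f64) -> ThroughputLevel {
    // Assumed boundary handling: see the note above the code.
    if msgs_per_sec < 1.0 {
        ThroughputLevel::Idle
    } else if msgs_per_sec < 100.0 {
        ThroughputLevel::Low
    } else if msgs_per_sec < 1_000.0 {
        ThroughputLevel::Medium
    } else if msgs_per_sec <= 10_000.0 {
        ThroughputLevel::High
    } else {
        ThroughputLevel::Burst
    }
}

/// Return `(batch_size, wait_ms)` for a level, following the table.
pub fn batch_params(
    level: ThroughputLevel,
    min_batch: usize,
    max_batch: usize,
    min_wait_ms: u64,
) -> (usize, u64) {
    let (size, wait_ms) = match level {
        ThroughputLevel::Idle => (min_batch, min_wait_ms),
        ThroughputLevel::Low => (min_batch.saturating_mul(2), 1),
        ThroughputLevel::Medium => (max_batch / 4, 10),
        ThroughputLevel::High => (max_batch / 2, 25),
        ThroughputLevel::Burst => (max_batch, 50),
    };
    // Assumption: never go below the minimum or above the maximum batch size.
    (size.max(min_batch).min(max_batch), wait_ms)
}
```

With the default bounds (minimum 10, maximum 1000), a rate of 500 messages per second falls in the Medium level and gives a batch size of 250 with a 10 ms wait.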
For low latency (real-time dashboards):
.with_adaptive_max_wait_ms(10)
.with_adaptive_min_batch_size(1)
For high throughput (bulk data ingestion):
.with_adaptive_max_batch_size(5000)
.with_adaptive_max_wait_ms(500)
To disable adaptive batching:
.with_adaptive_enabled(false)
The HTTP source supports universal bootstrap: any bootstrap provider can be used to load initial data before streaming begins.
Bootstrap from PostgreSQL, stream via HTTP:
let postgres_provider = PostgresBootstrapProvider::new(config)?;
let source = HttpSource::builder("http-source")
.with_host("0.0.0.0")
.with_port(8080)
.with_bootstrap_provider(postgres_provider)
.build()?;
Bootstrap from files:
let file_provider = ScriptFileBootstrapProvider::new(file_paths)?;
let source = HttpSource::builder("http-source")
.with_host("0.0.0.0")
.with_port(8080)
.with_bootstrap_provider(file_provider)
.build()?;
Port already in use:
Failed to bind HTTP server to 0.0.0.0:8080: Address already in use
Solution: Change the port or stop the conflicting service.
Invalid JSON:
{"success": false, "message": "Failed to parse JSON", "error": "..."}
Solution: Validate the JSON structure against the event schema.
Source name mismatch:
{"success": false, "message": "Source name mismatch", "error": "Expected 'my-source', got 'wrong-source'"}
Solution: Ensure the source ID in the URL path matches the configured source ID.
Validation errors:
# Run all HTTP source tests
cargo test -p drasi-source-http
# Run specific test module
cargo test -p drasi-source-http construction
# Run with logging
RUST_LOG=debug cargo test -p drasi-source-http -- --nocapture
# Start test server
cargo run --example http_source_example
# In another terminal, submit test events
curl -X POST http://localhost:8080/sources/test-source/events \
-H "Content-Type: application/json" \
-d '{"operation":"insert","element":{"type":"node","id":"1","labels":["Test"],"properties":{}}}'
The internal batch channel capacity is automatically calculated as max_batch_size × 5:
| Max Batch Size | Channel Capacity | Memory (1KB/event) |
|---|---|---|
| 100 | 500 | ~500 KB |
| 1,000 | 5,000 | ~5 MB |
| 5,000 | 25,000 | ~25 MB |
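The arithmetic behind the table is simple enough to sketch directly. The function names and the constant below are hypothetical; only the factor of 5 comes from the text above, and the per-event size is whatever estimate the caller supplies.

```rust
/// Multiplier from the text above: capacity = max_batch_size × 5.
pub const CHANNEL_CAPACITY_FACTOR: usize = 5;

/// Channel capacity derived from the maximum batch size.
pub fn channel_capacity(max_batch_size: usize) -> usize {
    max_batch_size.saturating_mul(CHANNEL_CAPACITY_FACTOR)
}

/// Rough upper bound on buffered memory, given an average event size in bytes.
pub fn estimated_memory_bytes(max_batch_size: usize, avg_event_bytes: usize) -> usize {
    channel_capacity(max_batch_size).saturating_mul(avg_event_bytes)
}
```

For a maximum batch size of 1,000 and 1 KB events, this gives 5,000 slots and roughly 5 MB of buffered data when the channel is full, matching the middle row of the table.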
HttpSourceChange → drasi_core::models::SourceChange