| Crates.io | fuse-rule |
| lib.rs | fuse-rule |
| version | 0.1.0 |
| created_at | 2025-12-30 03:23:48.650304+00 |
| updated_at | 2025-12-30 03:23:48.650304+00 |
| description | High-performance, Arrow-native Complex Event Processing (CEP) engine with SQL-powered rules |
| homepage | https://github.com/hamzzy/arrow-rule-agent |
| repository | https://github.com/hamzzy/arrow-rule-agent |
| max_upload_size | |
| id | 2011982 |
| size | 1,363,709 |
FuseRule is a high-performance, developer-first rule engine built for the cloud-native ecosystem. It leverages Apache Arrow and DataFusion to provide a lightning-fast, SQL-expressive core for real-time data auditing and event processing.
Designed as an Infrastructure Primitive, FuseRule decouples its deterministic core from pluggable "edges" like persistence, evaluation engines, and notification agents.
StateStore, RuleEvaluator, and AgentEvaluationTrace logs for every ingestionSIGHUP without restarting the daemonActivated and Deactivated transitionsAdd to your Cargo.toml:
[dependencies]
fuse-rule = "0.1.0"
cargo install fuse-rule
Create fuse_rule_config.yaml:
engine:
persistence_path: "fuserule_state"
ingest_rate_limit: 1000 # requests per second
api_keys:
- "sk_live_abc123..."
schema:
- name: "price"
data_type: "float64"
- name: "symbol"
data_type: "utf8"
- name: "volume"
data_type: "int32"
agents:
- name: "logger"
type: "logger"
- name: "slack-webhook"
type: "webhook"
url: "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
template: |
{
"text": "π¨ {{rule_name}} triggered!",
"symbol": "{{matched_data.0.symbol}}",
"price": "{{matched_data.0.price}}"
}
rules:
- id: "high_price_alert"
name: "High Price Alert"
predicate: "price > 1000"
action: "slack-webhook"
version: 1
enabled: true
state_ttl_seconds: 3600 # Expire state after 1 hour
- id: "volume_spike"
name: "Volume Spike"
predicate: "AVG(volume) > 10000"
action: "logger"
window_seconds: 60 # 60-second sliding window
version: 1
enabled: true
fuserule run --config fuse_rule_config.yaml --port 3030
curl -X POST http://localhost:3030/ingest \
-H "Content-Type: application/json" \
-H "X-API-Key: sk_live_abc123..." \
-d '[{"price": 1500, "symbol": "AAPL", "volume": 5000}]'
# Get all rule states
curl http://localhost:3030/api/v1/state \
-H "X-API-Key: sk_live_abc123..."
# Get specific rule state
curl http://localhost:3030/api/v1/state/high_price_alert \
-H "X-API-Key: sk_live_abc123..."
use arrow_rule_agent::{RuleEngine, config::FuseRuleConfig};
use arrow::array::{Float64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use std::sync::Arc;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Load configuration
let config = FuseRuleConfig::from_file("fuse_rule_config.yaml")?;
// Create engine from config
let mut engine = RuleEngine::from_config(config).await?;
// Create a test batch
let schema = Schema::new(vec![
Field::new("price", DataType::Float64, true),
Field::new("symbol", DataType::Utf8, true),
]);
let price_array = Arc::new(Float64Array::from(vec![1500.0, 500.0]));
let symbol_array = Arc::new(StringArray::from(vec!["AAPL", "GOOGL"]));
let batch = RecordBatch::try_new(
Arc::new(schema),
vec![price_array, symbol_array],
)?;
// Process batch and get evaluation traces
let traces = engine.process_batch(&batch).await?;
for trace in traces {
if trace.action_fired {
println!("Rule '{}' activated!", trace.rule_name);
}
}
Ok(())
}
use arrow_rule_agent::{RuleEngine, rule::Rule};
// Add a rule programmatically
let rule = Rule {
id: "custom_rule".to_string(),
name: "Custom Rule".to_string(),
predicate: "price > 100 AND volume < 50".to_string(),
action: "logger".to_string(),
window_seconds: None,
version: 1,
enabled: true,
};
engine.add_rule(rule).await?;
// Update a rule
engine.update_rule("custom_rule", updated_rule).await?;
// Toggle rule
engine.toggle_rule("custom_rule", false).await?;
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β FuseRule Engine β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β ββββββββββββββββ ββββββββββββββββ ββββββββββββββββ β
β β Ingest βββββΆβ Evaluate βββββΆβ Activate β β
β β Sources β β Rules β β Agents β β
β ββββββββββββββββ ββββββββββββββββ ββββββββββββββββ β
β β β β β
β β β β β
β βΌ βΌ βΌ β
β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β Arrow RecordBatch (Zero-Copy) β β
β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
β ββββββββββββββββ ββββββββββββββββ ββββββββββββββββ β
β β State β β Windows β β Metrics β β
β β Store β β Buffers β β (Prom) β β
β ββββββββββββββββ ββββββββββββββββ ββββββββββββββββ β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
FuseRule is built on the philosophy that the core logic of a rule engine should be a "boring," deterministic primitive, while the integration points (Ingress, Persistence, Notifications) should be flexible and pluggable.
Core (Hard):
Edges (Soft):
StateStore trait (Sled, Redis, etc.)RuleEvaluator trait (DataFusion, custom SQL engines)Agent trait (Webhooks, Loggers, custom actions)fuserule run --config fuse_rule_config.yaml --port 3030
# Validate all rules in config
fuserule validate --config fuse_rule_config.yaml
# Validate specific predicate
fuserule validate --config fuse_rule_config.yaml --predicate "price > 100"
fuserule repl --config fuse_rule_config.yaml
fuserule debug --config fuse_rule_config.yaml
GET /status - Server statusGET /health - Health check with engine statsGET /metrics - Prometheus metricsX-API-Key header)GET /rules - List all rulesPOST /api/v1/rules - Create new rulePUT /api/v1/rules/:id - Update rulePATCH /api/v1/rules/:id - Partial update (e.g., enable/disable)DELETE /api/v1/rules/:id - Delete rulePOST /api/v1/rules/validate - Validate rule predicateGET /api/v1/state - Get all rule statesGET /api/v1/state/:rule_id - Get specific rule statePOST /ingest - Ingest JSON data (rate-limited)FuseRule exposes Prometheus metrics at /metrics:
fuserule_batches_processed_total - Total batches ingestedfuserule_activations_total - Total rule activationsfuserule_agent_failures_total - Total agent failuresfuserule_evaluation_duration_seconds - Evaluation latency histogramfuserule_rule_evaluations_total{rule_id} - Per-rule evaluation countfuserule_rule_activations_total{rule_id} - Per-rule activation countcurl -X POST http://localhost:3030/ingest \
-H "Content-Type: application/json" \
-d '[{"price": 150, "symbol": "AAPL"}]'
Configure in fuse_rule_config.yaml:
sources:
- type: "kafka"
brokers: ["localhost:9092"]
topic: "events"
group_id: "fuserule"
auto_commit: true
Configure in fuse_rule_config.yaml:
sources:
- type: "websocket"
bind: "0.0.0.0:3031"
max_connections: 1000
Connect and send JSON:
const ws = new WebSocket('ws://localhost:3031/ws');
ws.send(JSON.stringify([{"price": 150, "symbol": "AAPL"}]));
Use Handlebars templates for custom webhook payloads:
agents:
- name: "custom-webhook"
type: "webhook"
url: "https://api.example.com/webhook"
template: |
{
"alert": "{{rule_name}}",
"timestamp": "{{timestamp}}",
"data": {{#each matched_data}}
{
"price": {{price}},
"symbol": "{{symbol}}"
}{{#unless @last}},{{/unless}}
{{/each}},
"count": {{count}}
}
cargo test --test unit_test
cargo test --test integration_test
cargo test --test property_test
cargo doc --no-deps --open
This will build and open the documentation in your browser at target/doc/arrow_rule_agent/index.htmlContributions are welcome! Please see CONTRIBUTING.md for guidelines.
Licensed under the Apache License, Version 2.0. See LICENSE for details.