aegis-streaming

Crates.io: aegis-streaming
lib.rs: aegis-streaming
version: 0.1.7
created_at: 2026-01-20 03:09:21.097246+00
updated_at: 2026-01-24 03:50:31.319108+00
description: Real-time streaming for Aegis database
homepage: https://automatanexus.com
repository: https://github.com/AutomataNexus/Aegis-DB
id: 2055708
size: 108,252
Andrew Jewell Sr. (AutomataControls)

aegis-streaming

Real-time streaming engine for the Aegis Database Platform.

Overview

aegis-streaming provides pub/sub messaging, change data capture (CDC), event sourcing, and stream processing capabilities. It enables real-time data flows and reactive applications.

Features

  • Pub/Sub Messaging - Topic-based message publishing and subscription
  • Change Data Capture - Track all data changes in real time
  • Event Sourcing - Store events as the source of truth
  • Stream Processing - Transform and aggregate streaming data
  • Persistent Subscriptions - Durable message delivery

Architecture

┌─────────────────────────────────────────────────┐
│              Streaming Engine                   │
├─────────────────────────────────────────────────┤
│              Channel Manager                    │
│  ┌──────────┬──────────────┬─────────────────┐  │
│  │  Topics  │  Partitions  │  Consumer       │  │
│  │          │              │  Groups         │  │
│  └──────────┴──────────────┴─────────────────┘  │
├─────────────────────────────────────────────────┤
│              CDC Engine                         │
│  ┌──────────┬──────────────┬─────────────────┐  │
│  │  WAL     │   Change     │  Subscription   │  │
│  │  Reader  │   Tracker    │   Manager       │  │
│  └──────────┴──────────────┴─────────────────┘  │
├─────────────────────────────────────────────────┤
│           Event Store (Append-Only)             │
└─────────────────────────────────────────────────┘

Modules

Module       Description
engine       Main streaming engine
channel      Pub/sub channel management
stream       Stream abstraction
subscriber   Subscription handling
cdc          Change data capture
event        Event definitions

Usage

[dependencies]
# Inside the Aegis-DB workspace:
aegis-streaming = { path = "../aegis-streaming" }
# Or, from crates.io:
# aegis-streaming = "0.1.7"

Pub/Sub Messaging

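use std::collections::HashMap;
use std::time::Duration;

use serde_json::json;
// TopicConfig, SubscribeConfig, and Offset are assumed to be exported at the crate root.
use aegis_streaming::{Message, Offset, StreamingEngine, SubscribeConfig, Topic, TopicConfig};

// `config` is an engine configuration built elsewhere.
let engine = StreamingEngine::new(config)?;

// Create a topic
engine.create_topic("orders", TopicConfig {
    partitions: 4,
    // 7 days, written in seconds because std's Duration::from_days is not stable.
    retention: Duration::from_secs(7 * 24 * 60 * 60),
    replication_factor: 3,
})?;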

// Publish messages
engine.publish("orders", Message {
    key: Some("order-123".into()),
    payload: json!({"id": "123", "amount": 99.99}),
    headers: HashMap::new(),
}).await?;

// Subscribe to messages
let mut subscriber = engine.subscribe("orders", SubscribeConfig {
    group_id: "order-processor",
    from: Offset::Latest,
}).await?;

while let Some(msg) = subscriber.next().await {
    println!("Received: {:?}", msg.payload);
    msg.ack().await?;
}
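
The topic above has 4 partitions and the message carries a key, but this README does not say how keys map to partitions. A common approach in partitioned logs is to hash the key modulo the partition count, so that messages with the same key stay in order. The sketch below illustrates that idea only; `partition_for` is a hypothetical helper and not part of the aegis-streaming API.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical helper (not part of aegis-streaming): pick a partition for a message.
/// Keyed messages always hash to the same partition; unkeyed ones rotate round-robin.
/// Note: DefaultHasher's algorithm may change between Rust releases, so a real
/// system would pin a specific hash function.
fn partition_for(key: Option<&str>, partitions: u32, round_robin: &mut u32) -> u32 {
    assert!(partitions > 0, "a topic needs at least one partition");
    match key {
        Some(k) => {
            let mut hasher = DefaultHasher::new();
            k.hash(&mut hasher);
            (hasher.finish() % u64::from(partitions)) as u32
        }
        None => {
            let p = *round_robin % partitions;
            *round_robin = (*round_robin).wrapping_add(1);
            p
        }
    }
}
```

Calling `partition_for(Some("order-123"), 4, &mut counter)` repeatedly returns the same partition each time, while `None` keys cycle through partitions 0, 1, 2, 3.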

Change Data Capture (CDC)

// CDCConfig and Operation are assumed to live in the same `cdc` module.
use aegis_streaming::cdc::{CDCConfig, CDCEngine, ChangeEvent, Operation};

let cdc = CDCEngine::new(storage)?;

// Subscribe to changes on a table
let mut changes = cdc.subscribe("users", CDCConfig {
    operations: vec![Operation::Insert, Operation::Update, Operation::Delete],
    include_old_values: true,
}).await?;

while let Some(event) = changes.next().await {
    match event {
        ChangeEvent::Insert { new_row, .. } => {
            println!("New user: {:?}", new_row);
        }
        ChangeEvent::Update { old_row, new_row, .. } => {
            println!("Updated from {:?} to {:?}", old_row, new_row);
        }
        ChangeEvent::Delete { old_row, .. } => {
            println!("Deleted: {:?}", old_row);
        }
    }
}
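
A typical use of a change stream like the one above is keeping a derived copy of a table up to date. The sketch below shows that pattern with plain standard-library types. `Change` and `Row` are simplified stand-ins invented for this example (including the `id` field); the crate's real `ChangeEvent` may carry different fields.

```rust
use std::collections::HashMap;

/// Stand-in row type for this sketch: column name -> value as text.
type Row = HashMap<String, String>;

/// Simplified stand-in for the crate's `ChangeEvent`; the real enum may differ.
enum Change {
    Insert { id: u64, new_row: Row },
    Update { id: u64, new_row: Row },
    Delete { id: u64 },
}

/// Apply one change to an in-memory copy of the table, keyed by primary key.
fn apply(replica: &mut HashMap<u64, Row>, change: Change) {
    match change {
        // Insert and Update both leave the newest row image in place.
        Change::Insert { id, new_row } | Change::Update { id, new_row } => {
            replica.insert(id, new_row);
        }
        Change::Delete { id } => {
            replica.remove(&id);
        }
    }
}
```

Applying the events in the order they were emitted keeps the replica consistent with the source table.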

Event Sourcing

use serde_json::json;
use aegis_streaming::event::{EventStore, Event, AggregateRoot};

let store = EventStore::new(storage)?;

// Append events
store.append("account-123", vec![
    Event::new("AccountCreated", json!({"owner": "Alice"})),
    Event::new("MoneyDeposited", json!({"amount": 100.0})),
    Event::new("MoneyWithdrawn", json!({"amount": 30.0})),
]).await?;

// Load aggregate from events
// (`Account` is an application-defined type; `from_events` replays the events into its state.)
let events = store.load("account-123").await?;
let account = Account::from_events(events);
println!("Balance: {}", account.balance);

// Subscribe to events
let mut stream = store.subscribe("account-*").await?;
while let Some(event) = stream.next().await {
    println!("Event: {} on {}", event.event_type, event.aggregate_id);
}
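
The example above calls `Account::from_events` without defining it. A minimal sketch of such an aggregate follows, assuming events are replayed in append order. `AccountEvent` is a typed stand-in invented for this example; the crate's `Event` carries an event-type string and a JSON payload instead.

```rust
/// Stand-in event type for this sketch (not the crate's `Event`).
enum AccountEvent {
    Created { owner: String },
    Deposited { amount: f64 },
    Withdrawn { amount: f64 },
}

#[derive(Debug, Default, PartialEq)]
struct Account {
    owner: String,
    balance: f64,
}

impl Account {
    /// Rebuild the current state by folding over the events in append order.
    fn from_events(events: impl IntoIterator<Item = AccountEvent>) -> Self {
        events.into_iter().fold(Account::default(), |mut acc, event| {
            match event {
                AccountEvent::Created { owner } => acc.owner = owner,
                AccountEvent::Deposited { amount } => acc.balance += amount,
                AccountEvent::Withdrawn { amount } => acc.balance -= amount,
            }
            acc
        })
    }
}
```

Replaying the three events appended above (created by Alice, deposit of 100.0, withdrawal of 30.0) yields a balance of 70.0. The state is never stored directly; it is always derived from the event log.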

Stream Processing

use std::time::Duration;

use serde_json::json;
use aegis_streaming::stream::{Stream, StreamProcessor};

let processor = StreamProcessor::new();

// Define processing pipeline
processor
    .source("raw-events")
    .filter(|e| e["type"] == "purchase")
    .map(|e| json!({
        "customer": e["customer_id"],
        "amount": e["total"],
        "timestamp": e["created_at"]
    }))
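    .window(Duration::from_secs(5 * 60)) // 5-minute window
    .aggregate(|window| json!({
        "total_purchases": window.len(),
        // Skip entries whose "amount" is missing or non-numeric instead of panicking.
        "total_amount": window.iter().filter_map(|e| e["amount"].as_f64()).sum::<f64>()
    }))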
    .sink("purchase-metrics")
    .start()
    .await?;
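
To make the window-then-aggregate step concrete, here is a standalone sketch of the same computation over an in-memory slice. It assumes fixed, non-overlapping (tumbling) windows keyed by event time; this README does not state which window semantics `StreamProcessor` uses. `Purchase`, `WindowTotals`, and `aggregate_tumbling` are names invented for this example.

```rust
use std::collections::BTreeMap;

/// Stand-in purchase record: event time in seconds, plus an amount.
struct Purchase {
    timestamp_secs: u64,
    amount: f64,
}

/// Per-window result: number of purchases and their summed amount.
#[derive(Debug, PartialEq)]
struct WindowTotals {
    window_start_secs: u64,
    total_purchases: usize,
    total_amount: f64,
}

/// Group purchases into tumbling windows and aggregate each window.
/// Results are returned in ascending order of window start.
fn aggregate_tumbling(purchases: &[Purchase], window_secs: u64) -> Vec<WindowTotals> {
    assert!(window_secs > 0, "window length must be positive");
    let mut windows: BTreeMap<u64, (usize, f64)> = BTreeMap::new();
    for p in purchases {
        // Round the timestamp down to the start of its window.
        let start = p.timestamp_secs - p.timestamp_secs % window_secs;
        let entry = windows.entry(start).or_insert((0, 0.0));
        entry.0 += 1;
        entry.1 += p.amount;
    }
    windows
        .into_iter()
        .map(|(window_start_secs, (total_purchases, total_amount))| WindowTotals {
            window_start_secs,
            total_purchases,
            total_amount,
        })
        .collect()
}
```

With a 300-second window, purchases at seconds 10 and 299 fall into the window starting at 0, and a purchase at second 300 opens the next window.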

Consumer Groups

// Multiple consumers in a group share the load
let consumer1 = engine.subscribe("orders", SubscribeConfig {
    group_id: "processors",
    ..Default::default()
}).await?;

let consumer2 = engine.subscribe("orders", SubscribeConfig {
    group_id: "processors",  // Same group
    ..Default::default()
}).await?;

// Each message is delivered to only one consumer in the group
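
One common way to get "one consumer per message" is to give every partition exactly one owner within the group. The sketch below shows a round-robin assignment under that assumption. How aegis-streaming actually balances a group is not described here, and `assign_partitions` is a hypothetical function written for illustration.

```rust
use std::collections::BTreeMap;

/// Hypothetical round-robin assignment of partitions to the consumers in one group.
/// Each partition gets exactly one owner, so each message reaches one consumer.
/// Consumer names are expected to be unique.
fn assign_partitions(consumers: &[&str], partitions: u32) -> BTreeMap<String, Vec<u32>> {
    let mut assignment: BTreeMap<String, Vec<u32>> = consumers
        .iter()
        .map(|c| (c.to_string(), Vec::new()))
        .collect();
    if consumers.is_empty() {
        return assignment;
    }
    for partition in 0..partitions {
        let owner = consumers[partition as usize % consumers.len()];
        assignment
            .get_mut(owner)
            .expect("every consumer was inserted above")
            .push(partition);
    }
    assignment
}
```

For two consumers and a 4-partition topic, the first consumer owns partitions 0 and 2 and the second owns 1 and 3. Adding a third consumer to the group would change the assignment, which is why group membership changes usually trigger a rebalance.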

Message Delivery Guarantees

Mode          Description
AtMostOnce    Fire and forget, may lose messages
AtLeastOnce   Guaranteed delivery, may duplicate
ExactlyOnce   Guaranteed exactly-once (with transactions)
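
Because AtLeastOnce may deliver the same message more than once, handlers are usually written to be idempotent. The sketch below shows one simple way to do that by remembering processed message IDs. `IdempotentConsumer` and the notion of a string message ID are assumptions made for this example, not types from the crate.

```rust
use std::collections::HashSet;

/// Sketch of an idempotent consumer: it remembers processed message IDs so a
/// redelivered message is skipped instead of being applied twice.
#[derive(Default)]
struct IdempotentConsumer {
    seen: HashSet<String>,
    total: f64,
}

impl IdempotentConsumer {
    /// Returns true if the message was applied, false if it was a duplicate.
    fn handle(&mut self, message_id: &str, amount: f64) -> bool {
        // `HashSet::insert` returns false when the ID was already present.
        if !self.seen.insert(message_id.to_string()) {
            return false;
        }
        self.total += amount;
        true
    }
}
```

The set of seen IDs grows without bound in this sketch; a long-running consumer would need to expire old IDs or persist them alongside its state.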

Configuration

[streaming]
max_message_size = "1MB"
default_retention = "7d"

[streaming.consumer]
max_poll_records = 500
auto_commit = false
session_timeout = "30s"

[streaming.cdc]
enabled = true
buffer_size = 10000
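
Values such as `default_retention = "7d"` and `session_timeout = "30s"` are strings with a unit suffix. As a rough illustration of how such values can be turned into `std::time::Duration`, here is a small parser. The accepted suffixes (s, m, h, d) are an assumption of this sketch, and `parse_duration` is a hypothetical helper, not the crate's own configuration loader.

```rust
use std::time::Duration;

/// Hypothetical parser for duration strings such as "30s" or "7d".
/// Supported suffixes (an assumption of this sketch): s, m, h, d.
fn parse_duration(text: &str) -> Option<Duration> {
    let text = text.trim();
    // Split the numeric part from the unit suffix at the first non-digit.
    let split = text.find(|c: char| !c.is_ascii_digit())?;
    let (number, unit) = text.split_at(split);
    let value: u64 = number.parse().ok()?;
    let secs_per_unit = match unit {
        "s" => 1,
        "m" => 60,
        "h" => 60 * 60,
        "d" => 24 * 60 * 60,
        _ => return None,
    };
    // Reject values that would overflow instead of wrapping.
    value.checked_mul(secs_per_unit).map(Duration::from_secs)
}
```

Inputs without a unit, or with an unknown one, return `None` rather than guessing a default.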

Tests

cargo test -p aegis-streaming

Test count: 31

License

Apache-2.0
