| Crates.io | allsource-core |
| lib.rs | allsource-core |
| version | 0.7.3 |
| created_at | 2025-12-05 01:36:05.521488+00 |
| updated_at | 2025-12-15 10:54:31.941152+00 |
| description | High-performance event store core built in Rust |
| homepage | https://github.com/all-source-os/chronos-monorepo |
| repository | https://github.com/all-source-os/chronos-monorepo |
| max_upload_size | |
| id | 1967433 |
| size | 1,528,440 |
AI-native event store built in Rust with columnar storage, schema validation, event replay, and stream processing
AllSource is a high-performance event store designed for modern event-sourcing and CQRS architectures. Built with a polyglot architecture:

- Rust core (services/core): high-performance event store engine
- Go control plane (services/control-plane): orchestration, monitoring, and management layer

The Rust core provides blazing-fast event ingestion (469K events/sec) and microsecond-scale queries, while the Go control plane handles cluster coordination and operational tasks.
Current Version: v0.7.1 · crates.io · docs.rs
allsource-core is distributed as a library crate via crates.io.
cargo add allsource-core@0.7
Or add to your Cargo.toml (pin to minor version for stability):
[dependencies]
# allsource-core: High-performance event store
# Pin to minor version - allows patch updates only
allsource-core = "0.7"
Version Pinning Best Practice: We recommend `"0.7"` (minor version) rather than `"0.7.1"` (exact) or `"0"` (major only). This allows automatic patch updates while avoiding breaking changes.
DashMap for O(1) lookups

The codebase follows a strict layered architecture for maintainability and testability:
```
┌──────────────────────────────────────────────────────────┐
│                   Infrastructure Layer                   │
│    (HTTP handlers, WebSocket, persistence, security)     │
│    infrastructure::web, infrastructure::persistence,     │
│  infrastructure::security, infrastructure::repositories  │
├──────────────────────────────────────────────────────────┤
│                    Application Layer                     │
│               (Use cases, services, DTOs)                │
│      application::use_cases, application::services,      │
│                     application::dto                     │
├──────────────────────────────────────────────────────────┤
│                       Domain Layer                       │
│       (Entities, value objects, repository traits)       │
│         domain::entities, domain::value_objects,         │
│                   domain::repositories                   │
└──────────────────────────────────────────────────────────┘
```
Module Organization:
| Layer | Path | Contents |
|---|---|---|
| Domain | `domain::entities` | Event, Tenant, Schema, Projection, AuditEvent, EventStream |
| Domain | `domain::value_objects` | EntityId, TenantId, EventType, PartitionKey |
| Domain | `domain::repositories` | Repository trait definitions (no implementations) |
| Application | `application::use_cases` | IngestEvent, QueryEvents, ManageTenant, ManageSchema |
| Application | `application::services` | AuditLogger, Pipeline, Replay, Analytics |
| Application | `application::dto` | Request/Response DTOs for API boundaries |
| Infrastructure | `infrastructure::web` | HTTP handlers, WebSocket, API routes |
| Infrastructure | `infrastructure::persistence` | Storage, WAL, Snapshots, Compaction, Index |
| Infrastructure | `infrastructure::security` | Auth, Middleware, Rate Limiting |
| Infrastructure | `infrastructure::repositories` | In-memory, PostgreSQL, RocksDB implementations |
Dependency Rule: Inner layers never depend on outer layers. Domain has zero external dependencies.
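The dependency rule can be illustrated with a minimal sketch (hypothetical types, not the crate's actual API): the domain layer defines a repository trait with no implementation, and the infrastructure layer supplies one.

```rust
// --- domain::repositories: the trait only, zero external dependencies ---
pub trait EventRepository {
    fn append(&mut self, entity_id: &str, payload: String);
    fn events_for(&self, entity_id: &str) -> Vec<String>;
}

// --- infrastructure::repositories: a concrete, in-memory implementation ---
use std::collections::HashMap;

#[derive(Default)]
pub struct InMemoryEventRepository {
    events: HashMap<String, Vec<String>>,
}

impl EventRepository for InMemoryEventRepository {
    fn append(&mut self, entity_id: &str, payload: String) {
        // Append to this entity's event list, creating it on first use.
        self.events.entry(entity_id.to_string()).or_default().push(payload);
    }

    fn events_for(&self, entity_id: &str) -> Vec<String> {
        self.events.get(entity_id).cloned().unwrap_or_default()
    }
}
```

Because the trait lives in the domain, use cases depend only on `EventRepository`, and the in-memory, PostgreSQL, or RocksDB backends can be swapped without touching business logic.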
Based on battle-tested patterns from SierraDB, a production-grade event store:
- PartitionKey - Fixed Partition Architecture
- EventStream - Gapless Version Guarantees
- EventStreamRepository - Infrastructure Implementation
- StorageIntegrity - Corruption Prevention
- 7-Day Stress Tests - Production Hardening
Why These Patterns?
| Pattern | SierraDB's Lesson | Our Benefit |
|---|---|---|
| Fixed Partitions | Sequential writes enable gapless sequences | Horizontal scaling without complex coordination |
| Gapless Versions | Watermark prevents data gaps | Consistent event sourcing guarantees |
| Optimistic Locking | Detect concurrent modifications | Safe concurrent access without heavy locks |
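How fixed partitions, gapless versions, and optimistic locking fit together can be sketched in a few lines (a simplified illustration with hypothetical types and a made-up partition count, not the crate's internals): an entity hashes to one of N fixed partitions, and each append must state the version it expects, so gaps and concurrent writers are detected.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

const PARTITION_COUNT: u64 = 16; // fixed at creation time (hypothetical value)

/// Route an entity to a fixed partition so all of its events are written
/// sequentially by one partition, which is what makes gapless versions cheap.
fn partition_for(entity_id: &str) -> u64 {
    let mut h = DefaultHasher::new();
    entity_id.hash(&mut h);
    h.finish() % PARTITION_COUNT
}

/// A stream that rejects appends unless the caller's expected version matches,
/// guaranteeing versions 1, 2, 3, ... with no gaps (optimistic locking).
struct Stream {
    version: u64,
}

impl Stream {
    fn append(&mut self, expected_version: u64) -> Result<u64, String> {
        if self.version != expected_version {
            return Err(format!(
                "conflict: stream at version {}, caller expected {}",
                self.version, expected_version
            ));
        }
        self.version += 1;
        Ok(self.version)
    }
}
```

A conflicting writer gets an error instead of silently creating a gap, which is the "safe concurrent access without heavy locks" benefit in the table above.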
Implemented:
Coming Next:
Measured on Apple Silicon M-series (release build):
| Operation | Throughput/Latency | Details |
|---|---|---|
| Event Ingestion | 442-469K events/sec | Single-threaded |
| Entity Query | 11.9 μs | Indexed lookup |
| Type Query | 2.4 ms | Cross-entity scan |
| State Reconstruction | 3.5 μs | With snapshots |
| State Reconstruction | 3.8 μs | Without snapshots |
| Concurrent Writes (8 workers) | 8.0 ms/batch | 100 events/batch |
| Parquet Batch Write | 3.5 ms | 1000 events |
| Snapshot Creation | 130 μs | Per entity |
| WAL Sync Writes | 413 ms | 100 syncs |
Test Coverage: 250 tests - 100% passing
# Health check
GET /health
# Ingest event
POST /api/v1/events
# Query events
GET /api/v1/events/query?entity_id=user-123
GET /api/v1/events/query?event_type=user.created
GET /api/v1/events/query?since=2024-01-15T00:00:00Z&limit=100
# Entity state
GET /api/v1/entities/:entity_id/state
GET /api/v1/entities/:entity_id/state?as_of=2024-01-15T10:00:00Z
GET /api/v1/entities/:entity_id/snapshot
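Conceptually, the state endpoints fold an entity's events, oldest first, into a single document, stopping at the `as_of` timestamp for time-travel queries. A minimal sketch of that idea (toy types for illustration, not the crate's implementation):

```rust
use std::collections::BTreeMap;

/// A toy event: a timestamp plus one (key, value) change to apply.
struct Event {
    ts: i64,
    key: &'static str,
    value: &'static str,
}

/// Fold events into current state, ignoring anything newer than `as_of`.
/// Later events overwrite earlier values for the same key.
fn reconstruct_state(events: &[Event], as_of: Option<i64>) -> BTreeMap<&'static str, &'static str> {
    let mut state = BTreeMap::new();
    for e in events {
        if let Some(cutoff) = as_of {
            if e.ts > cutoff {
                continue; // event happened after the requested point in time
            }
        }
        state.insert(e.key, e.value);
    }
    state
}
```

Snapshots speed this up by letting the fold start from a saved state instead of the first event.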
# Statistics
GET /api/v1/stats
# Real-time event stream
WS /api/v1/events/stream
# Event frequency analysis
GET /api/v1/analytics/frequency?event_type=user.created&bucket_size=3600
# Statistical summary
GET /api/v1/analytics/summary?entity_id=user-123
# Event correlation
GET /api/v1/analytics/correlation?event_a=user.created&event_b=order.placed
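Conceptually, the frequency endpoint groups event timestamps into fixed-size buckets (`bucket_size` in seconds, so 3600 gives hourly counts). A simplified sketch of the bucketing logic (an assumption about the approach, not the engine's code):

```rust
use std::collections::BTreeMap;

/// Count events per time bucket. Timestamps are Unix seconds; each bucket is
/// keyed by its start time (ts rounded down to a multiple of bucket_size).
fn frequency(timestamps: &[i64], bucket_size: i64) -> BTreeMap<i64, u64> {
    let mut buckets = BTreeMap::new();
    for &ts in timestamps {
        let bucket = (ts / bucket_size) * bucket_size; // start of the bucket
        *buckets.entry(bucket).or_insert(0u64) += 1;
    }
    buckets
}
```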
# Create snapshot
POST /api/v1/snapshots
# List snapshots
GET /api/v1/snapshots
GET /api/v1/snapshots?entity_id=user-123
# Get latest snapshot
GET /api/v1/snapshots/:entity_id/latest
# Trigger manual compaction
POST /api/v1/compaction/trigger
# Get compaction stats
GET /api/v1/compaction/stats
# Register schema
POST /api/v1/schemas
# List subjects
GET /api/v1/schemas
# Get schema
GET /api/v1/schemas/:subject
GET /api/v1/schemas/:subject?version=2
# List versions
GET /api/v1/schemas/:subject/versions
# Validate event
POST /api/v1/schemas/validate
# Set compatibility mode
PUT /api/v1/schemas/:subject/compatibility
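Validation boils down to checking an event payload against the subject's registered JSON Schema. A minimal sketch of just the `required` keyword (real JSON Schema validation also checks types and formats; field names here are stand-ins):

```rust
/// Check that a payload contains every field the schema marks as required.
/// Returns the list of missing field names on failure.
fn validate_required(payload_keys: &[&str], required: &[&str]) -> Result<(), Vec<String>> {
    let missing: Vec<String> = required
        .iter()
        .filter(|r| !payload_keys.contains(*r))
        .map(|r| r.to_string())
        .collect();
    if missing.is_empty() {
        Ok(())
    } else {
        Err(missing)
    }
}
```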
# Start replay
POST /api/v1/replay
# List replays
GET /api/v1/replay
# Get progress
GET /api/v1/replay/:replay_id
# Cancel replay
POST /api/v1/replay/:replay_id/cancel
# Delete replay
DELETE /api/v1/replay/:replay_id
# Register pipeline
POST /api/v1/pipelines
# List pipelines
GET /api/v1/pipelines
# Get pipeline
GET /api/v1/pipelines/:pipeline_id
# Remove pipeline
DELETE /api/v1/pipelines/:pipeline_id
# Get all stats
GET /api/v1/pipelines/stats
# Get pipeline stats
GET /api/v1/pipelines/:pipeline_id/stats
# Reset pipeline state
PUT /api/v1/pipelines/:pipeline_id/reset
Following Clean Architecture principles with clear separation of concerns:
```
services/core/src/
├── main.rs                    # Application entry point
├── lib.rs                     # Library exports
├── error.rs                   # Error types and Result
│
├── domain/                    # DOMAIN LAYER (Business Logic)
│   ├── entities/              # Core business entities
│   │   ├── event.rs           # Event entity (162 tests)
│   │   ├── tenant.rs          # Multi-tenancy entity
│   │   ├── schema.rs          # Schema registry entity
│   │   ├── projection.rs      # Projection entity
│   │   └── event_stream.rs    # Gapless event stream (9 tests)
│   └── value_objects/         # Self-validating value objects
│       ├── tenant_id.rs       # Tenant identifier
│       ├── event_type.rs      # Event type validation
│       ├── entity_id.rs       # Entity identifier
│       └── partition_key.rs   # Fixed partitioning (6 tests)
│
├── application/               # APPLICATION LAYER (Use Cases)
│   ├── dto/                   # Data Transfer Objects
│   │   ├── event_dto.rs       # Event request/response DTOs
│   │   ├── tenant_dto.rs      # Tenant DTOs
│   │   ├── schema_dto.rs      # Schema DTOs
│   │   └── projection_dto.rs  # Projection DTOs
│   └── use_cases/             # Application business logic
│       ├── ingest_event.rs    # Event ingestion (3 tests)
│       ├── query_events.rs    # Event queries (4 tests)
│       ├── manage_tenant.rs   # Tenant management (5 tests)
│       ├── manage_schema.rs   # Schema operations (4 tests)
│       └── manage_projection.rs # Projection ops (4 tests)
│
└── infrastructure/            # INFRASTRUCTURE LAYER (Technical)
    ├── repositories/          # Repository implementations (SierraDB)
    │   └── in_memory_event_stream_repository.rs # Thread-safe, partitioned (8 tests)
    ├── persistence/           # Storage integrity (SierraDB)
    │   └── storage_integrity.rs # Checksum verification (8 tests)
    ├── api.rs                 # REST API endpoints (38 endpoints)
    ├── store.rs               # Event store implementation
    ├── storage.rs             # Parquet columnar storage
    ├── wal.rs                 # Write-ahead log
    ├── snapshot.rs            # Snapshot management
    ├── compaction.rs          # Storage compaction
    ├── index.rs               # High-performance indexing
    ├── projection.rs          # Real-time projections
    ├── analytics.rs           # Analytics engine
    ├── websocket.rs           # WebSocket streaming
    ├── schema.rs              # Schema validation service
    ├── replay.rs              # Event replay engine
    ├── pipeline.rs            # Stream processing
    ├── backup.rs              # Backup management
    ├── auth.rs                # Authentication/Authorization
    ├── rate_limit.rs          # Rate limiting
    ├── tenant.rs              # Tenant service
    ├── metrics.rs             # Metrics collection
    ├── middleware.rs          # HTTP middleware
    ├── config.rs              # Configuration
    ├── tenant_api.rs          # Tenant API handlers
    └── auth_api.rs            # Auth API handlers

tests/
├── integration_tests.rs       # End-to-end tests
└── stress_tests/              # Long-running stress tests (SierraDB)
    └── seven_day_stress.rs    # 7-day corruption detection (4 tests)

benches/
└── performance_benchmarks.rs  # Performance benchmarks
```
Domain Layer (Core Business Logic)
Application Layer (Orchestration)
Infrastructure Layer (Technical Details)
Add to your Cargo.toml:
[dependencies]
allsource-core = "0.7" # Pin to minor version
Or:
cargo add allsource-core@0.7
# Clone the repository
git clone https://github.com/all-source-os/chronos-monorepo.git
cd chronos-monorepo/apps/core
# Build
cargo build --release
# Run tests
cargo test
# Run benchmarks
cargo bench
# Development mode
cargo run
# Production mode (optimized)
cargo run --release
# With debug logging
RUST_LOG=debug cargo run
# Custom port (modify main.rs)
# Default: 0.0.0.0:8080
curl -X POST http://localhost:3900/api/v1/events \
-H "Content-Type: application/json" \
-d '{
"event_type": "user.created",
"entity_id": "user-123",
"payload": {
"name": "Alice",
"email": "alice@example.com"
}
}'
# Get all events for an entity
curl "http://localhost:3900/api/v1/events/query?entity_id=user-123"
# Time-travel query
curl "http://localhost:3900/api/v1/events/query?entity_id=user-123&as_of=2024-01-15T10:00:00Z"
# Query by type
curl "http://localhost:3900/api/v1/events/query?event_type=user.created&limit=10"
curl -X POST http://localhost:3900/api/v1/schemas \
-H "Content-Type: application/json" \
-d '{
"subject": "user.created",
"schema": {
"type": "object",
"required": ["name", "email"],
"properties": {
"name": {"type": "string"},
"email": {"type": "string", "format": "email"}
}
}
}'
curl -X POST http://localhost:3900/api/v1/replay \
-H "Content-Type: application/json" \
-d '{
"projection_name": "user_snapshot",
"from_timestamp": "2024-01-01T00:00:00Z",
"config": {
"batch_size": 1000,
"emit_progress": true
}
}'
curl -X POST http://localhost:3900/api/v1/pipelines \
-H "Content-Type: application/json" \
-d '{
"name": "user_analytics",
"source_event_types": ["user.created", "user.updated"],
"operators": [
{
"type": "filter",
"field": "country",
"value": "US",
"op": "eq"
},
{
"type": "reduce",
"field": "id",
"function": "count",
"group_by": "country"
}
],
"enabled": true
}'
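The filter and reduce operators in this pipeline behave like ordinary stream stages: filter drops events whose field doesn't match, and reduce counts the survivors per group. A self-contained sketch of that combination (toy event type for illustration, not the crate's operator implementation):

```rust
use std::collections::HashMap;

/// A toy event: just a bag of flat string fields.
struct Event {
    fields: HashMap<String, String>,
}

/// filter(field == value) followed by reduce(count, group_by) over a batch.
fn filter_and_count(
    events: &[Event],
    field: &str,
    value: &str,
    group_by: &str,
) -> HashMap<String, u64> {
    let mut counts = HashMap::new();
    for e in events {
        // filter operator: keep only events where `field` equals `value`
        if e.fields.get(field).map(String::as_str) == Some(value) {
            // reduce operator: count per `group_by` key
            if let Some(key) = e.fields.get(group_by) {
                *counts.entry(key.clone()).or_insert(0u64) += 1;
            }
        }
    }
    counts
}
```

In the registered pipeline above, the engine applies these stages incrementally to each matching event rather than batch-at-a-time, but the per-event logic is the same.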
AllSource Core enforces strict quality gates to maintain code quality, consistency, and reliability.
See: QUALITY_GATES.md for complete documentation.
# Run all quality gates (recommended before commit)
make check
# Auto-fix formatting and sorting
make format
make format-sort
# Full CI pipeline locally
make ci
Quality gates run automatically on:
- Pushes and pull requests to main or develop

Workflow: `.github/workflows/rust-quality.yml`
- `.clippy.toml` - Clippy linter configuration (MSRV: 1.70.0)
- `rustfmt.toml` - Code formatting rules (max width: 100)
- `cargo-sort.toml` - Dependency sorting configuration
- `Makefile` - Quality gate commands

# Run all tests
cargo test
# Run unit tests only
cargo test --lib
# Run integration tests
cargo test --test integration_tests
# Run specific test
cargo test test_replay_progress_tracking
# Run with output
cargo test -- --nocapture
# Run with logging
RUST_LOG=debug cargo test
# Run all benchmarks
cargo bench
# Run specific benchmark suite
cargo bench ingestion_throughput
cargo bench query_performance
cargo bench state_reconstruction
cargo bench concurrent_writes
# View results
open target/criterion/report/index.html
AllSource uses a polyglot architecture with specialized services:
```
┌──────────────────────────────────────────┐
│       Go Control Plane (Port 8081)       │
│  • Cluster Management                    │
│  • Metrics Aggregation                   │
│  • Operation Orchestration               │
│  • Health Monitoring                     │
└──────────────────────────────────────────┘
                    │ HTTP Client
                    ▼
┌──────────────────────────────────────────┐
│       Rust Event Store (Port 8080)       │
│  • Event Ingestion (469K/sec)            │
│  • Query Engine (<12μs)                  │
│  • Schema Registry                       │
│  • Stream Processing                     │
│  • Event Replay                          │
└──────────────────────────────────────────┘
                    │
                    ▼
┌──────────────────────────────────────────┐
│              Storage Layer               │
│  • Parquet (Columnar)                    │
│  • WAL (Durability)                      │
│  • Snapshots (Point-in-time)             │
│  • In-Memory Indexes                     │
└──────────────────────────────────────────┘
```
When an event is ingested:
Storage Layer:

```
├── In-Memory Events (Vec<Event>)
├── DashMap Indexes (entity_id, event_type)
├── Parquet Files (columnar storage)
├── WAL Segments (append-only logs)
└── Snapshots (point-in-time state)
```
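The dual indexes can be pictured with plain maps (the crate uses DashMap for lock-free concurrent access; this single-threaded simplification uses `std::collections::HashMap`): each ingested event's position in the log is recorded under both its entity ID and its event type, so either query dimension is an O(1) lookup.

```rust
use std::collections::HashMap;

/// Positions of events in the main log, indexed two ways.
#[derive(Default)]
struct Indexes {
    by_entity: HashMap<String, Vec<usize>>,
    by_type: HashMap<String, Vec<usize>>,
}

impl Indexes {
    /// Record that the event at log position `pos` belongs to this
    /// entity and event type.
    fn insert(&mut self, pos: usize, entity_id: &str, event_type: &str) {
        self.by_entity.entry(entity_id.to_string()).or_default().push(pos);
        self.by_type.entry(event_type.to_string()).or_default().push(pos);
    }
}
```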
RwLock<HashMap> for reads

use allsource_core::{EventStore, Event, QueryEventsRequest};
use serde_json::json;
// Create store
let store = EventStore::new();
// Ingest events
let event = Event::new(
"user.created".to_string(),
"user-123".to_string(),
json!({
"name": "Alice",
"email": "alice@example.com"
})
);
store.ingest(event)?;
// Query events
let request = QueryEventsRequest {
entity_id: Some("user-123".to_string()),
..Default::default()
};
let events = store.query(request)?;
// Reconstruct state
let state = store.reconstruct_state("user-123", None)?;
println!("Current state: {}", state);
// Time-travel query
let timestamp = chrono::Utc::now() - chrono::Duration::hours(1);
let past_state = store.reconstruct_state("user-123", Some(timestamp))?;
println!("State 1 hour ago: {}", past_state);
use allsource_core::projection::Projection;
use allsource_core::event::Event;
use serde_json::{json, Value};
use parking_lot::RwLock;
use std::collections::HashMap;
struct RevenueProjection {
totals: RwLock<HashMap<String, f64>>,
}
impl Projection for RevenueProjection {
fn name(&self) -> &str {
"revenue_by_customer"
}
fn process(&self, event: &Event) -> allsource_core::Result<()> {
if event.event_type == "order.completed" {
if let Some(amount) = event.payload["amount"].as_f64() {
let mut totals = self.totals.write();
*totals.entry(event.entity_id.clone()).or_insert(0.0) += amount;
}
}
Ok(())
}
fn get_state(&self, entity_id: &str) -> Option<Value> {
self.totals.read()
.get(entity_id)
.map(|total| json!({ "total_revenue": total }))
}
fn clear(&self) {
self.totals.write().clear();
}
}
# Find process using port 8080
lsof -i :8080
# Kill process
kill -9 <PID>
# Always use release mode for benchmarks
cargo run --release
cargo bench
# Check system resources
top -o cpu
# Monitor memory usage
RUST_LOG=allsource_core=debug cargo run --release
# Reduce batch sizes in config
# Adjust snapshot_config.max_events_before_snapshot
# Clean build
cargo clean
cargo test
# Check for stale processes
killall allsource-core
# Verbose test output
cargo test -- --nocapture --test-threads=1
Contributions are welcome! Areas of interest:
MIT License - see LICENSE file for details
AllSource Core - Event sourcing, done right
Built with 🦀 Rust | Clean Architecture | Made for Production
Version 0.7.1 | 469K events/sec | 470+ tests passing | Security & Cloud-Native