| Crates.io | replication-engine |
| lib.rs | replication-engine |
| version | 0.1.5 |
| created_at | 2026-01-04 08:21:57.180881+00 |
| updated_at | 2026-01-10 08:10:45.922055+00 |
| description | Mesh replication agent for sync-engine nodes |
| homepage | |
| repository | https://github.com/transilluminate/replication-engine |
| max_upload_size | |
| id | 2021568 |
| size | 578,776 |
Mesh replication agent for sync-engine nodes with bidirectional data synchronization
replication-engine provides reliable data sync across a cluster of sync-engine instances using two complementary strategies:

- **Hot path**: tails each peer's CDC stream for immediate, low-latency replication
- **Cold path**: periodic Merkle tree anti-entropy that detects and repairs divergence

This dual approach ensures both speed (the hot path catches changes immediately) and reliability (the cold path catches anything missed due to network issues, stream trimming, or restarts).
```
      Node A                                 Node B
┌─────────────────┐                    ┌─────────────────┐
│   sync-engine   │                    │   sync-engine   │
│  writes to cdc  │                    │                 │
└────────┬────────┘                    └────────▲────────┘
         │                                      │
         │ CDC events                           │ submit()
         ▼                                      │
┌─────────────────┐    tail stream     ┌─────────────────┐
│ replication-eng │◄───────────────────│ replication-eng │
│ (A's instance)  │  (via A's Redis)   │ (B's instance)  │
└─────────────────┘                    └─────────────────┘
         │                                      │
         │                                      │
         ▼                                      ▼
┌─────────────────┐                    ┌─────────────────┐
│ SQLite Cursors  │                    │ SQLite Cursors  │
│  (crash-safe)   │                    │  (crash-safe)   │
└─────────────────┘                    └─────────────────┘
```
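The "crash-safe" cursors in the diagram mean each peer's last-applied stream position survives restarts. The crate's actual schema isn't documented here; a minimal sketch of the idea using `rusqlite`, with an illustrative table name and columns:

```rust
use rusqlite::Connection;

/// Illustrative cursor persistence: one row per peer holding the last
/// applied stream ID, so the hot path can resume after a crash.
/// Table name and columns are assumptions, not the crate's real schema
/// (see src/cursor.rs in the project layout below).
fn open_cursors(path: &str) -> rusqlite::Result<Connection> {
    let conn = Connection::open(path)?;
    conn.pragma_update(None, "journal_mode", "WAL")?; // cursor.wal_mode
    conn.execute_batch(
        "CREATE TABLE IF NOT EXISTS cursors (
             peer_id   TEXT PRIMARY KEY,
             stream_id TEXT NOT NULL
         );",
    )?;
    Ok(conn)
}

/// Upsert the cursor after a batch is applied.
fn save_cursor(conn: &Connection, peer_id: &str, stream_id: &str) -> rusqlite::Result<()> {
    conn.execute(
        "INSERT INTO cursors (peer_id, stream_id) VALUES (?1, ?2)
         ON CONFLICT(peer_id) DO UPDATE SET stream_id = excluded.stream_id",
        (peer_id, stream_id),
    )?;
    Ok(())
}
```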
TLS is supported via `rediss://` URLs for encrypted peer connections.

Add to your `Cargo.toml`:
```toml
[dependencies]
replication-engine = "0.1.5"
tokio = { version = "1", features = ["full"] }
```
Basic usage:
```rust
use replication_engine::{ReplicationEngine, ReplicationConfig, PeerConfig};
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let config = ReplicationConfig {
        local_node_id: "uk.node.london-1".into(),
        peers: vec![
            PeerConfig::for_testing(
                "uk.node.manchester-1",
                "redis://peer1:6379",
            ),
        ],
        ..Default::default()
    };

    let (_tx, rx) = watch::channel(config.clone());
    let mut engine = ReplicationEngine::new(config, rx);
    engine.start().await.expect("Failed to start");

    // Engine runs until shutdown signal
    tokio::signal::ctrl_c().await.unwrap();
    engine.shutdown().await;
}
```
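The example keeps `_tx` around; pushing a new value through it is presumably how configuration updates reach a running engine, though the docs above don't confirm live reload. A hedged sketch:

```rust
use replication_engine::{PeerConfig, ReplicationConfig};
use tokio::sync::watch;

// Hypothetical runtime reconfiguration: push an updated config through
// the watch channel the engine was constructed with. Whether the engine
// applies changes live is an assumption, not documented above.
fn add_peer(
    tx: &watch::Sender<ReplicationConfig>,
    mut config: ReplicationConfig, // current config, kept by the caller
    node_id: &str,
    redis_url: &str,
) -> ReplicationConfig {
    config.peers.push(PeerConfig::for_testing(node_id, redis_url));
    let _ = tx.send(config.clone()); // subscribers see it on their next poll
    config
}
```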
The hot path tails each peer's CDC stream for low-latency replication:
```
┌──────────────────────────────────────────────────────────────┐
│                           Hot Path                           │
│      XREAD cdc → Parse CDC → Dedup → submit() → Cursor       │
└──────────────────────────────────────────────────────────────┘
```
Flow:
1. `XREAD BLOCK 5000` on the peer's cdc stream
2. `is_current(key, hash)` check skips events that are already current
3. `submit()`/`delete()` applies the remainder, then the cursor advances

**Adaptive Batch Sizing**: when enabled, the batch size follows AIMD (Additive Increase, Multiplicative Decrease): it grows additively while batches flush cleanly and is cut back multiplicatively under pressure, clamped between `min_batch_size` and `max_batch_size`. A sketch of this policy is shown below.
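A minimal sketch of such an AIMD controller; the step size (+10) and the halving factor are illustrative assumptions, not the crate's actual tuning:

```rust
/// Illustrative AIMD controller for the XREAD batch size. Only the
/// additive-increase/multiplicative-decrease shape and the min/max
/// clamping come from the documentation above; the constants are assumed.
struct AimdBatchSize {
    current: usize,
    min: usize,
    max: usize,
}

impl AimdBatchSize {
    fn new(initial: usize, min: usize, max: usize) -> Self {
        Self { current: initial, min, max }
    }

    /// Batch flushed cleanly: grow additively toward `max`.
    fn on_success(&mut self) {
        self.current = (self.current + 10).min(self.max);
    }

    /// Flush was slow or the sync-engine pushed back: halve toward `min`.
    fn on_pressure(&mut self) {
        self.current = (self.current / 2).max(self.min);
    }

    /// Batch size to use for the next XREAD.
    fn next_batch_size(&self) -> usize {
        self.current
    }
}
```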
The cold path periodically verifies consistency via Merkle tree comparison:
```
┌──────────────────────────────────────────────────────────────┐
│                          Cold Path                           │
│    Compare roots → Drill down → Fetch divergent → Submit     │
└──────────────────────────────────────────────────────────────┘
```
Flow:

1. Compare the local Merkle root with each peer's root
2. Drill down into branches whose hashes diverge
3. Fetch divergent items from the peer (capped at `max_items_per_cycle`)
4. Submit them to the local sync-engine

Failed cycles back off, starting at `backoff_base_sec` and capped at `backoff_max_sec`. A sketch of the drill-down is shown below.
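A minimal sketch of the compare-and-drill-down step, assuming a generic hash-tree lookup; the node layout, fan-out, and peer API are illustrative, not the crate's actual types:

```rust
/// Hypothetical Merkle node: a subtree hash plus child node ids
/// (empty for leaves). The crate's real tree layout is not shown above.
struct Node {
    hash: [u8; 32],
    children: Vec<u64>,
}

/// Walk the local and remote trees in lockstep, pruning any subtree whose
/// hashes match and collecting leaves that diverge. `local` and `remote`
/// are assumed lookups from node id to node.
fn find_divergent_leaves(
    id: u64,
    local: &impl Fn(u64) -> Node,
    remote: &impl Fn(u64) -> Node,
    out: &mut Vec<u64>,
) {
    let (l, r) = (local(id), remote(id));
    if l.hash == r.hash {
        return; // identical subtree: skip it entirely
    }
    if l.children.is_empty() {
        out.push(id); // divergent leaf: fetch this item from the peer
        return;
    }
    for child in l.children {
        find_divergent_leaves(child, local, remote, out);
    }
}
```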
All configuration options with defaults:
| Option | Default | Description |
|---|---|---|
| **Identity** | | |
| `local_node_id` | Required | This node's unique identifier |
| **Hot Path** | | |
| `settings.hot_path.enabled` | `true` | Enable CDC stream tailing |
| `settings.hot_path.batch_size` | `100` | Initial XREAD batch size |
| `settings.hot_path.block_timeout` | `"5s"` | XREAD block timeout |
| `settings.hot_path.adaptive_batch_size` | `false` | Enable AIMD batch sizing |
| `settings.hot_path.min_batch_size` | `50` | Minimum batch size (AIMD) |
| `settings.hot_path.max_batch_size` | `1000` | Maximum batch size (AIMD) |
| `settings.hot_path.rate_limit_enabled` | `false` | Enable rate limiting (thundering herd prevention) |
| `settings.hot_path.rate_limit_per_sec` | `10000` | Max events/second (sustained rate) |
| `settings.hot_path.rate_limit_burst` | `1000` | Max burst size above sustained rate |
| **Cold Path** | | |
| `settings.cold_path.enabled` | `true` | Enable Merkle anti-entropy |
| `settings.cold_path.interval_sec` | `60` | Seconds between repair cycles |
| `settings.cold_path.max_items_per_cycle` | `1000` | Max items to repair per cycle |
| `settings.cold_path.backoff_base_sec` | `5` | Base backoff on failure (sec) |
| `settings.cold_path.backoff_max_sec` | `300` | Maximum backoff (5 min) |
| **Peer Health** | | |
| `settings.peer_health.enabled` | `true` | Enable idle peer health checks |
| `settings.peer_health.ping_interval_sec` | `30` | Seconds between pings |
| `settings.peer_health.idle_threshold_sec` | `60` | Idle time before ping (sec) |
| **SLO Thresholds** | | |
| `settings.slo.max_stream_read_latency_ms` | `100` | Warn if XREAD exceeds (ms) |
| `settings.slo.max_peer_op_latency_ms` | `500` | Warn if peer op exceeds (ms) |
| `settings.slo.max_batch_flush_latency_ms` | `200` | Warn if flush exceeds (ms) |
| `settings.slo.max_replication_lag_sec` | `30` | Warn if lag exceeds (sec) |
| **Peers** | | |
| `peers[].node_id` | Required | Peer's unique node ID |
| `peers[].redis_url` | Required | Redis URL for CDC stream |
| `peers[].priority` | `0` | Sync priority (lower = higher) |
| `peers[].circuit_failure_threshold` | `5` | Failures before circuit opens |
| `peers[].circuit_reset_timeout_sec` | `30` | Seconds before retry |
| **Cursor Persistence** | | |
| `cursor.sqlite_path` | `"replication_cursors.db"` | SQLite database path |
| `cursor.wal_mode` | `true` | Use WAL mode |
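The `peers[].circuit_*` options above drive a per-peer circuit breaker (its state is exported via the `replication_peer_circuit_state` metric below). A minimal sketch of the usual closed → open → half-open cycle, assuming consecutive-failure counting; the crate's actual logic in `src/circuit_breaker.rs` may differ:

```rust
use std::time::{Duration, Instant};

/// Minimal circuit breaker: opens after `failure_threshold` consecutive
/// failures, then allows a single probe (half-open) once `reset_timeout`
/// has elapsed.
enum CircuitState {
    Closed { failures: u32 },
    Open { since: Instant },
    HalfOpen,
}

struct CircuitBreaker {
    state: CircuitState,
    failure_threshold: u32,  // peers[].circuit_failure_threshold
    reset_timeout: Duration, // peers[].circuit_reset_timeout_sec
}

impl CircuitBreaker {
    fn allow_request(&mut self) -> bool {
        match self.state {
            CircuitState::Closed { .. } | CircuitState::HalfOpen => true,
            CircuitState::Open { since } => {
                if since.elapsed() >= self.reset_timeout {
                    self.state = CircuitState::HalfOpen; // probe the peer
                    true
                } else {
                    false
                }
            }
        }
    }

    fn on_success(&mut self) {
        self.state = CircuitState::Closed { failures: 0 };
    }

    fn on_failure(&mut self) {
        self.state = match self.state {
            CircuitState::Closed { failures } if failures + 1 < self.failure_threshold => {
                CircuitState::Closed { failures: failures + 1 }
            }
            _ => CircuitState::Open { since: Instant::now() },
        };
    }
}
```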
Example YAML configuration:

```yaml
replication:
  local_node_id: "uk.node.london-1"

  settings:
    hot_path:
      enabled: true
      batch_size: 100
      block_timeout: "5s"
      adaptive_batch_size: true
      min_batch_size: 50
      max_batch_size: 1000
      rate_limit_enabled: true
      rate_limit_per_sec: 10000
      rate_limit_burst: 1000

    cold_path:
      enabled: true
      interval_sec: 60
      max_items_per_cycle: 1000

    peer_health:
      enabled: true
      ping_interval_sec: 30

    slo:
      max_replication_lag_sec: 60

  peers:
    - node_id: "uk.node.manchester-1"
      redis_url: "redis://peer1:6379"   # or rediss:// for TLS
      priority: 0
      circuit_failure_threshold: 5
      circuit_reset_timeout_sec: 30

    - node_id: "uk.node.edinburgh-1"
      redis_url: "rediss://user:pass@peer2:6379"   # TLS + auth
      priority: 1

  cursor:
    sqlite_path: "/var/lib/redsqrl/replication_cursors.db"
    wal_mode: true
```
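The documentation above doesn't show how this file is loaded. If the config types derive `serde::Deserialize` (an assumption, not confirmed above), loading it with `serde_yaml` could look like this:

```rust
use replication_engine::ReplicationConfig;
use serde::Deserialize;

/// Wrapper for the top-level `replication:` key in the YAML above.
/// Assumes ReplicationConfig implements serde::Deserialize, which the
/// documentation above does not confirm.
#[derive(Deserialize)]
struct ConfigFile {
    replication: ReplicationConfig,
}

fn load_config(path: &str) -> Result<ReplicationConfig, Box<dyn std::error::Error>> {
    let raw = std::fs::read_to_string(path)?;
    let file: ConfigFile = serde_yaml::from_str(&raw)?;
    Ok(file.replication)
}
```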
Comprehensive test suite with 280+ tests covering unit, property-based, chaos, and integration testing:
| Test Suite | Count | Description |
|---|---|---|
| Unit Tests | 231 ✅ | Fast, no external deps |
| Property Tests | 16 ✅ | Proptest fuzzing for invariants |
| Chaos Tests | 11 ✅ | Failure injection, corruption handling |
| Integration Tests | 44 ✅ | Real Redis via testcontainers |
| Total | 280+ ✅ | ~85% code coverage |
Three fuzz targets with ~4 million runs and zero crashes:
| Target | Runs | Description |
|---|---|---|
| `fuzz_decompress` | 321K | Arbitrary byte decompression |
| `fuzz_stream_id` | 1.7M | Stream ID comparison |
| `fuzz_lag_calc` | 1.86M | Lag calculation |
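For orientation, cargo-fuzz targets follow the `libfuzzer-sys` pattern below; the body is a hypothetical stand-in, since the real entry points the targets exercise are internal to the crate (see the project layout below):

```rust
// Illustrative shape of fuzz/fuzz_targets/fuzz_decompress.rs; the
// actual target calls the crate's internal decompression routine.
#![no_main]
use libfuzzer_sys::fuzz_target;

fuzz_target!(|data: &[u8]| {
    // A real target feeds arbitrary bytes to the decompressor and
    // asserts it never panics, e.g.:
    //     let _ = decompress(data);
    let _ = data;
});
```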
```bash
# Unit tests (fast, no Docker)
cargo test --lib

# Property-based tests
cargo test --test property_tests

# Chaos tests (no Docker)
cargo test --test chaos_tests

# Integration tests (requires Docker/OrbStack)
cargo test --test integration -- --ignored

# All tests
cargo test -- --include-ignored

# Fuzz testing (requires nightly)
cargo +nightly fuzz run fuzz_decompress -- -max_total_time=60

# Coverage
cargo llvm-cov --all-features --lib --tests -- --include-ignored
```
Prometheus-style metrics exposed for operational visibility:
| Metric | Type | Description |
|---|---|---|
| **Hot Path** | | |
| `replication_cdc_events_read_total` | Counter | CDC events read from peers |
| `replication_cdc_events_applied_total` | Counter | Events applied to sync-engine |
| `replication_cdc_events_deduped_total` | Counter | Events skipped (already current) |
| `replication_batch_flush_total` | Counter | Batch flushes to sync-engine |
| `replication_batch_flush_duration_seconds` | Histogram | Flush latency |
| `replication_lag_ms` | Gauge | Cursor lag behind stream head |
| `replication_lag_events` | Gauge | Estimated events behind |
| `replication_backpressure_pauses_total` | Counter | Ingestion pauses due to sync-engine pressure |
| **Cold Path** | | |
| `replication_repair_cycles_total` | Counter | Merkle repair cycles run |
| `replication_repair_items_fetched_total` | Counter | Items fetched from peers |
| `replication_repair_items_submitted_total` | Counter | Items submitted to sync-engine |
| `replication_merkle_divergences_total` | Counter | Divergent branches found |
| **Peer Health** | | |
| `replication_peer_connected` | Gauge | 1 if connected, 0 if not |
| `replication_peer_circuit_state` | Gauge | 0=closed, 1=half-open, 2=open |
| `replication_peer_latency_seconds` | Histogram | Peer operation latency |
| **Engine State** | | |
| `replication_engine_state` | Gauge | 0=stopped, 1=starting, 2=running, 3=stopping |
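How these metrics are exported isn't shown above. If they're registered with the standard `prometheus` crate (an assumption based on the "Prometheus-style" description), a parent daemon could render a scrape response roughly like this:

```rust
use prometheus::{Encoder, Registry, TextEncoder};

/// Render all metrics in a registry as Prometheus text format.
/// Assumes the engine's metrics land in (or can be gathered into) a
/// prometheus::Registry; the crate's actual wiring in src/metrics.rs
/// is not documented above.
fn render_metrics(registry: &Registry) -> String {
    let mut buf = Vec::new();
    TextEncoder::new()
        .encode(&registry.gather(), &mut buf)
        .expect("encoding Prometheus metrics");
    String::from_utf8(buf).expect("metrics are valid UTF-8")
}
```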
```
src/
├── lib.rs                # Public API exports
├── config.rs             # Configuration types
├── error.rs              # Error types
├── sync_engine.rs        # SyncEngineRef trait + SyncItem integration
├── cursor.rs             # SQLite cursor persistence
├── peer.rs               # Peer connection management
├── stream.rs             # CDC stream parsing + decompression
├── batch.rs              # Batch processor with deduplication
├── circuit_breaker.rs    # Circuit breaker pattern
├── resilience.rs         # Retry, rate limiting, bulkhead
├── metrics.rs            # Prometheus metrics
└── coordinator/
    ├── mod.rs            # Main ReplicationEngine
    ├── types.rs          # State types
    ├── hot_path.rs       # Stream tailing task
    └── cold_path.rs      # Merkle repair task

tests/
├── property_tests.rs     # Proptest-based property testing
├── chaos_tests.rs        # Failure injection, corruption handling
├── integration.rs        # Testcontainers integration tests
└── common/
    ├── mod.rs            # Test utilities
    ├── containers.rs     # Redis testcontainer helpers
    └── mock_sync.rs      # Mock SyncEngineRef for testing

fuzz/
└── fuzz_targets/
    ├── fuzz_decompress.rs    # Arbitrary byte decompression
    ├── fuzz_stream_id.rs     # Stream ID comparison
    └── fuzz_lag_calc.rs      # Lag calculation
```
The replication-engine is designed to be instantiated by a parent daemon:
```rust
// In daemon startup:
let sync_engine = SyncEngine::new(sync_config, sync_config_rx);
sync_engine.start().await?;
let sync_engine = Arc::new(sync_engine); // shared with the replication engine

let replication_engine = ReplicationEngine::with_sync_engine(
    replication_config,
    replication_config_rx,
    Arc::clone(&sync_engine),
);
replication_engine.start().await?;

// Graceful shutdown: stop replication first, then the sync-engine it feeds
replication_engine.shutdown().await;
sync_engine.shutdown().await;
```
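Integration goes through the `SyncEngineRef` trait (see `src/sync_engine.rs` and the `mock_sync.rs` test helper above). The exact signatures aren't documented here; a hedged sketch of a minimal in-memory implementation, inferring the `submit()`/`delete()`/`is_current(key, hash)` surface from the hot-path description:

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical shape of the SyncEngineRef trait, inferred from the
/// operations named in this README; the crate's real signatures
/// (async-ness, item types, errors) may differ.
trait SyncEngineRef: Send + Sync {
    fn is_current(&self, key: &str, hash: &str) -> bool;
    fn submit(&self, key: String, hash: String, value: Vec<u8>);
    fn delete(&self, key: &str);
}

/// In-memory stand-in, in the spirit of tests/common/mock_sync.rs.
#[derive(Default)]
struct MockSync {
    items: Mutex<HashMap<String, (String, Vec<u8>)>>,
}

impl SyncEngineRef for MockSync {
    fn is_current(&self, key: &str, hash: &str) -> bool {
        self.items
            .lock()
            .unwrap()
            .get(key)
            .is_some_and(|(h, _)| h == hash)
    }

    fn submit(&self, key: String, hash: String, value: Vec<u8>) {
        self.items.lock().unwrap().insert(key, (hash, value));
    }

    fn delete(&self, key: &str) {
        self.items.lock().unwrap().remove(key);
    }
}
```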
GNU Affero General Public License v3.0 (AGPL-3.0)
For commercial licensing options, contact: adrian.j.robinson@gmail.com