| Field | Value |
|---|---|
| Crates.io | crdt-lite |
| lib.rs | crdt-lite |
| version | 0.8.0 |
| created_at | 2024-09-22 15:38:11.646802+00 |
| updated_at | 2025-11-17 11:27:59.084897+00 |
| description | A lightweight, column-based CRDT implementation in Rust |
| homepage | https://github.com/sinkingsugar/crdt-lite |
| repository | https://github.com/sinkingsugar/crdt-lite |
| max_upload_size | |
| id | 1383057 |
| size | 711,633 |
> [!WARNING]
> This project is in early development and not intended for production use.
A lightweight implementation of Conflict-free Replicated Data Types (CRDTs) in both Rust and C++. CRDT-Lite provides generic CRDT structures for building distributed systems requiring eventual consistency.
CRDT-Lite is currently being used in Formabble, a collaborative game engine, and will be integrated into a derived product that we will announce soon.
This library includes two CRDT implementations:
- Column CRDT (Rust: `src/lib.rs`, C++: `crdt.hpp`) - Generic key-value store with fine-grained column-level conflict resolution
- Text CRDT (C++: `text_crdt.hpp`) - Line-based collaborative text editor with fractional positioning

Both Rust and C++ implementations share the same core algorithms and maintain API compatibility.
- `no_std` compatible - Works in embedded systems and other `no_std` environments (requires `alloc`)

no_std Environments:

The Rust implementation supports `no_std` environments with allocator support.
Requirements:
- `alloc` crate required: This library needs an allocator for `Vec`, `HashMap`, `Arc`, etc.

```rust
#![no_std]
extern crate alloc;

use crdt_lite::CRDT;
// ... use as normal
```
Cargo.toml configuration:
```toml
[dependencies]

# For no_std with basic CRDT functionality (requires alloc feature)
crdt-lite = { version = "0.8", default-features = false, features = ["alloc"] }

# For no_std with JSON serialization
crdt-lite = { version = "0.8", default-features = false, features = ["alloc", "json"] }

# For no_std with binary serialization (bincode)
crdt-lite = { version = "0.8", default-features = false, features = ["alloc", "binary"] }

# For standard environments (default, uses std::collections::HashMap)
crdt-lite = { version = "0.8", features = ["json"] }
```
Implementation Notes:
- `no_std` mode uses `hashbrown::HashMap`, which is the same underlying implementation that `std::collections::HashMap` uses (identical performance)
- The `std` feature is enabled by default for backwards compatibility
- The `alloc` feature is required for `no_std` environments and pulls in `hashbrown` only when needed
- In `std` mode, `hashbrown` is not compiled, reducing dependency bloat
- Binary serialization uses `bincode` 2.0, which has full `no_std` support

By default, `NodeId` is `u64`. For applications using UUID-based node identifiers, enable the `node-id-u128` feature:
```toml
[dependencies]
crdt-lite = { version = "0.8", features = ["node-id-u128"] }
```
Why u128?
- `u64` provides 2^64 (~18 quintillion) unique IDs
- `u128` provides 2^128 unique IDs, eliminating collision concerns for UUID-based systems
- C++: define `CrdtNodeId` via preprocessor (see `crdt.hpp`)

By default, records are stored in a `HashMap` for O(1) operations. Enable the `sorted-keys` feature to use `BTreeMap` for ordered iteration and range queries:
```toml
[dependencies]
crdt-lite = { version = "0.8", features = ["sorted-keys"] }
```
Use Cases:
"session-{uuid}-{index}"Example:
```rust
use crdt_lite::CRDT;

let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);

// Insert records with composite keys
crdt.insert_or_update(&"session-abc-001".to_string(),
    vec![("data".to_string(), "first".to_string())]);
crdt.insert_or_update(&"session-abc-002".to_string(),
    vec![("data".to_string(), "second".to_string())]);
crdt.insert_or_update(&"session-xyz-001".to_string(),
    vec![("data".to_string(), "other".to_string())]);

// Range query - get all session-abc records
for (key, record) in crdt.range("session-abc-".."session-abd-") {
    println!("Found: {:?}", record);
}
```
Performance:
Important Notes:
- Requires the `K: Ord` trait bound (`String`, `u64`, etc. already implement this)
- `range()` only queries local records (not the parent CRDT in hierarchies)

Installation:

```sh
cargo add crdt-lite
# or add to Cargo.toml: crdt-lite = "0.8"
```
```rust
use crdt_lite::{CRDT, DefaultMergeRule};

// Create two CRDT nodes
let mut node1: CRDT<String, String, String> = CRDT::new(1, None);
let mut node2: CRDT<String, String, String> = CRDT::new(2, None);

// Node 1: Insert data
let changes1 = node1.insert_or_update(
    &"user1".to_string(),
    vec![
        ("name".to_string(), "Alice".to_string()),
        ("age".to_string(), "30".to_string()),
    ],
);

// Node 2: Insert conflicting data
let changes2 = node2.insert_or_update(
    &"user1".to_string(),
    vec![
        ("name".to_string(), "Bob".to_string()),
        ("age".to_string(), "25".to_string()),
    ],
);

// Merge changes bidirectionally
let merge_rule = DefaultMergeRule;
node1.merge_changes(changes2, &merge_rule);
node2.merge_changes(changes1, &merge_rule);

// Both nodes converge to same state (node2 wins due to higher node_id)
assert_eq!(node1.get_data(), node2.get_data());
```
Run tests:
```sh
cargo test
```
Compile and run:
```sh
# Column CRDT tests
g++ -std=c++20 -o crdt tests.cpp && ./crdt

# Text CRDT tests
g++ -std=c++20 -o text_crdt test_text_crdt.cpp && ./text_crdt
```
Column CRDT example:
#include "crdt.hpp"
CRDT<std::string, std::string> node1(1);
CRDT<std::string, std::string> node2(2);
// Node 1: Insert data
std::vector<Change<std::string, std::string>> changes1;
node1.insert_or_update("user1", changes1,
std::make_pair("name", "Alice"),
std::make_pair("age", "30")
);
// Node 2: Insert conflicting data
std::vector<Change<std::string, std::string>> changes2;
node2.insert_or_update("user1", changes2,
std::make_pair("name", "Bob"),
std::make_pair("age", "25")
);
// Merge changes bidirectionally
node1.merge_changes(std::move(changes2));
node2.merge_changes(std::move(changes1));
// Both nodes converge
assert(node1.get_data() == node2.get_data());
Text CRDT example:
#include "text_crdt.hpp"
TextCRDT<std::string> doc1(1);
TextCRDT<std::string> doc2(2);
// Both nodes insert lines concurrently
auto id1 = doc1.insert_line_at_end("Line from Node 1");
auto id2 = doc2.insert_line_at_end("Line from Node 2");
// Sync changes
uint64_t sync_version = 0;
auto changes1 = doc1.get_changes_since(sync_version);
auto changes2 = doc2.get_changes_since(sync_version);
doc2.merge_changes(changes1);
doc1.merge_changes(changes2);
// Both nodes have both lines
assert(doc1.line_count() == 2);
assert(doc2.line_count() == 2);
Records are stored as maps of columns (field names) to values. Each column has independent version tracking:
```rust
// Rust
pub struct Record<C, V> {
    pub fields: HashMap<C, V>,
    pub column_versions: HashMap<C, ColumnVersion>,
    // Version boundaries for efficient sync
    pub lowest_local_db_version: u64,
    pub highest_local_db_version: u64,
}
```
Why column-based? Concurrent edits to different columns of the same record merge cleanly without touching each other; only writes to the same column need conflict resolution, as the sketch below shows.
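A quick illustration using the quick-start API from above (a sketch, not a canonical test from the repository):

```rust
use crdt_lite::{CRDT, DefaultMergeRule};

// Two nodes start from the same record...
let mut node1: CRDT<String, String, String> = CRDT::new(1, None);
let mut node2: CRDT<String, String, String> = CRDT::new(2, None);
let seed = node1.insert_or_update(
    &"user1".to_string(),
    vec![
        ("name".to_string(), "Alice".to_string()),
        ("age".to_string(), "30".to_string()),
    ],
);
node2.merge_changes(seed, &DefaultMergeRule);

// ...then edit DIFFERENT columns concurrently.
let c1 = node1.insert_or_update(&"user1".to_string(), vec![("name".to_string(), "Alicia".to_string())]);
let c2 = node2.insert_or_update(&"user1".to_string(), vec![("age".to_string(), "31".to_string())]);
node1.merge_changes(c2, &DefaultMergeRule);
node2.merge_changes(c1, &DefaultMergeRule);

// Both edits survive on both nodes: name = "Alicia" AND age = "31".
assert_eq!(node1.get_data(), node2.get_data());
```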
Conflicts are resolved deterministically using a three-tier comparison:

1. `col_version` - the per-column version counter (higher wins)
2. `db_version` - the node-wide logical version (higher wins)
3. `node_id` - the final tie-breaker (higher wins)

This ordering ensures that every node resolves every conflict identically, regardless of the order in which changes arrive.
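As a sketch of that comparison (shaped after the `MergeRule::should_accept` signature shown in the custom merge rules section below; the exact `DefaultMergeRule` internals may differ in detail):

```rust
// Three-tier comparison: column version, then db version, then node id.
// NodeId is u64 by default (u128 with the node-id-u128 feature).
fn should_accept(
    local_col: u64, local_db: u64, local_node: u64,
    remote_col: u64, remote_db: u64, remote_node: u64,
) -> bool {
    if remote_col != local_col {
        return remote_col > local_col; // 1. per-column version decides first
    }
    if remote_db != local_db {
        return remote_db > local_db; // 2. then the node-wide db version
    }
    remote_node > local_node // 3. node id breaks exact ties deterministically
}
```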
Maintains causality using Lamport-style logical clocks:
```rust
impl LogicalClock {
    // Local operation
    pub fn tick(&mut self) -> u64 {
        self.time += 1;
        self.time
    }

    // Receive remote change
    pub fn update(&mut self, received_time: u64) -> u64 {
        self.time = self.time.max(received_time);
        self.time += 1;
        self.time
    }
}
```
Important: Always update clock on merge, even for rejected changes (prevents clock drift).
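A toy sketch of why this matters (the names below are illustrative, not library API):

```rust
struct LogicalClock { time: u64 }

impl LogicalClock {
    fn update(&mut self, received_time: u64) -> u64 {
        self.time = self.time.max(received_time);
        self.time += 1;
        self.time
    }
}

// Hypothetical merge step: the clock advances before (and regardless of)
// the accept/reject decision. If rejected changes skipped the update, a
// node that refuses a burst of stale changes would lag behind its peers,
// and its next local writes would carry artificially low versions.
fn merge_one(clock: &mut LogicalClock, remote_time: u64, accept: bool) {
    clock.update(remote_time); // unconditional
    if accept {
        // ... apply the remote change ...
    }
}
```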
Deleted records are marked with tombstones rather than immediately removed:
```rust
pub struct TombstoneInfo {
    pub db_version: u64,
    pub node_id: NodeId,
    pub local_db_version: u64,
}
```
⚠️ Critical: Tombstone Management
Tombstones accumulate indefinitely unless compacted. To prevent memory exhaustion:
- Call `compact_tombstones(min_acknowledged_version)` periodically

Individual fields can be deleted from records without removing the entire record:
```rust
// Rust
let change = crdt.delete_field(&record_id, &"email".to_string());
```

```cpp
// C++
std::vector<Change<std::string, std::string>> changes;
bool success = crdt.delete_field(record_id, "email", changes);
```
How it works:
- The value is removed from the `record.fields` map
- Its `ColumnVersion` entry remains in `record.column_versions` (acts as an implicit field tombstone)
- A `Change { col_name: Some("field"), value: None }` is generated for propagation
- Conflicts with concurrent writes are resolved via the retained `col_version`
- Distinguished from null values: a deleted field is absent from `fields` entirely, rather than stored as an explicit empty value
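For instance (a sketch using the Rust API shown above; which side wins depends on the usual version comparison, but both nodes agree):

```rust
use crdt_lite::{CRDT, DefaultMergeRule};

// Node 1 deletes a field while node 2 concurrently overwrites it.
let mut node1: CRDT<String, String, String> = CRDT::new(1, None);
let mut node2: CRDT<String, String, String> = CRDT::new(2, None);

let seed = node1.insert_or_update(
    &"user1".to_string(),
    vec![("email".to_string(), "a@example.com".to_string())],
);
node2.merge_changes(seed, &DefaultMergeRule);

let deletion = node1.delete_field(&"user1".to_string(), &"email".to_string());
let update = node2.insert_or_update(
    &"user1".to_string(),
    vec![("email".to_string(), "b@example.com".to_string())],
);

// The implicit field tombstone carries a col_version, so both sides
// resolve the delete-vs-update race with the same three-tier comparison.
if let Some(change) = deletion {
    node2.merge_changes(vec![change], &DefaultMergeRule);
}
node1.merge_changes(update, &DefaultMergeRule);
assert_eq!(node1.get_data(), node2.get_data()); // converged either way
```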
Each line in the text CRDT has a position defined by a path of integers:
```cpp
struct FractionalPosition {
    std::vector<uint64_t> path;
    // Examples:
    //   [10000]       - First line
    //   [10000, 5000] - Between first and second level
    //   [20000]       - After [10000]
};
```
Properties:

- Paths compare lexicographically, giving every line a total order
- A new position can always be generated between any two existing positions, descending one level when no integer fits in between (see the sketch below)
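A simplified Rust sketch of that midpoint allocation (illustrative only; it assumes `left` and `right` are adjacent siblings, and the actual strategy in `text_crdt.hpp` may differ):

```rust
// Pick a path strictly between two neighboring positions.
fn between(left: &[u64], right: &[u64]) -> Vec<u64> {
    let l = *left.last().unwrap_or(&0);
    let r = *right.last().unwrap_or(&u64::MAX);
    if r > l + 1 {
        // Room at this level: replace the last component with the midpoint.
        let mut path = left[..left.len().saturating_sub(1)].to_vec();
        path.push(l + (r - l) / 2);
        path
    } else {
        // No integer fits: extend the left path one level deeper.
        let mut path = left.to_vec();
        path.push(10_000);
        path
    }
}

// between(&[10_000], &[20_000]) == [15_000]
// between(&[10_000], &[10_001]) == [10_000, 10_000]  (descends a level)
```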
The Rust implementation includes an optional persistence layer with WAL (Write-Ahead Log) and snapshots for durability and crash recovery.
```toml
[dependencies]

# Basic persistence with bincode (legacy)
crdt-lite = { version = "0.8", features = ["persist"] }

# MessagePack persistence with schema evolution support (recommended)
crdt-lite = { version = "0.8", features = ["persist-msgpack"] }

# With optional compression (50-70% additional size reduction)
crdt-lite = { version = "0.8", features = ["persist-compressed"] }
```
```rust
use crdt_lite::persist::{PersistedCRDT, PersistConfig};
use std::path::PathBuf;

// Open or create a persisted CRDT (uses MessagePack by default)
let mut pcrdt = PersistedCRDT::<String, String, String>::open(
    PathBuf::from("./data"),
    1, // node_id
    PersistConfig::default(),
)?;

// Use like a normal CRDT - all operations are automatically persisted
pcrdt.insert_or_update(
    &"user1".to_string(),
    vec![
        ("name".to_string(), "Alice".to_string()),
        ("email".to_string(), "alice@example.com".to_string()),
    ],
)?;

// Changes are persisted to WAL and automatically recovered on crash
```
```rust
use crdt_lite::persist::{PersistConfig, SnapshotFormat};

let config = PersistConfig {
    // Snapshot creation
    snapshot_threshold: 1000,          // Create snapshot every 1000 changes
    snapshot_interval_secs: Some(300), // Or every 5 minutes (for low-activity nodes)

    // MessagePack + Incremental Snapshots (NEW in v0.6.0)
    snapshot_format: SnapshotFormat::MessagePack, // Default: schema evolution support
    enable_incremental_snapshots: true,           // Default: 95% I/O reduction
    full_snapshot_interval: 10,                   // Full snapshot every 10 incrementals
    enable_compression: false,                    // Optional: further 50-70% reduction

    // Cleanup
    auto_cleanup_snapshots: Some(3), // Keep 3 most recent snapshots
    max_batch_size: Some(10000),     // Auto-flush batch at 10k changes
};

let mut pcrdt = PersistedCRDT::<String, String, String>::open(
    PathBuf::from("./data"),
    1,
    config,
)?;
```
Why MessagePack + Incremental Snapshots?
- Schema evolution: add new fields without breaking old snapshots (via `#[serde(default)]`)
- Incremental snapshots: only changed data is written (the 95% I/O reduction noted in the config above)

MessagePack allows you to add new fields to your data structures without breaking old snapshots. Here's how:
```rust
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
struct User {
    name: String,
    age: u32,
}

// Use String as value type for simplicity (store as JSON/msgpack bytes)
type UserCRDT = PersistedCRDT<String, String, String>;
```
Later, you want to add email and premium fields:
```rust
#[derive(Debug, Clone, Serialize, Deserialize)]
struct User {
    name: String,
    age: u32,
    // New fields with #[serde(default)]
    #[serde(default)]
    email: String, // Defaults to "" for old snapshots
    #[serde(default)]
    premium: bool, // Defaults to false for old snapshots
}

// Old snapshot with v1.0 schema loads fine
let pcrdt = PersistedCRDT::<String, String, String>::open(
    PathBuf::from("./data"),
    1,
    PersistConfig::default(),
)?;

// Records from old snapshot have default values for new fields
let record = pcrdt.crdt().get_record(&"user1".to_string()).unwrap();
// email = "" (default)
// premium = false (default)
```
✅ DO:
```rust
// Add new fields with #[serde(default)]
#[serde(default)]
email: String,

// Use Option for truly optional fields
#[serde(default)]
phone: Option<String>, // Defaults to None

// Provide custom defaults
#[serde(default = "default_role")]
role: String,

fn default_role() -> String {
    "user".to_string()
}
```
❌ DON'T:
```rust
// Without #[serde(default)] - BREAKS old snapshots!
email: String, // ❌ Deserialization fails on old data

// Removing fields - BREAKS old snapshots!
// (old snapshots have data for the removed field)

// Changing field types - BREAKS old snapshots!
age: String, // Was u32, now String - ❌ fails
```
If you must make breaking changes (rename/remove fields, change types):
```rust
// Option 1: Keep old field, add new field
#[derive(Debug, Clone, Serialize, Deserialize)]
struct User {
    #[serde(default)]
    age_old: u32, // Keep for compatibility
    #[serde(default)]
    age: String, // New field
    // ... rest
}

// On load, migrate old → new
if user.age.is_empty() && user.age_old > 0 {
    user.age = user.age_old.to_string();
}

// Option 2: Version your schema
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "version")]
enum UserVersioned {
    #[serde(rename = "1")]
    V1 { name: String, age: u32 },
    #[serde(rename = "2")]
    V2 { name: String, age: String, email: String },
}
```
MessagePack (Recommended):
```text
// v1.0 snapshot
{ name: "Alice", age: 30 }

// v1.1 code loads v1.0 snapshot
// ✅ Works! Missing fields get defaults
User { name: "Alice", age: 30, email: "", premium: false }
```
Bincode (Legacy):
```text
// v1.0 snapshot
[2 fields] "Alice" 30

// v1.1 code expects 4 fields
[4 fields] expected ← ❌ ERROR: field count mismatch
```
This is why we recommend `persist-msgpack` over `persist` (bincode) for production use.
The persistence layer provides three types of hooks for integration with backup systems, network layers, etc.:
Called after changes are written to WAL (before fsync). Use for broadcasting changes to other nodes:
```rust
use std::sync::mpsc::Sender;

let (tx, rx) = std::sync::mpsc::channel();
pcrdt.add_post_hook(Box::new(move |changes| {
    // Send to network layer for broadcast
    let _ = tx.send(changes.to_vec());
}));
```
Called after a snapshot is created and sealed. Use for uploading to cloud storage:
```rust
pcrdt.add_snapshot_hook(Box::new(move |snapshot_path| {
    let path = snapshot_path.clone();
    // Spawn async upload task
    tokio::spawn(async move {
        let data = tokio::fs::read(&path).await.unwrap();
        // Upload to R2, S3, etc.
        // r2.put(format!("snapshots/{}", path.file_name()?), data).await?;
        // Mark as uploaded for safe cleanup
        // pcrdt.lock().unwrap().mark_snapshot_uploaded(path);
    });
}));
```
Called after a WAL segment is sealed (rotated). Use for archival:
```rust
pcrdt.add_wal_segment_hook(Box::new(move |segment_path| {
    let path = segment_path.clone();
    tokio::spawn(async move {
        let data = tokio::fs::read(&path).await.unwrap();
        // Archive to cloud storage
        // r2.put(format!("wal/{}", path.file_name()?), data).await?;
    });
}));
```
Accumulate changes for efficient network broadcasting:
```rust
// Perform multiple operations
pcrdt.insert_or_update(&"user1".to_string(), fields1)?;
pcrdt.insert_or_update(&"user2".to_string(), fields2)?;

// Collect all changes since last call
let batch = pcrdt.take_batch();

// Broadcast to other nodes
broadcast_to_peers(batch);
```
Auto-flush protection: By default, the batch is cleared when it reaches 10,000 changes to prevent OOM. Call `take_batch()` regularly to avoid losing changes.
For cloud backup workflows, track which files have been uploaded before deleting them:
```rust
// After successful upload
pcrdt.mark_snapshot_uploaded(snapshot_path);
pcrdt.mark_wal_segment_uploaded(segment_path);

// Cleanup only uploaded files (safe - won't lose data)
pcrdt.cleanup_old_snapshots(2, true)?;    // Keep 2, require uploaded
pcrdt.cleanup_old_wal_segments(5, true)?; // Keep 5, require uploaded

// Or cleanup all old files (after snapshot creation)
pcrdt.cleanup_old_snapshots(2, false)?; // Unconditional cleanup
```
Recovery is automatic on `open()`:
```rust
// After crash, simply open again
let pcrdt = PersistedCRDT::<String, String, String>::open(
    PathBuf::from("./data"),
    1,
    PersistConfig::default(),
)?;
// All data is recovered from snapshot + WAL replay
```
Design choice: WAL writes are buffered by the OS but NOT fsynced per-operation for CRDT performance:
| Failure Type | Data Loss | Why |
|---|---|---|
| Process crash | None | OS page cache survives process termination |
| Kernel panic | ~0-30s | Depends on kernel writeback timing (typically 30s) |
| Power failure | Up to `snapshot_threshold` ops | Unflushed WAL + page cache lost |
What this means:
Snapshots provide:
This design prioritizes CRDT convergence and performance over single-node durability. For stronger local durability:
- Lower `snapshot_threshold` (e.g., 10-50 for single-node deployments)

After all nodes acknowledge a version, compact tombstones to reclaim memory:
```rust
// Track acknowledgments from all nodes
let min_acknowledged = get_min_ack_from_all_nodes();

// Atomically compact tombstones and cleanup WAL
pcrdt.compact_tombstones(min_acknowledged)?;
```
Critical: Only compact after ALL nodes acknowledge the version, or deleted records will reappear (zombie records).
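One way to track that safely (a sketch; crdt-lite does not ship acknowledgment tracking, so the map and peer count below are hypothetical):

```rust
use std::collections::HashMap;

// node_id -> highest db_version that peer has durably merged.
fn min_acknowledged(acks: &HashMap<u64, u64>, expected_peers: usize) -> Option<u64> {
    if acks.len() < expected_peers {
        // A peer we haven't heard from blocks compaction entirely;
        // compacting past its view is exactly how zombie records appear.
        return None;
    }
    acks.values().copied().min()
}

// if let Some(version) = min_acknowledged(&acks, expected_peers) {
//     pcrdt.compact_tombstones(version)?;
// }
```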
See working examples in the repository:
- `examples/persistence_example.rs` - Basic persistence with hooks
- `examples/r2_backup_example.rs` - Cloud backup workflow with R2

Run with:
```sh
cargo run --example persistence_example --features persist
cargo run --example r2_backup_example --features persist
```
```rust
// Rust
let changes = node1.get_changes_since(last_db_version);
node2.merge_changes(changes, &DefaultMergeRule);

// Optionally exclude changes from specific nodes
let excluding = HashSet::from([node2_id]);
let changes = node1.get_changes_since_excluding(last_db_version, &excluding);
```

```cpp
// C++
auto changes = node1.get_changes_since(last_db_version);
node2.merge_changes(std::move(changes));

// Or use the helper function
uint64_t last_sync = 0;
sync_nodes(node1, node2, last_sync);
```
When syncing with parent-child CRDTs or after accumulating many changes:
```rust
// Rust
CRDT::<String, String, String>::compress_changes(&mut changes);
```

```cpp
// C++
CRDT<K, V>::compress_changes(changes);
```
This removes redundant operations in O(n log n) time, as sketched below.
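Roughly what that means in practice (a sketch; the exact compaction rules live in the library):

```rust
use crdt_lite::CRDT;

let mut node: CRDT<String, String, String> = CRDT::new(1, None);
let mut changes = Vec::new();

// Two writes to the same column, then a record delete that supersedes both.
changes.extend(node.insert_or_update(&"a".to_string(), vec![("x".to_string(), "1".to_string())]));
changes.extend(node.insert_or_update(&"a".to_string(), vec![("x".to_string(), "2".to_string())]));
changes.extend(node.delete_record(&"a".to_string()));

let before = changes.len();
CRDT::<String, String, String>::compress_changes(&mut changes);
assert!(changes.len() < before); // superseded updates are dropped
```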
Create temporary overlays or transaction isolation:
```rust
// Rust
use std::sync::Arc;

let parent = Arc::new(CRDT::<String, String, String>::new(1, None));
let child = CRDT::new(2, Some(parent.clone()));
// Child sees parent data but maintains separate modifications
```

```cpp
// C++
auto parent = std::make_shared<CRDT<K, V>>(1);
CRDT<K, V> child(2, parent);

// Generate inverse changes to undo child's work
auto revert_changes = child.revert();

// Compute difference between two CRDTs
auto diff = child.diff(other_crdt);
```
```rust
// Rust
struct CustomMergeRule;

impl<K, C, V> MergeRule<K, C, V> for CustomMergeRule {
    fn should_accept(
        &self,
        local_col: u64, local_db: u64, local_node: NodeId,
        remote_col: u64, remote_db: u64, remote_node: NodeId,
    ) -> bool {
        // Custom logic here
        remote_col > local_col
    }
}

crdt.merge_changes(changes, &CustomMergeRule);
```
```cpp
// C++
template <typename K, typename V, typename Context = void>
struct CustomMergeRule {
    constexpr bool operator()(
        const Change<K, V> &local,
        const Change<K, V> &remote) const {
        // Custom logic here
        return remote.col_version > local.col_version;
    }
};

crdt.merge_changes<false, void, CustomMergeRule<K, V>>(
    std::move(changes), false, CustomMergeRule<K, V>()
);
```
Last-Write-Wins (default):
```cpp
doc.merge_changes(remote_changes);
```
Both-Writes-Win (preserve conflicts):
```cpp
BothWritesWinMergeRule<std::string, std::string> bww;
doc.merge_changes_with_rule(remote_changes, bww);

// Check for conflicts
auto line = doc.get_line(line_id);
if (line->has_conflicts()) {
    auto all_versions = line->get_all_versions();
    // Display all concurrent edits to user
}
```
⚠️ Auto-Merge (EXPERIMENTAL - DO NOT USE):
The `AutoMergingTextRule` is currently broken and violates CRDT convergence guarantees. See `CLAUDE.md` for details.
⚠️ This is a data structure library, not a complete distributed system. Security must be implemented at higher layers:
Tombstone Accumulation
- Monitor with `crdt.tombstone_count()`

Resource Exhaustion
Clock Manipulation
- A malicious node can inflate `db_version` to win all conflicts

Thread Safety

- The CRDT is not internally synchronized: wrap it in `Arc<Mutex<CRDT>>` in Rust, and use external locks in C++

| Operation | Average Case | Notes |
|---|---|---|
| `insert_or_update` | O(n) | n = number of fields |
| `delete_field` | O(1) | HashMap field removal |
| `delete_record` | O(1) | HashMap record removal |
| `merge_changes` | O(c) | c = number of changes |
| `get_changes_since` | O(r × f) | r = records, f = fields (optimized with version bounds) |
| `compress_changes` | O(n log n) | Uses unstable sort for better performance |
| `compact_tombstones` | O(t) | t = number of tombstones |
| C++ | Rust |
|---|---|
| `CRDT<K, V> crdt(node_id);` | `let mut crdt = CRDT::<K, C, V>::new(node_id, None);` |
| `crdt.insert_or_update(id, changes, pair1, pair2);` | `let changes = crdt.insert_or_update(&id, vec![pair1, pair2]);` |
| `crdt.delete_field(id, "field", changes);` | `if let Some(change) = crdt.delete_field(&id, &"field") { ... }` |
| `crdt.delete_record(id, changes);` | `if let Some(change) = crdt.delete_record(&id) { ... }` |
| `crdt.merge_changes(std::move(changes));` | `crdt.merge_changes(changes, &DefaultMergeRule);` |
| `auto* record = crdt.get_record(id);` | `let record = crdt.get_record(&id);` |
- `CLAUDE.md` - Comprehensive technical documentation for developers (and Claude!)

Roadmap:

- `async` support for network operations
- `serde` integration for serialization (v0.2.0)
- `no_std` + `alloc` support (v0.2.0)

Testing:

```sh
# Rust - All tests
cargo test

# Rust - Persistence layer tests (bincode)
cargo test --features persist

# Rust - MessagePack + incremental snapshots tests
cargo test --features persist-msgpack

# Rust - With compression
cargo test --features persist-compressed

# C++ - Column CRDT
g++ -std=c++20 tests.cpp -o tests && ./tests

# C++ - Text CRDT
g++ -std=c++20 test_text_crdt.cpp -o test_text_crdt && ./test_text_crdt
```
Contributions are welcome! Please ensure:
Rust:
- Tests pass (`cargo test`)
- Code is formatted (`cargo fmt`)
- No clippy warnings (`cargo clippy`)

C++:
This project is licensed under the MIT License - see the LICENSE file for details.
CRDT-Lite offers a streamlined approach to conflict-free replicated data types, balancing simplicity and efficiency. By focusing on fine-grained conflict resolution and deterministic merge semantics, CRDT-Lite is well-suited for applications requiring scalability and low overhead in distributed environments.