kotoba-db-cluster

Crates.iokotoba-db-cluster
lib.rskotoba-db-cluster
version0.1.21
created_at2025-09-19 15:48:35.512534+00
updated_at2025-09-19 15:48:35.512534+00
descriptionDistributed clustering and consensus for KotobaDB
homepagehttps://github.com/com-junkawasaki/kotoba
repositoryhttps://github.com/com-junkawasaki/kotoba
max_upload_size
id1846629
size128,006
Jun Kawasaki (jun784)

documentation

https://docs.rs/kotoba-db-cluster

README

KotobaDB Cluster

Distributed clustering and consensus for KotobaDB. Provides high availability, fault tolerance, and horizontal scalability through Raft consensus and data partitioning.

Features

  • Raft Consensus: Leader election and log replication for strong consistency
  • Automatic Failover: Transparent leader failover with minimal downtime
  • Horizontal Scaling: Data partitioning across multiple nodes
  • Fault Tolerance: Survives node failures through replication
  • Eventual Consistency: Tunable consistency levels for different workloads
  • gRPC Communication: Efficient protobuf-based network communication

Architecture

┌─────────────────────────────────────────┐
│            Application Layer            │
├─────────────────────────────────────────┤
│        KotobaCluster High-Level API     │
│    ┌─────────────────────────────────┐  │
│    │    Consensus (Raft)            │  │
│    │    Membership Management       │  │
│    │    Data Partitioning           │  │
│    │    Replication Manager         │  │
│    └─────────────────────────────────┘  │
├─────────────────────────────────────────┤
│        Network Communication Layer      │
│    ┌─────────────────────────────────┐  │
│    │    gRPC Services               │  │
│    │    Message Routing             │  │
│    │    Connection Management       │  │
│    └─────────────────────────────────┘  │
└─────────────────────────────────────────┘

Quick Start

Add to your Cargo.toml:

[dependencies]
kotoba-db-cluster = "0.1.0"

Basic Cluster Setup

use kotoba_db_cluster::*;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create cluster configuration
    let config = ClusterConfig {
        replication_factor: 3,
        partition_count: 64,
        ..Default::default()
    };

    // Create and start cluster node
    let node_id = NodeId("node-1".to_string());
    let mut cluster = KotobaCluster::new(node_id, config).await?;

    // Start the cluster on a network address
    let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
    cluster.start(addr).await?;

    println!("Cluster node started on {}", addr);

    // Add database instance
    // let db = /* your KotobaDB instance */;
    // cluster.add_database(db).await?;

    // Keep running
    tokio::signal::ctrl_c().await?;
    cluster.stop().await?;

    Ok(())
}

Cluster Operations

// Execute distributed operations
let operation = Operation::CreateNode {
    properties: {
        let mut props = HashMap::new();
        props.insert("name".to_string(), Value::String("Alice".to_string()));
        props.insert("age".to_string(), Value::Int(30));
        props
    }
};

let result_cid = cluster.execute_operation(operation).await?;
println!("Created node with CID: {}", result_cid);

// Execute distributed queries
let query = DistributedQuery::MultiPartition {
    query: Query // Your query here
};
let results = cluster.execute_query(query).await?;

// Monitor cluster health
let status = cluster.get_status().await;
println!("Cluster has {} active nodes", status.membership.active_nodes);
println!("Replication health: {}", status.replication.is_healthy);

// Subscribe to cluster events
let mut events = cluster.subscribe_events().await;
while let Ok(event) = events.recv().await {
    match event {
        ClusterEvent::NodeJoined(node) => println!("Node {} joined", node.0),
        ClusterEvent::NodeFailed(node) => println!("Node {} failed", node.0),
        _ => {}
    }
}

Configuration

Cluster Configuration

let config = ClusterConfig {
    nodes: HashMap::new(), // Will be populated dynamically
    replication_factor: 3, // Number of data replicas
    partition_count: 64,   // Number of data partitions
};

Membership Configuration

let membership_config = MembershipConfig {
    heartbeat_interval: Duration::from_secs(1),
    failure_detection_interval: Duration::from_secs(5),
    max_missed_heartbeats: 3,
    failure_timeout: Duration::from_secs(15),
    gossip_interval: Duration::from_secs(2),
};

Replication Configuration

let replication_config = ReplicationConfig {
    replication_factor: 3,
    max_retries: 3,
    status_check_interval: Duration::from_secs(5),
    queue_processing_interval: Duration::from_millis(100),
    full_sync_interval: Duration::from_secs(300),
    node_failure_timeout: Duration::from_secs(30),
    failure_rate: 0.01,
};

Consensus Algorithm (Raft)

How It Works

  1. Leader Election: Nodes elect a leader through voting
  2. Log Replication: Leader replicates operations to followers
  3. Commitment: Operations are committed when majority acknowledge
  4. Failover: New leader elected if current leader fails

Safety Guarantees

  • Election Safety: At most one leader per term
  • Leader Append-Only: Leaders never overwrite log entries
  • Log Matching: Logs have consistent prefixes
  • Leader Completeness: Committed entries persist through leader changes
  • State Machine Safety: Operations applied in same order

Performance Characteristics

  • Write Latency: 2 round trips (propose + commit)
  • Read Latency: 1 round trip (from leader)
  • Throughput: Limited by network and storage I/O
  • Scalability: Linear with cluster size (for reads)

Data Partitioning

Consistent Hashing

Data is partitioned using consistent hashing with virtual nodes:

// Each physical node gets multiple virtual nodes on the hash ring
// This ensures even data distribution
partitioning.add_node(node_id, 100).await?; // 100 virtual nodes

Replication Strategy

Data is replicated to N nodes based on proximity on the hash ring:

// For replication_factor = 3
let nodes = partitioning.get_nodes_for_key(&key, 3);
// Returns 3 closest nodes on the ring

Partition Management

// Check partition ownership
let is_owner = partitioning.is_node_responsible(&node_id, &key).await;

// Get partition statistics
let stats = partitioning.get_distribution_stats().await;
println!("Partition variance: {}", stats.variance());

Replication & Fault Tolerance

Replication Queue

Operations are queued and replicated asynchronously:

// Queue operation for replication
replication.replicate_operation(operation, &primary_node).await?;

// Check replication health
let health = replication.check_health().await;
if !health.is_healthy {
    println!("Warning: High replication lag");
}

Failure Handling

Automatic failure detection and recovery:

// Node failure detected
replication.handle_node_failure(&failed_node).await?;

// Partitions redistributed
partitioning.rebalance().await?;

Consistency Levels

Choose appropriate consistency for your use case:

  • Strong Consistency: Wait for majority acknowledgment
  • Eventual Consistency: Asynchronous replication
  • Read-Your-Writes: Read from primary replica

Network Communication

gRPC Protocol

All communication uses efficient protobuf messages:

service ClusterService {
  rpc RequestVote(VoteRequest) returns (VoteResponse);
  rpc AppendEntries(AppendEntriesRequest) returns (AppendEntriesResponse);
  rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse);
  rpc ExecuteOperation(ClientRequest) returns (ClientResponse);
}

Connection Management

Automatic connection handling with reconnection:

// Connect to cluster node
network.connect_to_node(node_id, "127.0.0.1:8080".to_string()).await?;

// Send Raft message
network.send_raft_message(&target_node, message).await?;

Monitoring & Observability

Cluster Metrics

let status = cluster.get_status().await;

println!("Cluster Status:");
println!("  Leader: {:?}", status.leader.map(|n| n.0));
println!("  Active Nodes: {}", status.membership.active_nodes);
println!("  Failed Nodes: {}", status.membership.failed_nodes);
println!("  Replication Lag: {:?}", status.replication.replication_lag);

Health Checks

// Check cluster health
let health = cluster.check_health().await;
if !health.is_healthy {
    // Alert or take corrective action
    println!("Cluster unhealthy: {} failed nodes", health.failed_nodes_count);
}

Event Subscription

// Subscribe to cluster events
let mut events = cluster.subscribe_events().await;
while let Ok(event) = events.recv().await {
    match event {
        ClusterEvent::NodeJoined(node) => log::info!("Node joined: {}", node.0),
        ClusterEvent::NodeFailed(node) => log::error!("Node failed: {}", node.0),
        ClusterEvent::LeaderElected(node) => log::info!("New leader: {}", node.0),
        _ => {}
    }
}

Deployment Patterns

Single Region Cluster

┌─────────┐    ┌─────────┐    ┌─────────┐
│ Node 1  │◄──►│ Node 2  │◄──►│ Node 3  │
│ Leader  │    │ Follower│    │ Follower│
└─────────┘    └─────────┘    └─────────┘

Multi-Region Cluster

Region A                    Region B
┌─────────┐    ┌─────────┐    ┌─────────┐
│ Node 1  │◄──►│ Node 2  │◄──►│ Node 4  │
│ Leader  │    │ Follower│    │ Follower│
└─────────┘    └─────────┘    └─────────┘
                              │
                              ▼
                       ┌─────────┐
                       │ Node 5  │
                       │ Follower│
                       └─────────┘

Development Setup

# Start 3-node cluster for development
./kotoba-cluster --node-id=node1 --address=127.0.0.1:8080 --peers=127.0.0.1:8081,127.0.0.1:8082 &
./kotoba-cluster --node-id=node2 --address=127.0.0.1:8081 --peers=127.0.0.1:8080,127.0.0.1:8082 &
./kotoba-cluster --node-id=node3 --address=127.0.0.1:8082 --peers=127.0.0.1:8080,127.0.0.1:8081 &

Performance Tuning

Network Optimization

// Increase connection pool size
// Configure keep-alive settings
// Use connection multiplexing

Storage Optimization

// Tune LSM compaction settings
// Configure bloom filter sizes
// Optimize WAL sync intervals

Consensus Tuning

// Adjust election timeouts
// Configure heartbeat intervals
// Tune batch sizes

Error Handling

Common Errors

match cluster.execute_operation(operation).await {
    Ok(cid) => println!("Success: {}", cid),
    Err(ClusterError::NotLeader(leader)) => {
        // Redirect to leader
        println!("Redirect to leader: {}", leader);
    }
    Err(ClusterError::NoLeader) => {
        // Wait for leader election
        println!("Waiting for leader election...");
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
    Err(ClusterError::NetworkError(e)) => {
        // Retry with backoff
        println!("Network error, retrying: {}", e);
    }
    _ => println!("Other error occurred"),
}

Future Enhancements

  • Multi-Raft: Multiple independent Raft groups
  • Witness Nodes: Non-voting nodes for read scaling
  • Dynamic Membership: Add/remove nodes without restart
  • Cross-DC Replication: Geographic replication
  • Query Optimization: Distributed query planning
  • Backup/Restore: Cluster-wide backup utilities

Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Add comprehensive tests
  4. Update documentation
  5. Submit a pull request

License

Licensed under the MIT License.


KotobaDB Cluster - Distributed graph database with strong consistency and high availability 🚀

Commit count: 535

cargo fmt