| Crates.io | kotoba-db-cluster |
| lib.rs | kotoba-db-cluster |
| version | 0.1.21 |
| created_at | 2025-09-19 15:48:35.512534+00 |
| updated_at | 2025-09-19 15:48:35.512534+00 |
| description | Distributed clustering and consensus for KotobaDB |
| homepage | https://github.com/com-junkawasaki/kotoba |
| repository | https://github.com/com-junkawasaki/kotoba |
| max_upload_size | |
| id | 1846629 |
| size | 128,006 |
Distributed clustering and consensus for KotobaDB. Provides high availability, fault tolerance, and horizontal scalability through Raft consensus and data partitioning.
┌─────────────────────────────────────────┐
│            Application Layer            │
├─────────────────────────────────────────┤
│      KotobaCluster High-Level API       │
│  ┌─────────────────────────────────┐    │
│  │        Consensus (Raft)         │    │
│  │      Membership Management      │    │
│  │        Data Partitioning        │    │
│  │       Replication Manager       │    │
│  └─────────────────────────────────┘    │
├─────────────────────────────────────────┤
│       Network Communication Layer       │
│  ┌─────────────────────────────────┐    │
│  │          gRPC Services          │    │
│  │         Message Routing         │    │
│  │      Connection Management      │    │
│  └─────────────────────────────────┘    │
└─────────────────────────────────────────┘
Add to your Cargo.toml:
[dependencies]
kotoba-db-cluster = "0.1.21"
# The async examples below also assume tokio:
tokio = { version = "1", features = ["full"] }
use kotoba_db_cluster::*;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create cluster configuration
    let config = ClusterConfig {
        replication_factor: 3,
        partition_count: 64,
        ..Default::default()
    };

    // Create and start cluster node
    let node_id = NodeId("node-1".to_string());
    let mut cluster = KotobaCluster::new(node_id, config).await?;

    // Start the cluster on a network address
    let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
    cluster.start(addr).await?;
    println!("Cluster node started on {}", addr);

    // Add database instance
    // let db = /* your KotobaDB instance */;
    // cluster.add_database(db).await?;

    // Keep running until Ctrl-C
    tokio::signal::ctrl_c().await?;
    cluster.stop().await?;
    Ok(())
}
// Execute distributed operations
use std::collections::HashMap;

let operation = Operation::CreateNode {
    properties: {
        let mut props = HashMap::new();
        props.insert("name".to_string(), Value::String("Alice".to_string()));
        props.insert("age".to_string(), Value::Int(30));
        props
    },
};
let result_cid = cluster.execute_operation(operation).await?;
println!("Created node with CID: {}", result_cid);
// Execute distributed queries
let query = DistributedQuery::MultiPartition {
    query: /* your Query here */
};
let results = cluster.execute_query(query).await?;
// Monitor cluster health
let status = cluster.get_status().await;
println!("Cluster has {} active nodes", status.membership.active_nodes);
println!("Replication health: {}", status.replication.is_healthy);
// Subscribe to cluster events
let mut events = cluster.subscribe_events().await;
while let Ok(event) = events.recv().await {
    match event {
        ClusterEvent::NodeJoined(node) => println!("Node {} joined", node.0),
        ClusterEvent::NodeFailed(node) => println!("Node {} failed", node.0),
        _ => {}
    }
}
let config = ClusterConfig {
    nodes: HashMap::new(),  // Will be populated dynamically
    replication_factor: 3,  // Number of data replicas
    partition_count: 64,    // Number of data partitions
};
let membership_config = MembershipConfig {
    heartbeat_interval: Duration::from_secs(1),
    failure_detection_interval: Duration::from_secs(5),
    max_missed_heartbeats: 3,
    failure_timeout: Duration::from_secs(15),
    gossip_interval: Duration::from_secs(2),
};
let replication_config = ReplicationConfig {
    replication_factor: 3,
    max_retries: 3,
    status_check_interval: Duration::from_secs(5),
    queue_processing_interval: Duration::from_millis(100),
    full_sync_interval: Duration::from_secs(300),
    node_failure_timeout: Duration::from_secs(30),
    failure_rate: 0.01,
};
Data is partitioned using consistent hashing with virtual nodes:
// Each physical node gets multiple virtual nodes on the hash ring
// This ensures even data distribution
partitioning.add_node(node_id, 100).await?; // 100 virtual nodes
Data is replicated to N nodes based on proximity on the hash ring:
// For replication_factor = 3
let nodes = partitioning.get_nodes_for_key(&key, 3);
// Returns 3 closest nodes on the ring
// Check partition ownership
let is_owner = partitioning.is_node_responsible(&node_id, &key).await;
// Get partition statistics
let stats = partitioning.get_distribution_stats().await;
println!("Partition variance: {}", stats.variance());
Operations are queued and replicated asynchronously:
// Queue operation for replication
replication.replicate_operation(operation, &primary_node).await?;
// Check replication health
let health = replication.check_health().await;
if !health.is_healthy {
    println!("Warning: High replication lag");
}
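A minimal sketch of this queue-then-apply pattern, using a tokio mpsc channel as the replication queue. It is an assumption-level illustration (ReplicationJob and send_to_node are hypothetical), not the actual internals of the replication manager:
use std::time::Duration;
use tokio::sync::mpsc;

// Hypothetical work item: a serialized operation plus its target node.
struct ReplicationJob {
    payload: Vec<u8>,
    target: String,
}

// Drain the queue in the background so callers never block on the network.
async fn run_replicator(mut rx: mpsc::Receiver<ReplicationJob>) {
    while let Some(job) = rx.recv().await {
        for attempt in 0u64..3 {
            match send_to_node(&job.target, &job.payload).await {
                Ok(()) => break,
                Err(_) if attempt < 2 => {
                    // Linear backoff between retries (mirrors max_retries above).
                    tokio::time::sleep(Duration::from_millis(100 * (attempt + 1))).await;
                }
                Err(e) => eprintln!("replication to {} failed: {}", job.target, e),
            }
        }
    }
}

// Placeholder for the real network send.
async fn send_to_node(_target: &str, _payload: &[u8]) -> std::io::Result<()> {
    Ok(())
}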
Automatic failure detection and recovery:
// Node failure detected
replication.handle_node_failure(&failed_node).await?;
// Partitions redistributed
partitioning.rebalance().await?;
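A hedged sketch of how heartbeat-based detection can drive those two calls, using the MembershipConfig fields shown earlier (FailureDetector is a hypothetical name):
use std::collections::HashMap;
use std::time::{Duration, Instant};

// Illustrative detector: a node is suspected after max_missed_heartbeats
// intervals without a heartbeat, and declared failed after failure_timeout.
struct FailureDetector {
    last_seen: HashMap<String, Instant>,
    heartbeat_interval: Duration,
    max_missed: u32,
    failure_timeout: Duration,
}

impl FailureDetector {
    fn record_heartbeat(&mut self, node: &str) {
        self.last_seen.insert(node.to_string(), Instant::now());
    }

    fn is_suspected(&self, node: &str) -> bool {
        self.last_seen
            .get(node)
            .map(|t| t.elapsed() > self.heartbeat_interval * self.max_missed)
            .unwrap_or(true)
    }

    fn is_failed(&self, node: &str) -> bool {
        self.last_seen
            .get(node)
            .map(|t| t.elapsed() > self.failure_timeout)
            .unwrap_or(true)
    }
}
With the defaults above (1s heartbeats, 3 missed, 15s timeout), a node is suspected after roughly 3 seconds of silence and failed after 15, at which point handle_node_failure and rebalance would run.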
Choose the consistency level appropriate to your use case.
All inter-node communication uses efficient Protocol Buffers messages over gRPC:
service ClusterService {
  rpc RequestVote(VoteRequest) returns (VoteResponse);
  rpc AppendEntries(AppendEntriesRequest) returns (AppendEntriesResponse);
  rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse);
  rpc ExecuteOperation(ClientRequest) returns (ClientResponse);
}
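Assuming the proto above is compiled with tonic-build (an assumption; the crate may wire this differently) and exposed under a proto module, a direct client call could look like:
use proto::cluster_service_client::ClusterServiceClient;
use proto::HeartbeatRequest;

// tonic generates one async method per rpc; connect over HTTP/2.
let mut client = ClusterServiceClient::connect("http://127.0.0.1:8080").await?;
let reply = client
    .heartbeat(HeartbeatRequest::default()) // fill in real fields in practice
    .await?
    .into_inner();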
Automatic connection handling with reconnection:
// Connect to cluster node
network.connect_to_node(node_id, "127.0.0.1:8080".to_string()).await?;
// Send Raft message
network.send_raft_message(&target_node, message).await?;
let status = cluster.get_status().await;
println!("Cluster Status:");
println!(" Leader: {:?}", status.leader.map(|n| n.0));
println!(" Active Nodes: {}", status.membership.active_nodes);
println!(" Failed Nodes: {}", status.membership.failed_nodes);
println!(" Replication Lag: {:?}", status.replication.replication_lag);
// Check cluster health
let health = cluster.check_health().await;
if !health.is_healthy {
    // Alert or take corrective action
    println!("Cluster unhealthy: {} failed nodes", health.failed_nodes_count);
}
// Subscribe to cluster events
let mut events = cluster.subscribe_events().await;
while let Ok(event) = events.recv().await {
    match event {
        ClusterEvent::NodeJoined(node) => log::info!("Node joined: {}", node.0),
        ClusterEvent::NodeFailed(node) => log::error!("Node failed: {}", node.0),
        ClusterEvent::LeaderElected(node) => log::info!("New leader: {}", node.0),
        _ => {}
    }
}
┌─────────┐    ┌─────────┐    ┌─────────┐
│ Node 1  │◄──►│ Node 2  │◄──►│ Node 3  │
│ Leader  │    │ Follower│    │ Follower│
└─────────┘    └─────────┘    └─────────┘

     Region A                 Region B
┌─────────┐    ┌─────────┐    ┌─────────┐
│ Node 1  │◄──►│ Node 2  │◄──►│ Node 4  │
│ Leader  │    │ Follower│    │ Follower│
└─────────┘    └─────────┘    └─────────┘
                                   │
                                   ▼
                              ┌─────────┐
                              │ Node 5  │
                              │ Follower│
                              └─────────┘
# Start 3-node cluster for development
./kotoba-cluster --node-id=node1 --address=127.0.0.1:8080 --peers=127.0.0.1:8081,127.0.0.1:8082 &
./kotoba-cluster --node-id=node2 --address=127.0.0.1:8081 --peers=127.0.0.1:8080,127.0.0.1:8082 &
./kotoba-cluster --node-id=node3 --address=127.0.0.1:8082 --peers=127.0.0.1:8080,127.0.0.1:8081 &
// Network: increase connection pool size, configure keep-alive settings,
// and use connection multiplexing.
// Storage: tune LSM compaction settings, configure bloom filter sizes,
// and optimize WAL sync intervals.
// Consensus: adjust election timeouts, configure heartbeat intervals,
// and tune batch sizes.
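As an illustration, a latency-sensitive deployment might tighten the intervals in the config structs shown earlier; the values below are hypothetical starting points, not recommendations:
let membership_config = MembershipConfig {
    heartbeat_interval: Duration::from_millis(500),  // faster liveness signal
    failure_detection_interval: Duration::from_secs(2),
    max_missed_heartbeats: 3,
    failure_timeout: Duration::from_secs(6),
    gossip_interval: Duration::from_secs(1),
};

let replication_config = ReplicationConfig {
    replication_factor: 3,
    max_retries: 5,                                  // tolerate brief network blips
    status_check_interval: Duration::from_secs(2),
    queue_processing_interval: Duration::from_millis(50),
    full_sync_interval: Duration::from_secs(300),
    node_failure_timeout: Duration::from_secs(15),
    failure_rate: 0.01,
};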
match cluster.execute_operation(operation).await {
    Ok(cid) => println!("Success: {}", cid),
    Err(ClusterError::NotLeader(leader)) => {
        // Redirect the request to the current leader
        println!("Redirect to leader: {}", leader);
    }
    Err(ClusterError::NoLeader) => {
        // Wait for leader election to complete, then retry
        println!("Waiting for leader election...");
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
    Err(ClusterError::NetworkError(e)) => {
        // Transient failure: retry with backoff
        println!("Network error, retrying: {}", e);
    }
    _ => println!("Other error occurred"),
}
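The NetworkError arm above only logs; here is a sketch of the retry-with-backoff loop it alludes to (assumes Operation implements Clone and an enclosing function that returns a Result):
use std::time::Duration;

// Hypothetical wrapper: retry transient failures with exponential backoff.
let mut delay = Duration::from_millis(100);
let mut result = None;
for _ in 0..5 {
    match cluster.execute_operation(operation.clone()).await {
        Ok(cid) => {
            result = Some(cid);
            break;
        }
        Err(ClusterError::NetworkError(_)) | Err(ClusterError::NoLeader) => {
            tokio::time::sleep(delay).await;
            delay *= 2; // exponential backoff: 100ms, 200ms, 400ms, ...
        }
        Err(e) => return Err(e.into()),
    }
}
// `result` now holds Some(cid) on success, None if all attempts failed.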
Licensed under the MIT License.
KotobaDB Cluster - Distributed graph database with strong consistency and high availability 🚀