| Crates.io | photon-etcd-cluster |
| lib.rs | photon-etcd-cluster |
| version | 0.1.0 |
| created_at | 2025-12-01 05:32:18.593688+00 |
| updated_at | 2025-12-01 05:32:18.593688+00 |
| description | Lightweight cluster coordination library providing leader election and worker membership via etcd |
| homepage | https://github.com/rgushel/photon-etcd-cluster |
| repository | https://github.com/rgushel/photon-etcd-cluster |
| max_upload_size | |
| id | 1959346 |
| size | 234,295 |
A lightweight Rust library for cluster coordination using etcd. Provides leader election and node registry with minimal dependencies and no platform lock-in (unlike Kubernetes-native solutions).
Add to your `Cargo.toml`:
[dependencies]
photon-etcd-cluster = "0.1"
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
For system metrics collection (CPU, memory, load average), enable the system-metrics feature:
[dependencies]
photon-etcd-cluster = { version = "0.1", features = ["system-metrics"] }
use photon_etcd_cluster::ClusterNode;
use photon_etcd_cluster::HealthStatus;
use tokio::sync::broadcast;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (shutdown_tx, _) = broadcast::channel(1);
let node = ClusterNode::new(
vec!["http://localhost:2379".to_string()],
"node-1".to_string(),
"192.168.1.10".parse()?,
"my-service".to_string(),
Some(5), // TTL in seconds
);
// Run node in background
let n = node.clone();
let mut shutdown_rx = shutdown_tx.subscribe();
tokio::spawn(async move {
n.run(&mut shutdown_rx).await
});
// Use in your application
if node.is_leader() {
println!("I am the leader!");
// Perform leader-only tasks
}
match node.current_health() {
HealthStatus::Healthy => println!("Connected to etcd"),
HealthStatus::Unhealthy => println!("Connection issues"),
HealthStatus::Unknown => println!("Initializing..."),
}
Ok(())
}
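To stop the node, send a value on the shutdown channel created above so that `run()` can exit. A minimal sketch, assuming the shutdown channel carries `()` and that tokio's `signal` feature is enabled (both are assumptions of this example, not requirements of the library):

```rust
use tokio::sync::broadcast;

/// Hypothetical helper: wait for Ctrl-C, then ask the node's `run()` loop to stop.
/// Assumes `run()` returns once a value arrives on the shutdown channel.
async fn shutdown_on_ctrl_c(shutdown_tx: broadcast::Sender<()>) -> std::io::Result<()> {
    tokio::signal::ctrl_c().await?;
    // Ignore the error case where every receiver has already been dropped.
    let _ = shutdown_tx.send(());
    Ok(())
}
```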
use photon_etcd_cluster::ServiceDiscovery;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let discovery = ServiceDiscovery::new(
vec!["http://localhost:2379".to_string()],
"my-service".to_string(),
);
// Run discovery in background
let d = discovery.clone();
tokio::spawn(async move { d.run(None).await });
// Wait for initial sync (event-driven, no polling)
discovery.wait_ready().await;
// Query nodes (lock-free, O(1))
for node in discovery.nodes().iter() {
println!("Node {} at {}", node.id, node.ip);
}
// Get current leader
if let Some(leader) = discovery.leader() {
println!("Current leader: {}", leader.id);
}
Ok(())
}
React to cluster changes as they happen using broadcast events:
use photon_etcd_cluster::{ServiceDiscovery, ClusterEvent};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let discovery = ServiceDiscovery::new(
vec!["http://localhost:2379".to_string()],
"my-service".to_string(),
);
// Subscribe to events BEFORE running
let mut events = discovery.subscribe();
// Run discovery in background
let d = discovery.clone();
tokio::spawn(async move { d.run(None).await });
// React to cluster changes
loop {
match events.recv().await {
Ok(ClusterEvent::Ready) => {
println!("Initial sync complete");
}
Ok(ClusterEvent::NodeJoined(n)) => {
println!("Node joined: {} at {}", n.id, n.ip);
// Update load balancer backends
}
Ok(ClusterEvent::NodeLeft(n)) => {
println!("Node left: {}", n.id);
// Remove from backend pool
}
Ok(ClusterEvent::LeaderElected(n)) => {
println!("New leader: {}", n.id);
// Route writes to new leader
}
Ok(ClusterEvent::LeaderLost) => {
println!("Leader lost, awaiting election");
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
println!("Missed {} events, refreshing state", n);
// Re-sync from discovery.nodes()
}
Err(_) => break,
_ => {}
}
}
Ok(())
}
Use watch channels for metrics or state observation - more efficient than polling. In the example below, `NODE_GAUGE` stands in for any metrics gauge defined elsewhere (e.g. a Prometheus integer gauge):
use photon_etcd_cluster::ServiceDiscovery;
async fn update_metrics(discovery: ServiceDiscovery) {
let mut watch = discovery.watch_nodes();
// Set initial value
let count = watch.borrow().len();
NODE_GAUGE.set(count as i64);
// React only when state changes (no polling!)
while watch.changed().await.is_ok() {
let count = watch.borrow().len();
NODE_GAUGE.set(count as i64);
}
}
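The same pattern works for leadership changes. A minimal sketch, assuming `watch_leader()` (listed among the observation methods further below) yields a `tokio::sync::watch::Receiver<Option<Node>>` where `None` means there is no current leader:

```rust
use photon_etcd_cluster::ServiceDiscovery;

/// Sketch: log every leadership change without polling.
/// Assumes `watch_leader()` returns `watch::Receiver<Option<Node>>`.
async fn log_leader_changes(discovery: ServiceDiscovery) {
    let mut watch = discovery.watch_leader();
    while watch.changed().await.is_ok() {
        match watch.borrow().as_ref() {
            Some(leader) => println!("Leader is now {}", leader.id),
            None => println!("No leader at the moment"),
        }
    }
}
```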
Nodes can report system metrics (CPU, memory, load average) that load balancers can use for weighted traffic distribution. Enable the system-metrics feature and use ClusterNodeBuilder:
use photon_etcd_cluster::{ClusterNodeBuilder, SystemMetricsCollector};
use tokio::sync::broadcast;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (shutdown_tx, _) = broadcast::channel(1);
// Create node with system metrics collection
let node = ClusterNodeBuilder::new(
vec!["http://localhost:2379".to_string()],
"worker-1".to_string(),
"192.168.1.10".parse()?,
"workers".to_string(),
)
.ttl(5)
.metrics_collector(SystemMetricsCollector::new())
.metrics_update_interval(5) // Update metrics every 5 seconds
.build();
// Run node in background
let n = node.clone();
let mut shutdown_rx = shutdown_tx.subscribe();
tokio::spawn(async move {
n.run(&mut shutdown_rx).await
});
Ok(())
}
Implement the `MetricsCollector` trait for application-specific metrics:
use photon_etcd_cluster::{ClusterNodeBuilder, MetricsCollector, NodeMetadata};
use serde_json::json;
struct AppMetricsCollector {
// Your app state
}
impl MetricsCollector for AppMetricsCollector {
fn collect(&self) -> NodeMetadata {
json!({
"cpu_usage_percent": 45.0,
"memory_usage_percent": 60.0,
"queue_depth": 150,
"active_connections": 42,
"requests_per_second": 1000.0
})
}
}
let node = ClusterNodeBuilder::new(endpoints, id, ip, group)
.metrics_collector(AppMetricsCollector { /* ... */ })
.metrics_update_interval(5)
.build();
use photon_etcd_cluster::{ServiceDiscovery, ClusterEvent, metadata_keys};
let discovery = ServiceDiscovery::new(endpoints, "workers".into());
// React to metric changes
let mut events = discovery.subscribe();
while let Ok(event) = events.recv().await {
match event {
ClusterEvent::NodeUpdated { old, new } => {
// Metadata changed - update backend weights
if let Some(cpu) = new.metadata.get(metadata_keys::CPU_USAGE_PERCENT) {
println!("Node {} CPU: {}%", new.id, cpu);
}
}
_ => {}
}
}
// Or query directly
for node in discovery.nodes().iter() {
let cpu = node.metadata.get("cpu_usage_percent")
.and_then(|v| v.as_f64())
.unwrap_or(0.0);
let mem = node.metadata.get("memory_usage_percent")
.and_then(|v| v.as_f64())
.unwrap_or(0.0);
println!("Node {}: CPU={:.1}%, Memory={:.1}%", node.id, cpu, mem);
}
The `metadata_keys` module provides standard key names:
| Key | Type | Description |
|---|---|---|
| `cpu_usage_percent` | `f64` | CPU usage (0-100%) |
| `memory_usage_percent` | `f64` | Memory usage (0-100%) |
| `memory_available_bytes` | `u64` | Available memory in bytes |
| `load_avg_1m` | `f64` | 1-minute load average |
| `active_connections` | `u32` | Active connection count |
| `requests_per_second` | `f64` | Request throughput |
| `queue_depth` | `u32` | Pending work queue size |
This example shows how to build a load balancer that automatically discovers backend servers using `ServiceDiscovery`. It is based on actual production usage with Pingora.
use photon_etcd_cluster::{ServiceDiscovery, ClusterEvent};
use std::net::SocketAddr;
/// Manages dynamic backend discovery for a load balancer
struct BackendDiscovery {
discovery: ServiceDiscovery,
}
impl BackendDiscovery {
fn new(discovery: ServiceDiscovery) -> Self {
Self { discovery }
}
/// Returns healthy backend addresses for load balancing
/// Called on every incoming request - must be fast!
fn get_backends(&self) -> Vec<SocketAddr> {
// Lock-free O(1) read - safe for high-frequency calls
self.discovery
.nodes()
.iter()
.map(|node| SocketAddr::new(node.ip, 8080))
.collect()
}
/// Optionally route leader-only requests (e.g., write operations)
fn get_leader_backend(&self) -> Option<SocketAddr> {
self.discovery
.leader()
.map(|leader| SocketAddr::new(leader.ip, 8080))
}
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
let rt = tokio::runtime::Runtime::new()?;
// Create discovery for different service groups
let etcd_endpoints = vec!["http://localhost:2379".to_string()];
let workers_discovery = ServiceDiscovery::new(
etcd_endpoints.clone(),
"workers".to_string(), // Group: backend workers
);
let cache_discovery = ServiceDiscovery::new(
etcd_endpoints.clone(),
"cache-nodes".to_string(), // Group: cache servers
);
// Spawn discovery background tasks
rt.block_on(async {
let w = workers_discovery.clone();
let c = cache_discovery.clone();
tokio::spawn(async move { w.run(None).await });
tokio::spawn(async move { c.run(None).await });
// Wait for at least 1 backend (built-in, no polling!)
workers_discovery.wait_for_nodes(1).await;
cache_discovery.wait_for_nodes(1).await;
});
// Create backend discovery instances for load balancer
let worker_backends = BackendDiscovery::new(workers_discovery.clone());
let cache_backends = BackendDiscovery::new(cache_discovery.clone());
// Use in your load balancer's request routing
println!("Worker backends: {:?}", worker_backends.get_backends());
println!("Cache backends: {:?}", cache_backends.get_backends());
if let Some(leader) = worker_backends.get_leader_backend() {
println!("Leader backend for writes: {}", leader);
}
Ok(())
}
Key patterns demonstrated:
- `discovery.nodes()` is safe to call on every request
- `wait_for_nodes()` uses watch channels internally - no polling

┌─────────────────────────────────────────────────────────────┐
│ etcd Cluster │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ registry/{group} │ │ election/{group} │ │
│ │ /node-1 │ │ (leader key) │ │
│ │ /node-2 │ │ │ │
│ │ /node-N │ │ │ │
│ └──────────────────┘ └──────────────────┘ │
└─────────────────────────────────────────────────────────────┘
▲ ▲
│ watch │ campaign/proclaim
│ │
┌──────────┴──────────┐ ┌───────┴────────┐
│ ServiceDiscovery │ │ ClusterNode │
│ - subscribe() │ │ - run() │
│ - watch_nodes() │ │ - is_leader() │
│ - nodes() │ │ - health │
│ - leader() │ │ │
└─────────────────────┘ └────────────────┘
| Component | Purpose | Used By |
|---|---|---|
| ClusterNode | Node registration, leader election, health management | Worker processes |
| ClusterNodeBuilder | Fluent builder for ClusterNode with metrics configuration | Worker processes |
| ServiceDiscovery | Reactive cluster state via events and watch channels | Load balancers |
| ClusterEvent | Enum for cluster state changes (join/leave/leader/updated) | Event subscribers |
| Node | Serializable node data (id, ip, last_seen, metadata) | Both |
| MetricsCollector | Trait for collecting node metrics | Worker processes |
| SystemMetricsCollector | Built-in collector for CPU, memory, load (requires system-metrics feature) | Worker processes |
Leader election uses etcd's native campaign/proclaim APIs for distributed consensus.
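In application code the outcome of the election is usually consumed through the APIs shown earlier. A minimal sketch that starts and stops leader-only work by combining `is_leader()` with the `ServiceDiscovery` event stream; pairing the two this way is a choice made for this example, not something the library mandates:

```rust
use photon_etcd_cluster::{ClusterEvent, ClusterNode, ServiceDiscovery};

/// Sketch: gate leader-only work on election events plus `is_leader()`.
async fn follow_leadership(node: ClusterNode, discovery: ServiceDiscovery) {
    let mut events = discovery.subscribe();
    while let Ok(event) = events.recv().await {
        match event {
            ClusterEvent::LeaderElected(leader) if node.is_leader() => {
                println!("This node ({}) won the election", leader.id);
                // Kick off leader-only work here.
            }
            ClusterEvent::LeaderLost => {
                println!("Leadership lost, pausing leader-only work");
            }
            _ => {}
        }
    }
}
```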
Health tracking is heartbeat-based: `HealthStatus` reports whether the node's connection to etcd is `Healthy`, `Unhealthy`, or still `Unknown` during initialization.
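For a readiness probe, the health enum maps naturally to a boolean. A minimal sketch; treating `Unknown` as "not ready" is a policy choice of this example, not something the library prescribes:

```rust
use photon_etcd_cluster::{ClusterNode, HealthStatus};

/// Sketch: report ready only while the node is connected to etcd.
/// `Unknown` (still initializing) and `Unhealthy` both count as not ready here.
fn is_ready(node: &ClusterNode) -> bool {
    matches!(node.current_health(), HealthStatus::Healthy)
}
```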
Nodes can publish arbitrary JSON metadata (CPU, memory, custom metrics) for load balancer consumption:
- Metadata is stored as `serde_json::Value`
- `MetricsCollector` trait for custom metrics
- `SystemMetricsCollector` provides CPU, memory, load average (requires `system-metrics` feature)
- `NodeUpdated` events are emitted when metadata changes

`ServiceDiscovery` provides three ways to observe cluster state:
- `subscribe()`: Push-based `ClusterEvent` notifications
- `watch_nodes()` / `watch_leader()`: Efficient state observation
- `nodes()` / `leader()`: O(1) lock-free reads

Events emitted:
- `NodeJoined(Node)` / `NodeLeft(Node)` / `NodeUpdated { old, new }`
- `LeaderElected(Node)` / `LeaderLost`
- `Ready` / `Disconnected` / `Reconnected`

Cluster state is laid out in etcd under the following keys:

registry/
└── {group_name}/
├── node-1 → {"id":"node-1","ip":"192.168.1.10","last_seen":1234567890,"metadata":{...}}
├── node-2 → {"id":"node-2","ip":"192.168.1.11","last_seen":1234567891,"metadata":{...}}
└── ...
election/
└── {group_name} → (etcd election key, value = current leader ID)
Node metadata example (with SystemMetricsCollector):
{
"id": "worker-1",
"ip": "192.168.1.10",
"last_seen": 1234567890,
"metadata": {
"cpu_usage_percent": 45.2,
"memory_usage_percent": 62.8,
"memory_available_bytes": 8589934592,
"load_avg_1m": 2.5
}
}
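Consumers outside this library can read the stored value back with plain `serde_json`. A minimal sketch; the helper below is hypothetical and only illustrates the shape of the data, while the crate's own `Node` type (see the component table) already models this format:

```rust
use serde_json::Value;

/// Hypothetical helper: extract a standard metric from a raw registry value
/// fetched with any etcd client.
fn cpu_from_registry_value(raw: &str) -> Option<f64> {
    let value: Value = serde_json::from_str(raw).ok()?;
    value.get("metadata")?.get("cpu_usage_percent")?.as_f64()
}
```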
| Parameter | Type | Default | Description |
|---|---|---|---|
| `etcd_endpoints` | `Vec<String>` | Required | etcd cluster endpoints |
| `node_id` | `String` | Required | Unique node identifier |
| `node_ip` | `IpAddr` | Required | Node's IP address |
| `group_name` | `String` | Required | Logical group for nodes |
| `ttl` | `i64` | `5` | Lease TTL in seconds |
| `metrics_collector` | `impl MetricsCollector` | `NoopMetricsCollector` | Metrics collection implementation |
| `metrics_update_interval` | `u64` | `0` (disabled) | Seconds between metrics updates |
Tested with etcd v3.5.21.
# Cargo
cargo build
cargo test
## Testing
```bash
# Unit tests
cargo test

# Integration tests (requires Docker - spawns etcd container)
cargo test -- --test-threads=1
```

| Feature | photon-etcd-cluster | kube-leader-election | memberlist | chitchat |
|---|---|---|---|---|
| Language | Rust | Rust | Go | Rust |
| Backend | etcd | Kubernetes | None (P2P) | None (P2P) |
| Leader Election | Yes | Yes | No | No |
| Node Registry | Yes | No | Yes | Yes |
| Node Metrics | Yes | No | Limited | Limited |
| Platform Independent | Yes | No (K8s only) | Yes | Yes |
| External Dependencies | etcd | Kubernetes | None | None |
Contributions are welcome! Please see CONTRIBUTING.md for guidelines.
This project is licensed under the MIT License - see the LICENSE file for details.
Roman Gushel