photon-etcd-cluster

Crates.io: photon-etcd-cluster
lib.rs: photon-etcd-cluster
Version: 0.1.0
Created: 2025-12-01 05:32:18 UTC
Updated: 2025-12-01 05:32:18 UTC
Description: Lightweight cluster coordination library providing leader election and worker membership via etcd
Homepage: https://github.com/rgushel/photon-etcd-cluster
Repository: https://github.com/rgushel/photon-etcd-cluster
Size: 234,295 bytes
Owner: Roman G (rgushel)
Documentation: https://docs.rs/photon-etcd-cluster

README

photon-etcd-cluster


A lightweight Rust library for cluster coordination using etcd. Provides leader election and node registry with minimal dependencies and no platform lock-in (unlike Kubernetes-native solutions).

Why photon-etcd-cluster?

  • Platform-agnostic: Works anywhere etcd runs - bare metal, VMs, containers, or cloud
  • Minimal dependencies: Only etcd required, no Kubernetes or other orchestrators
  • Reactive API: Event-driven with broadcast channels and watch streams - no polling required
  • Lock-free reads: O(1) access to cluster state via watch::Receiver::borrow()
  • Node Metrics: Optional system metrics collection (CPU, memory, load)

Use Cases

  • Organize distributed workers into logical groups
  • Elect a single leader per group for coordination tasks (cache invalidation, job scheduling)
  • Dynamic service discovery for load balancers
  • Health monitoring with automatic failure detection
  • Weighted load balancing based on real-time node metrics (CPU, memory, queue depth)

Quick Start

Add to your Cargo.toml:

[dependencies]
photon-etcd-cluster = "0.1"
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }

For system metrics collection (CPU, memory, load average), enable the system-metrics feature:

[dependencies]
photon-etcd-cluster = { version = "0.1", features = ["system-metrics"] }

Worker Process (ClusterNode)

use photon_etcd_cluster::ClusterNode;
use photon_etcd_cluster::HealthStatus;
use tokio::sync::broadcast;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (shutdown_tx, _) = broadcast::channel(1);

    let node = ClusterNode::new(
        vec!["http://localhost:2379".to_string()],
        "node-1".to_string(),
        "192.168.1.10".parse()?,
        "my-service".to_string(),
        Some(5), // TTL in seconds
    );

    // Run node in background
    let n = node.clone();
    let mut shutdown_rx = shutdown_tx.subscribe();
    tokio::spawn(async move {
        n.run(&mut shutdown_rx).await
    });

    // Use in your application
    if node.is_leader() {
        println!("I am the leader!");
        // Perform leader-only tasks
    }

    match node.current_health() {
        HealthStatus::Healthy => println!("Connected to etcd"),
        HealthStatus::Unhealthy => println!("Connection issues"),
        HealthStatus::Unknown => println!("Initializing..."),
    }

    Ok(())
}

Load Balancer / Service Discovery

use photon_etcd_cluster::ServiceDiscovery;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let discovery = ServiceDiscovery::new(
        vec!["http://localhost:2379".to_string()],
        "my-service".to_string(),
    );

    // Run discovery in background
    let d = discovery.clone();
    tokio::spawn(async move { d.run(None).await });

    // Wait for initial sync (event-driven, no polling)
    discovery.wait_ready().await;

    // Query nodes (lock-free, O(1))
    for node in discovery.nodes().iter() {
        println!("Node {} at {}", node.id, node.ip);
    }

    // Get current leader
    if let Some(leader) = discovery.leader() {
        println!("Current leader: {}", leader.id);
    }

    Ok(())
}

Event-Driven Updates (Recommended)

React to cluster changes as they happen using broadcast events:

use photon_etcd_cluster::{ServiceDiscovery, ClusterEvent};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let discovery = ServiceDiscovery::new(
        vec!["http://localhost:2379".to_string()],
        "my-service".to_string(),
    );

    // Subscribe to events BEFORE running
    let mut events = discovery.subscribe();

    // Run discovery in background
    let d = discovery.clone();
    tokio::spawn(async move { d.run(None).await });

    // React to cluster changes
    loop {
        match events.recv().await {
            Ok(ClusterEvent::Ready) => {
                println!("Initial sync complete");
            }
            Ok(ClusterEvent::NodeJoined(n)) => {
                println!("Node joined: {} at {}", n.id, n.ip);
                // Update load balancer backends
            }
            Ok(ClusterEvent::NodeLeft(n)) => {
                println!("Node left: {}", n.id);
                // Remove from backend pool
            }
            Ok(ClusterEvent::LeaderElected(n)) => {
                println!("New leader: {}", n.id);
                // Route writes to new leader
            }
            Ok(ClusterEvent::LeaderLost) => {
                println!("Leader lost, awaiting election");
            }
            Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
                println!("Missed {} events, refreshing state", n);
                // Re-sync from discovery.nodes()
            }
            Err(_) => break,
            _ => {}
        }
    }

    Ok(())
}

Watch-Based Metrics (Efficient State Observation)

Use watch channels for metrics or state observation - more efficient than polling:

use photon_etcd_cluster::ServiceDiscovery;

async fn update_metrics(discovery: ServiceDiscovery) {
    let mut watch = discovery.watch_nodes();

    // Set initial value (NODE_GAUGE is e.g. a Prometheus integer gauge defined elsewhere in your app)
    let count = watch.borrow().len();
    NODE_GAUGE.set(count as i64);

    // React only when state changes (no polling!)
    while watch.changed().await.is_ok() {
        let count = watch.borrow().len();
        NODE_GAUGE.set(count as i64);
    }
}

Node Metrics for Weighted Load Balancing

Nodes can report system metrics (CPU, memory, load average) that load balancers can use for weighted traffic distribution. Enable the system-metrics feature and use ClusterNodeBuilder:

use photon_etcd_cluster::{ClusterNodeBuilder, SystemMetricsCollector};
use tokio::sync::broadcast;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (shutdown_tx, _) = broadcast::channel(1);

    // Create node with system metrics collection
    let node = ClusterNodeBuilder::new(
        vec!["http://localhost:2379".to_string()],
        "worker-1".to_string(),
        "192.168.1.10".parse()?,
        "workers".to_string(),
    )
    .ttl(5)
    .metrics_collector(SystemMetricsCollector::new())
    .metrics_update_interval(5) // Update metrics every 5 seconds
    .build();

    // Run node in background
    let n = node.clone();
    let mut shutdown_rx = shutdown_tx.subscribe();
    tokio::spawn(async move {
        n.run(&mut shutdown_rx).await
    });

    Ok(())
}

Custom Metrics Collector

Implement MetricsCollector trait for application-specific metrics:

use photon_etcd_cluster::{ClusterNodeBuilder, MetricsCollector, NodeMetadata};
use serde_json::json;

struct AppMetricsCollector {
    // Your app state
}

impl MetricsCollector for AppMetricsCollector {
    fn collect(&self) -> NodeMetadata {
        json!({
            "cpu_usage_percent": 45.0,
            "memory_usage_percent": 60.0,
            "queue_depth": 150,
            "active_connections": 42,
            "requests_per_second": 1000.0
        })
    }
}

let node = ClusterNodeBuilder::new(endpoints, id, ip, group)
    .metrics_collector(AppMetricsCollector { /* ... */ })
    .metrics_update_interval(5)
    .build();

Reading Node Metrics (Load Balancer Side)

use photon_etcd_cluster::{ServiceDiscovery, ClusterEvent, metadata_keys};

let discovery = ServiceDiscovery::new(endpoints, "workers".into());

// React to metric changes
let mut events = discovery.subscribe();
while let Ok(event) = events.recv().await {
    match event {
        ClusterEvent::NodeUpdated { old, new } => {
            // Metadata changed - update backend weights
            if let Some(cpu) = new.metadata.get(metadata_keys::CPU_USAGE_PERCENT) {
                println!("Node {} CPU: {}%", new.id, cpu);
            }
        }
        _ => {}
    }
}

// Or query directly
for node in discovery.nodes().iter() {
    let cpu = node.metadata.get("cpu_usage_percent")
        .and_then(|v| v.as_f64())
        .unwrap_or(0.0);
    let mem = node.metadata.get("memory_usage_percent")
        .and_then(|v| v.as_f64())
        .unwrap_or(0.0);
    println!("Node {}: CPU={:.1}%, Memory={:.1}%", node.id, cpu, mem);
}

Standard Metadata Keys

The metadata_keys module provides standard key names:

| Key                    | Type | Description               |
|------------------------|------|---------------------------|
| cpu_usage_percent      | f64  | CPU usage (0-100%)        |
| memory_usage_percent   | f64  | Memory usage (0-100%)     |
| memory_available_bytes | u64  | Available memory in bytes |
| load_avg_1m            | f64  | 1-minute load average     |
| active_connections     | u32  | Active connection count   |
| requests_per_second    | f64  | Request throughput        |
| queue_depth            | u32  | Pending work queue size   |
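
A load balancer can turn these keys into backend weights. The helper below is a minimal sketch; the weighting formula is illustrative and not part of the library.

use photon_etcd_cluster::{metadata_keys, Node};

// Illustrative weighting: derive a load-balancing weight from reported CPU usage.
// Treats a missing metric as average load; less loaded nodes get more traffic.
fn backend_weight(node: &Node) -> f64 {
    let cpu = node
        .metadata
        .get(metadata_keys::CPU_USAGE_PERCENT)
        .and_then(|v| v.as_f64())
        .unwrap_or(50.0);
    (100.0 - cpu).max(1.0)
}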

Real-World Example: HTTP Load Balancer with Dynamic Backends

This example shows how to build a load balancer that automatically discovers backend servers using ServiceDiscovery. Based on actual production usage with Pingora.

use photon_etcd_cluster::{ServiceDiscovery, ClusterEvent};
use std::net::SocketAddr;

/// Manages dynamic backend discovery for a load balancer
struct BackendDiscovery {
    discovery: ServiceDiscovery,
}

impl BackendDiscovery {
    fn new(discovery: ServiceDiscovery) -> Self {
        Self { discovery }
    }

    /// Returns healthy backend addresses for load balancing
    /// Called on every incoming request - must be fast!
    fn get_backends(&self) -> Vec<SocketAddr> {
        // Lock-free O(1) read - safe for high-frequency calls
        self.discovery
            .nodes()
            .iter()
            .map(|node| SocketAddr::new(node.ip, 8080))
            .collect()
    }

    /// Optionally route leader-only requests (e.g., write operations)
    fn get_leader_backend(&self) -> Option<SocketAddr> {
        self.discovery
            .leader()
            .map(|leader| SocketAddr::new(leader.ip, 8080))
    }
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let rt = tokio::runtime::Runtime::new()?;

    // Create discovery for different service groups
    let etcd_endpoints = vec!["http://localhost:2379".to_string()];

    let workers_discovery = ServiceDiscovery::new(
        etcd_endpoints.clone(),
        "workers".to_string(),      // Group: backend workers
    );
    let cache_discovery = ServiceDiscovery::new(
        etcd_endpoints.clone(),
        "cache-nodes".to_string(),  // Group: cache servers
    );

    // Spawn discovery background tasks
    rt.block_on(async {
        let w = workers_discovery.clone();
        let c = cache_discovery.clone();

        tokio::spawn(async move { w.run(None).await });
        tokio::spawn(async move { c.run(None).await });

        // Wait for at least 1 backend (built-in, no polling!)
        workers_discovery.wait_for_nodes(1).await;
        cache_discovery.wait_for_nodes(1).await;
    });

    // Create backend discovery instances for load balancer
    let worker_backends = BackendDiscovery::new(workers_discovery.clone());
    let cache_backends = BackendDiscovery::new(cache_discovery.clone());

    // Use in your load balancer's request routing
    println!("Worker backends: {:?}", worker_backends.get_backends());
    println!("Cache backends: {:?}", cache_backends.get_backends());

    if let Some(leader) = worker_backends.get_leader_backend() {
        println!("Leader backend for writes: {}", leader);
    }

    Ok(())
}

Key patterns demonstrated:

  • Multiple service groups: Separate discovery instances for different backend types (workers, cache, etc.)
  • Lock-free reads: discovery.nodes() is safe to call on every request
  • Leader routing: Route write operations to the elected leader
  • Built-in wait helpers: wait_for_nodes() uses watch channels internally - no polling

Architecture

┌─────────────────────────────────────────────────────────────┐
│                         etcd Cluster                        │
│  ┌──────────────────┐  ┌──────────────────┐                │
│  │ registry/{group} │  │ election/{group} │                │
│  │   /node-1        │  │   (leader key)   │                │
│  │   /node-2        │  │                  │                │
│  │   /node-N        │  │                  │                │
│  └──────────────────┘  └──────────────────┘                │
└─────────────────────────────────────────────────────────────┘
           ▲                      ▲
           │ watch                │ campaign/proclaim
           │                      │
┌──────────┴──────────┐  ┌───────┴────────┐
│  ServiceDiscovery   │  │   ClusterNode  │
│  - subscribe()      │  │  - run()       │
│  - watch_nodes()    │  │  - is_leader() │
│  - nodes()          │  │  - health      │
│  - leader()         │  │                │
└─────────────────────┘  └────────────────┘

Components

| Component              | Purpose                                                                     | Used By           |
|------------------------|-----------------------------------------------------------------------------|-------------------|
| ClusterNode            | Node registration, leader election, health management                      | Worker processes  |
| ClusterNodeBuilder     | Fluent builder for ClusterNode with metrics configuration                   | Worker processes  |
| ServiceDiscovery       | Reactive cluster state via events and watch channels                        | Load balancers    |
| ClusterEvent           | Enum for cluster state changes (join/leave/leader/updated)                  | Event subscribers |
| Node                   | Serializable node data (id, ip, last_seen, metadata)                        | Both              |
| MetricsCollector       | Trait for collecting node metrics                                           | Worker processes  |
| SystemMetricsCollector | Built-in collector for CPU, memory, load (requires system-metrics feature)  | Worker processes  |

Features

Leader Election

Uses etcd's native campaign/proclaim APIs for distributed consensus:

  • Campaign: Blocks until leadership is acquired
  • Proclaim: Periodic heartbeat to maintain leadership
  • Resign: Graceful leadership handover on shutdown
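
In practice, a worker gates its coordination work on is_leader(). The loop below is a minimal sketch reusing the ClusterNode from the Quick Start; the 30-second interval and the job itself are illustrative, and tokio's time feature is required.

use std::time::Duration;
use photon_etcd_cluster::ClusterNode;

// Illustrative leader-only background job; `node` is the ClusterNode from the Quick Start.
async fn run_leader_jobs(node: ClusterNode) {
    let mut ticker = tokio::time::interval(Duration::from_secs(30));
    loop {
        ticker.tick().await;
        if node.is_leader() {
            // Only the currently elected leader performs coordination work,
            // e.g. cache invalidation or job scheduling.
            println!("running leader-only job");
        }
    }
}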

Health Monitoring

Heartbeat-based health tracking:

  • The node is marked unhealthy after 3 consecutive lease keep-alive failures
  • A reconnection is triggered after 10 consecutive failures
  • Automatic recovery once connectivity is restored
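
The current status can also feed a readiness probe. A minimal sketch using current_health() and the HealthStatus enum from the Quick Start:

use photon_etcd_cluster::{ClusterNode, HealthStatus};

// Illustrative readiness check: report ready only while the etcd lease keep-alive is healthy.
fn is_ready(node: &ClusterNode) -> bool {
    matches!(node.current_health(), HealthStatus::Healthy)
}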

Resilient Connectivity

  • Exponential backoff on etcd connection failures (1s → 30s max)
  • Graceful reconnection after network partitions
  • Shutdown signal integration for clean termination
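
The reconnect schedule described above (1s doubling up to a 30s cap) corresponds roughly to the sketch below; this is an illustration of the behavior, not the library's internal code.

use std::time::Duration;

// Illustrative exponential backoff: 1s, 2s, 4s, ..., capped at 30s.
fn backoff_delay(attempt: u32) -> Duration {
    Duration::from_secs(2u64.saturating_pow(attempt).min(30))
}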

Node Metrics (Optional)

Nodes can publish arbitrary JSON metadata (CPU, memory, custom metrics) for load balancer consumption:

  • Schema-less: Any JSON-serializable data via serde_json::Value
  • Pluggable collection: Implement MetricsCollector trait for custom metrics
  • Built-in system metrics: SystemMetricsCollector provides CPU, memory, load average (requires system-metrics feature)
  • Separate update task: Metrics updates run independently from lease keep-alive
  • Change detection: NodeUpdated events emitted when metadata changes

Reactive API

ServiceDiscovery provides three ways to observe cluster state:

  • Event subscription via subscribe(): Push-based ClusterEvent notifications
  • Watch channels via watch_nodes()/watch_leader(): Efficient state observation
  • Direct access via nodes()/leader(): O(1) lock-free reads

Events emitted:

  • NodeJoined(Node) / NodeLeft(Node) / NodeUpdated { old, new }
  • LeaderElected(Node) / LeaderLost
  • Ready / Disconnected / Reconnected
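
watch_leader() is listed above but not shown in the earlier examples. A minimal sketch of observing leadership changes through it, assuming it mirrors watch_nodes() and yields an Option<Node>:

use photon_etcd_cluster::ServiceDiscovery;

// Sketch: log leadership changes via the leader watch channel.
// Assumes watch_leader() yields Option<Node>, mirroring watch_nodes().
async fn log_leader_changes(discovery: ServiceDiscovery) {
    let mut leader_watch = discovery.watch_leader();
    while leader_watch.changed().await.is_ok() {
        match leader_watch.borrow().as_ref() {
            Some(leader) => println!("Leader is now {}", leader.id),
            None => println!("No leader currently elected"),
        }
    }
}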

etcd Key Structure

registry/
└── {group_name}/
    ├── node-1  →  {"id":"node-1","ip":"192.168.1.10","last_seen":1234567890,"metadata":{...}}
    ├── node-2  →  {"id":"node-2","ip":"192.168.1.11","last_seen":1234567891,"metadata":{...}}
    └── ...

election/
└── {group_name}  →  (etcd election key, value = current leader ID)

Node metadata example (with SystemMetricsCollector):

{
  "id": "worker-1",
  "ip": "192.168.1.10",
  "last_seen": 1234567890,
  "metadata": {
    "cpu_usage_percent": 45.2,
    "memory_usage_percent": 62.8,
    "memory_available_bytes": 8589934592,
    "load_avg_1m": 2.5
  }
}
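
For debugging, these keys can be inspected directly with etcdctl (v3 API); the group name below is the my-service group used in the examples.

# List all registered nodes in a group
etcdctl get --prefix registry/my-service

# Show the election key(s) for the group
etcdctl get --prefix election/my-service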

Configuration

ClusterNode / ClusterNodeBuilder

| Parameter               | Type                  | Default              | Description                        |
|-------------------------|-----------------------|----------------------|------------------------------------|
| etcd_endpoints          | Vec<String>           | Required             | etcd cluster endpoints             |
| node_id                 | String                | Required             | Unique node identifier             |
| node_ip                 | IpAddr                | Required             | Node's IP address                  |
| group_name              | String                | Required             | Logical group for nodes            |
| ttl                     | i64                   | 5                    | Lease TTL in seconds               |
| metrics_collector       | impl MetricsCollector | NoopMetricsCollector | Metrics collection implementation  |
| metrics_update_interval | u64                   | 0 (disabled)         | Seconds between metrics updates    |

Performance

  • ServiceDiscovery.nodes(): O(1) lock-free read via watch::Receiver::borrow()
  • Throughput: 10M accesses in <2s (benchmark validated)
  • Memory: ~5 MiB for 1000 nodes
  • Event channel: 256-message buffer for broadcast subscribers

Requirements

  • Rust 1.85+ (edition 2024)
  • etcd v3.5+
  • Docker (for integration tests only)

Tested with etcd v3.5.21.

Build System

# Cargo
cargo build
cargo test

Testing

# Unit tests
cargo test

# Integration tests (requires Docker - spawns etcd container)
cargo test -- --test-threads=1

Integration Test Coverage

  • Single node self-election
  • Leader re-election on failure
  • Node reconnection after etcd restarts
  • Scalability with 20+ concurrent nodes
  • Node metadata storage and retrieval
  • Metadata update propagation via NodeUpdated events

Comparison with Alternatives

| Feature               | photon-etcd-cluster | kube-leader-election | memberlist | chitchat   |
|-----------------------|---------------------|----------------------|------------|------------|
| Language              | Rust                | Rust                 | Go         | Rust       |
| Backend               | etcd                | Kubernetes           | None (P2P) | None (P2P) |
| Leader Election       | Yes                 | Yes                  | No         | No         |
| Node Registry         | Yes                 | No                   | Yes        | Yes        |
| Node Metrics          | Yes                 | No                   | Limited    | Limited    |
| Platform Independent  | Yes                 | No (K8s only)        | Yes        | Yes        |
| External Dependencies | etcd                | Kubernetes           | None       | None       |

Contributing

Contributions are welcome! Please see CONTRIBUTING.md for guidelines.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Author

Roman Gushel

Roadmap

  • Leader Priority/Weighting
  • Graceful Leadership Transfer
  • Node Tagging & Filtering
  • TLS/mTLS Support for etcd Connections
  • etcd Authentication Support
  • Graceful Degradation & Health Check States
  • Application-Level Health Checks
  • Topology/Zone Awareness
  • Observability: Prometheus Metrics
  • Circuit Breaker for etcd Operations
