| Crates.io | memberlist-plumtree |
| lib.rs | memberlist-plumtree |
| version | 0.1.5 |
| created_at | 2026-01-12 00:47:10.367075+00 |
| updated_at | 2026-01-19 08:27:43.866313+00 |
| description | Plumtree (Epidemic Broadcast Trees) implementation built on memberlist for efficient O(n) message broadcast. |
| homepage | |
| repository | https://github.com/johnnywale/memberlist-plumtree |
| max_upload_size | |
| id | 2036791 |
| size | 1,141,079 |
Plumtree (Epidemic Broadcast Trees) implementation built on top of memberlist for efficient O(n) message broadcast in distributed systems.
Plumtree combines the efficiency of tree-based broadcast with the reliability of gossip protocols through a hybrid eager-push/lazy-push approach. This makes it ideal for applications requiring reliable message dissemination across large clusters with minimal network overhead.
Every node maintains its own classification of peers as "eager" or "lazy". There is no global tree - the spanning tree emerges from each node's local decisions.
┌─────────────────────────────────────────────────────────────────┐
│ Node A's Local View │
├─────────────────────────────────────────────────────────────────┤
│ │
│ [Node A] │
│ / | \ │
│ / | \ │
│ eager eager lazy │
│ / | \ │
│ [B] [C] [D] │
│ │
│ A sends GOSSIP (full message) to B and C │
│ A sends IHAVE (just message ID) to D │
│ │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ Node B's Local View │
├─────────────────────────────────────────────────────────────────┤
│ │
│ [Node B] │
│ / | \ │
│ / | \ │
│ eager lazy eager │
│ / | \ │
│ [A] [C] [E] │
│ │
│ B has its OWN classification - different from A's view! │
│ B sends GOSSIP to A and E, IHAVE to C │
│ │
└─────────────────────────────────────────────────────────────────┘
When Node A broadcasts a message, here's how it flows:
Step 1: A originates message
A ──GOSSIP──> B (A's eager peer)
A ──GOSSIP──> C (A's eager peer)
A ──IHAVE───> D (A's lazy peer)
Step 2: B receives, forwards to its eager peers
B ──GOSSIP──> E (B's eager peer)
B ──IHAVE───> C (B's lazy peer, but C already has it)
Step 3: Tree optimization
C received from both A and B (redundant!)
C sends PRUNE to B → B demotes C to lazy
Result: More efficient tree for next message
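The prune-on-duplicate rule from step 3 can be summarized in a few lines of Rust-flavored pseudocode. This is an illustrative sketch of the per-node bookkeeping, not this crate's internal types:

```rust
use std::collections::HashSet;

// Illustrative types only - not the crate's internal representation.
type PeerId = String;
type MessageId = u64;

struct LocalView {
    eager: HashSet<PeerId>,
    lazy: HashSet<PeerId>,
    seen: HashSet<MessageId>,
}

impl LocalView {
    /// Handle an incoming GOSSIP; returns the peer to PRUNE, if any.
    fn on_gossip(&mut self, from: PeerId, id: MessageId) -> Option<PeerId> {
        if self.seen.insert(id) {
            // First delivery: keep the sender on the eager path,
            // then deliver and forward to our own eager peers (omitted).
            self.lazy.remove(&from);
            self.eager.insert(from);
            None
        } else {
            // Duplicate: this link is redundant, so demote the sender
            // to lazy and answer with PRUNE.
            self.eager.remove(&from);
            self.lazy.insert(from.clone());
            Some(from)
        }
    }
}
```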
When a lazy peer receives IHAVE, it does NOT immediately request the message. Instead, it waits to see if the message arrives through its eager connections:
┌─────────────────────────────────────────────────────────────────┐
│ What happens when D receives IHAVE from A │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. A ──IHAVE(msg_id)──> D │
│ │
│ 2. D starts a timer (graft_timeout, default 500ms) │
│ D thinks: "I'll wait to see if I get this from someone else"│
│ │
│ 3a. IF D receives GOSSIP from another peer before timeout: │
│ ✓ D cancels timer, does nothing │
│ (D got message through its eager connections - normal case)│
│ │
│ 3b. IF timeout expires and D still doesn't have message: │
│ ! D ──GRAFT(msg_id)──> A │
│ ! A ──GOSSIP(msg)──> D (and A promotes D to eager) │
│ (Tree repair - D joins A's eager set for future messages) │
│ │
└─────────────────────────────────────────────────────────────────┘
Example: D has its own eager peer (B)
[A] [B]
│ │
(lazy) (eager)
│ │
└──IHAVE──> [D] <──GOSSIP──┘
│
D receives from B first!
IHAVE from A is ignored (backup not needed)
Why wait instead of immediate pull?
| Approach | Messages | Latency | Used By |
|---|---|---|---|
| Immediate pull on IHAVE | 3 messages per delivery (IHAVE + GRAFT + GOSSIP) | Higher | - |
| Wait then pull if needed | Usually just GOSSIP via eager | Lower | Plumtree |
The IHAVE/GRAFT mechanism is a backup path, not the primary delivery. This is what makes Plumtree achieve O(n) message complexity - most messages flow through the eager spanning tree, and lazy links only activate when needed.
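The lazy path can be sketched the same way: an IHAVE only records a deadline, and a GRAFT is issued only if the payload has not arrived by then. Names and types below are illustrative rather than the crate's internals:

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

type PeerId = String;
type MessageId = u64;

/// Tracks messages announced via IHAVE that we have not received yet.
struct MissingTracker {
    graft_timeout: Duration,
    pending: HashMap<MessageId, (PeerId, Instant)>, // announcer + deadline
}

impl MissingTracker {
    fn on_ihave(&mut self, from: PeerId, id: MessageId, now: Instant) {
        // Remember who announced it and when to stop waiting.
        self.pending.entry(id).or_insert((from, now + self.graft_timeout));
    }

    fn on_gossip(&mut self, id: MessageId) {
        // Payload arrived through an eager link - cancel the pending graft.
        self.pending.remove(&id);
    }

    /// Returns (peer, message) pairs to GRAFT for every expired deadline.
    fn expired(&mut self, now: Instant) -> Vec<(PeerId, MessageId)> {
        let due: Vec<_> = self
            .pending
            .iter()
            .filter(|(_, (_, deadline))| now >= *deadline)
            .map(|(id, (peer, _))| (peer.clone(), *id))
            .collect();
        for (_, id) in &due {
            self.pending.remove(id);
        }
        due
    }
}
```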
| Message | When Sent | Purpose |
|---|---|---|
| GOSSIP | To eager peers | Full message payload - primary delivery |
| IHAVE | To lazy peers | "I have message X" - backup announcement |
| GRAFT | When missing message | "Send me message X, add me as eager" |
| PRUNE | On duplicate receipt | "I got this already, demote me to lazy" |
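As a mental model, the whole wire vocabulary fits in one small enum. A hedged sketch follows; the crate's actual message types and fields may differ:

```rust
use bytes::Bytes;

// Illustrative only - the crate defines its own message and ID types.
type MessageId = u64;

enum PlumtreeMessage {
    /// Full payload, sent to eager peers (primary delivery path).
    Gossip { id: MessageId, payload: Bytes, round: u32 },
    /// Announcement of message IDs, batched and sent to lazy peers.
    IHave { ids: Vec<MessageId> },
    /// Request for a missing message; also promotes the link to eager.
    Graft { id: MessageId },
    /// Sent on duplicate receipt; demotes the link to lazy.
    Prune,
}
```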
Add to your Cargo.toml:
[dependencies]
memberlist-plumtree = "0.1"
| Feature | Description |
|---|---|
| `tokio` | Enable Tokio runtime support (required for the Lazarus background task) |
| `metrics` | Enable metrics collection via the `metrics` crate |
| `serde` | Enable serialization/deserialization for config |
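For example, to opt into all three features in Cargo.toml (a sketch; enable only what you need):

```toml
[dependencies]
memberlist-plumtree = { version = "0.1", features = ["tokio", "metrics", "serde"] }
```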
use memberlist_plumtree::{Plumtree, PlumtreeConfig, PlumtreeDelegate, MessageId};
use bytes::Bytes;
use std::sync::Arc;
// Define a delegate to handle delivered messages
struct MyDelegate;
impl PlumtreeDelegate for MyDelegate {
fn on_deliver(&self, msg_id: MessageId, payload: Bytes) {
println!("Received message {}: {:?}", msg_id, payload);
}
}
#[tokio::main]
async fn main() {
// Create a Plumtree instance with LAN-optimized config
let (plumtree, handle) = Plumtree::new(
"node-1".to_string(),
PlumtreeConfig::lan(),
Arc::new(MyDelegate),
);
// Add peers discovered via memberlist
plumtree.add_peer("node-2".to_string());
plumtree.add_peer("node-3".to_string());
// Broadcast a message to all nodes
let msg_id = plumtree.broadcast(b"Hello, cluster!").await.unwrap();
println!("Broadcast message: {}", msg_id);
// Run background tasks (IHave scheduler, Graft timer, cleanup)
tokio::spawn({
let pt = plumtree.clone();
async move {
tokio::join!(
pt.run_ihave_scheduler(),
pt.run_graft_timer(),
pt.run_seen_cleanup(),
);
}
});
}
// For LAN environments (low latency, high bandwidth)
let config = PlumtreeConfig::lan();
// For WAN environments (higher latency tolerance)
let config = PlumtreeConfig::wan();
// For large clusters (1000+ nodes)
let config = PlumtreeConfig::large_cluster();
use std::time::Duration;
let config = PlumtreeConfig::default()
.with_eager_fanout(4) // Number of eager peers
.with_lazy_fanout(8) // Number of lazy peers for IHave
.with_ihave_interval(Duration::from_millis(100))
.with_graft_timeout(Duration::from_millis(500))
.with_message_cache_ttl(Duration::from_secs(60))
.with_message_cache_max_size(10000)
.with_optimization_threshold(3)
.with_max_message_size(64 * 1024)
.with_graft_rate_limit_per_second(10.0)
.with_graft_rate_limit_burst(20)
.with_graft_max_retries(5);
| Parameter | Default | Description |
|---|---|---|
| `eager_fanout` | 3 | Number of peers receiving full messages immediately |
| `lazy_fanout` | 6 | Number of peers receiving IHave announcements |
| `ihave_interval` | 100ms | Interval for batched IHave announcements |
| `message_cache_ttl` | 60s | How long to cache messages for Graft requests |
| `message_cache_max_size` | 10000 | Maximum number of cached messages |
| `optimization_threshold` | 3 | Rounds before pruning redundant paths |
| `graft_timeout` | 500ms | Timeout before sending Graft for missing message |
| `max_message_size` | 64KB | Maximum message payload size |
| `graft_rate_limit_per_second` | 10.0 | Rate limit for Graft requests per peer |
| `graft_rate_limit_burst` | 20 | Burst capacity for Graft rate limiting |
| `graft_max_retries` | 5 | Maximum Graft retry attempts with backoff |
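If you want to scale the fanouts with cluster size, a common gossip rule of thumb is to grow them roughly with ln(n). The helper below is an assumption-laden sketch built only on the builder methods shown above; it is not something the crate provides:

```rust
use std::time::Duration;
use memberlist_plumtree::PlumtreeConfig;

/// Hypothetical helper: derive fanouts from the expected cluster size.
/// The ln(n) scaling is a gossip heuristic, not a rule from this crate.
fn config_for_cluster(expected_nodes: usize) -> PlumtreeConfig {
    let ln_n = (expected_nodes.max(2) as f64).ln().ceil() as usize;
    PlumtreeConfig::default()
        .with_eager_fanout(ln_n.clamp(3, 8))       // tree links carrying full payloads
        .with_lazy_fanout((2 * ln_n).clamp(6, 16)) // backup IHave links
        .with_graft_timeout(Duration::from_millis(500))
}
```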
Peer scoring enables latency-aware peer selection for optimal spanning tree construction:
use memberlist_plumtree::{PeerScoring, ScoringConfig};
// Peers are scored based on RTT and reliability
let scoring = PeerScoring::new(ScoringConfig::default());
// Record RTT measurement
scoring.record_rtt(&peer_id, Duration::from_millis(15));
// Record failures (affects peer ranking)
scoring.record_failure(&peer_id);
// Get best peers for eager set (sorted by score)
let best_peers = scoring.best_peers(5);
Efficient connection management with per-peer message queues:
use memberlist_plumtree::{PooledTransport, PoolConfig};
let config = PoolConfig::default()
.with_queue_size(1024) // Per-peer queue capacity
.with_batch_size(32) // Messages per batch
.with_flush_interval(Duration::from_millis(10));
let transport = PooledTransport::new(config, inner_transport);
// Messages are automatically batched and sent efficiently
transport.send(peer_id, message).await?;
// Get pool statistics
let stats = transport.stats();
println!("Messages sent: {}, Queue pressure: {:.2}%",
stats.messages_sent, stats.queue_pressure * 100.0);
Dynamic batch size adjustment based on network conditions:
use memberlist_plumtree::{AdaptiveBatcher, BatcherConfig};
let batcher = AdaptiveBatcher::new(BatcherConfig::default());
// Record network feedback
batcher.record_message();
batcher.record_graft_received(); // Successful graft
batcher.record_graft_timeout(); // Failed graft
// Get recommended batch size (adjusts based on latency/success rate)
let batch_size = batcher.recommended_batch_size();
// Scale for cluster size
batcher.set_cluster_size_hint(1000);
// Get statistics
let stats = batcher.stats();
println!("Graft success rate: {:.1}%", stats.graft_success_rate * 100.0);
Lock-free rate tracking with adaptive cleanup intervals:
use memberlist_plumtree::{CleanupTuner, CleanupConfig, BackpressureHint};
let tuner = CleanupTuner::new(CleanupConfig::default());
// Record messages (fully lock-free, uses atomics only)
tuner.record_message();
tuner.record_messages(100); // Batch recording for high throughput
// Get tuned cleanup parameters based on current state
let params = tuner.get_parameters(cache_utilization, cache_ttl);
println!("Recommended interval: {:?}, batch_size: {}",
params.interval, params.batch_size);
// Check backpressure hint
match params.backpressure_hint() {
BackpressureHint::None => { /* Normal operation */ }
BackpressureHint::DropSome => { /* Consider dropping low-priority messages */ }
BackpressureHint::DropMost => { /* Drop non-critical messages */ }
BackpressureHint::BlockNew => { /* Temporarily block new messages */ }
}
// Get statistics with trend analysis
let stats = tuner.stats();
println!("Efficiency trend: {:?}, Pressure trend: {:?}",
stats.efficiency_trend, stats.pressure_trend);
High-performance deduplication with configurable sharding:
// Seen map is automatically managed by Plumtree
// Configure capacity limits in PlumtreeConfig
let config = PlumtreeConfig::default()
.with_seen_map_soft_cap(100_000) // Soft limit for entries
.with_seen_map_hard_cap(150_000); // Hard limit (emergency eviction)
// Get seen map statistics
let stats = plumtree.seen_map_stats();
println!("Utilization: {:.1}%, Entries: {}",
stats.utilization * 100.0, stats.total_entries);
When compiled with the metrics feature, comprehensive metrics are exposed:
- `plumtree_messages_broadcast_total` - Total broadcasts initiated
- `plumtree_messages_delivered_total` - Total messages delivered
- `plumtree_messages_duplicate_total` - Duplicate messages received
- `plumtree_gossip_sent_total` - Gossip messages sent
- `plumtree_ihave_sent_total` - IHave messages sent
- `plumtree_graft_sent_total` - Graft messages sent
- `plumtree_prune_sent_total` - Prune messages sent
- `plumtree_graft_success_total` - Successful Graft requests
- `plumtree_graft_failed_total` - Failed Graft requests
- `plumtree_graft_latency_seconds` - Graft request to delivery latency
- `plumtree_message_size_bytes` - Message payload size distribution
- `plumtree_message_hops` - Number of hops messages travel
- `plumtree_ihave_batch_size` - IHave batch size distribution
- `plumtree_eager_peers` - Current eager peer count
- `plumtree_lazy_peers` - Current lazy peer count
- `plumtree_cache_size` - Messages in cache
- `plumtree_pending_grafts` - Pending Graft requests

Implement PlumtreeDelegate to receive protocol events:
impl PlumtreeDelegate for MyDelegate {
// Called when a message is delivered (first time received)
fn on_deliver(&self, message_id: MessageId, payload: Bytes) {
// Process the message
}
// Called when a peer is promoted to eager
fn on_eager_promotion(&self, peer: &[u8]) {
// Peer now receives full messages
}
// Called when a peer is demoted to lazy
fn on_lazy_demotion(&self, peer: &[u8]) {
// Peer now receives only IHave announcements
}
// Called when a Graft is sent (tree repair)
fn on_graft_sent(&self, peer: &[u8], message_id: &MessageId) {
// Requesting message from peer
}
// Called when a Prune is sent (tree optimization)
fn on_prune_sent(&self, peer: &[u8]) {
// Removing redundant path
}
}
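For example, a delegate can track tree churn with atomic counters. The sketch below uses only the callbacks shown above and assumes, as in the quick start, that the trait's remaining methods have default implementations:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use bytes::Bytes;
use memberlist_plumtree::{MessageId, PlumtreeDelegate};

#[derive(Default)]
struct ChurnDelegate {
    delivered: AtomicU64,
    promotions: AtomicU64,
    demotions: AtomicU64,
}

impl PlumtreeDelegate for ChurnDelegate {
    fn on_deliver(&self, _message_id: MessageId, _payload: Bytes) {
        self.delivered.fetch_add(1, Ordering::Relaxed);
    }
    fn on_eager_promotion(&self, _peer: &[u8]) {
        self.promotions.fetch_add(1, Ordering::Relaxed);
    }
    fn on_lazy_demotion(&self, _peer: &[u8]) {
        self.demotions.fetch_add(1, Ordering::Relaxed);
    }
}
```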
This crate provides three main APIs:

Plumtree - Core Protocol

The Plumtree struct implements the core Epidemic Broadcast Tree protocol. Use this when you need a custom transport and fine-grained control, handling message delivery yourself:
// Low-level API - you handle message transport
let (plumtree, handle) = Plumtree::new(node_id, config, delegate);
plumtree.handle_message(from_peer, message); // Manual message handling
PlumtreeMemberlist - Simplified Integration

The PlumtreeMemberlist struct wraps Plumtree with a simpler API designed for integration with memberlist. Use this when you want a simpler API and are managing peers manually (for example, in tests):
use memberlist_plumtree::{PlumtreeMemberlist, PlumtreeConfig, NoopDelegate};
// High-level API - simpler to use
let plumtree_ml = PlumtreeMemberlist::new(
node_id,
PlumtreeConfig::default(),
NoopDelegate,
);
// Add peers discovered via memberlist
plumtree_ml.add_peer(peer_id);
// Broadcast - message reaches all nodes via spanning tree
let msg_id = plumtree_ml.broadcast(b"cluster message").await?;
// Access statistics
let peer_stats = plumtree_ml.peer_stats();
let cache_stats = plumtree_ml.cache_stats();
Key difference: PlumtreeMemberlist is a convenience wrapper that provides:

- A one-call broadcast API (broadcast())
- Peer lifecycle management (add_peer, remove_peer)

MemberlistStack - Full Integration (Recommended)

The MemberlistStack struct provides the complete integration stack, combining Plumtree with a real Memberlist instance for automatic SWIM-based peer discovery. This is the recommended entry point for production deployments:
use memberlist_plumtree::{
MemberlistStack, PlumtreeConfig, PlumtreeNodeDelegate, NoopDelegate,
};
use memberlist_core::{
transport::net::NetTransport,
Options,
};
use std::net::SocketAddr;
// Create your transport (e.g., NetTransport for real networking)
let transport = NetTransport::new(transport_config).await?;
// Create the stack with PlumtreeNodeDelegate wrapping your delegate
let delegate = PlumtreeNodeDelegate::new(
node_id.clone(),
NoopDelegate, // Or your custom PlumtreeDelegate
pm.clone(), // PlumtreeMemberlist reference for peer sync
);
// Build the complete stack
let stack = MemberlistStack::new(
pm, // PlumtreeMemberlist
memberlist, // Memberlist instance
advertise_addr,
);
// Join the cluster - just pass seed addresses!
let seeds: Vec<SocketAddr> = vec![
"192.168.1.10:7946".parse()?,
"192.168.1.11:7946".parse()?,
];
stack.join(&seeds).await?;
// Broadcast messages - automatically routes via spanning tree
let msg_id = stack.broadcast(b"Hello cluster!").await?;
// Get cluster status
println!("Members: {}", stack.num_members());
println!("Peers: {:?}", stack.peer_stats());
// Graceful shutdown
stack.leave().await?;
stack.shutdown().await?;
Key features of MemberlistStack:

- Peers are discovered via SWIM - no manual add_peer() calls
- Join with plain SocketAddrs - the stack handles MaybeResolvedAddress conversion internally
- PlumtreeNodeDelegate automatically syncs Plumtree peers when Memberlist membership changes

The "Lazarus" feature solves the Ghost Seed Problem: when a seed node fails and restarts, other nodes have already marked it as dead and stopped probing it. Without intervention, the restarted seed remains isolated.
How it works: a background task runs every lazarus_interval, compares the configured static seeds against current cluster membership, and probes any seeds that have gone missing so a restarted seed is rediscovered and rejoined.

Configure it via BridgeConfig:

use memberlist_plumtree::{BridgeConfig, MemberlistStack};
use std::time::Duration;
use std::path::PathBuf;
// Configure Lazarus seed recovery
let config = BridgeConfig::new()
// Static seeds to monitor
.with_static_seeds(vec![
"192.168.1.100:7946".parse().unwrap(),
"192.168.1.101:7946".parse().unwrap(),
"192.168.1.102:7946".parse().unwrap(),
])
// Enable the Lazarus background task
.with_lazarus_enabled(true)
// Probe interval (default: 30 seconds)
.with_lazarus_interval(Duration::from_secs(30))
// Optional: persist known peers for crash recovery
.with_persistence_path(PathBuf::from("/var/lib/myapp/peers.txt"));
// After creating MemberlistStack, spawn the Lazarus task
let lazarus_handle = stack.spawn_lazarus_task(config);
// Monitor Lazarus statistics
let stats = lazarus_handle.stats();
println!("Probes sent: {}", stats.probes_sent);
println!("Reconnections: {}", stats.reconnections);
println!("Missing seeds: {}", stats.missing_seeds);
// Gracefully shutdown when done
lazarus_handle.shutdown();
BridgeConfig Parameters:
| Parameter | Default | Description |
|---|---|---|
| `static_seeds` | `[]` | List of seed node addresses to monitor |
| `lazarus_enabled` | `false` | Enable the Lazarus background task |
| `lazarus_interval` | 30s | Interval between seed probes |
| `persistence_path` | `None` | Path to persist known peers |
| `log_changes` | `true` | Log topology changes |
| `auto_promote` | `true` | Auto-promote new peers based on fanout |
For crash recovery, MemberlistStack can save known peer addresses to disk. Combined with static seeds, this provides robust bootstrap options:
use memberlist_plumtree::{persistence, BridgeConfig, MemberlistStack};
use std::path::PathBuf;
// Save current peers to file (call periodically or on shutdown)
stack.save_peers_to_file(&PathBuf::from("/var/lib/myapp/peers.txt")).await?;
// On startup, load bootstrap addresses from both sources
let config = BridgeConfig::new()
.with_static_seeds(vec!["192.168.1.100:7946".parse().unwrap()])
.with_persistence_path(PathBuf::from("/var/lib/myapp/peers.txt"));
// Combines static seeds + persisted peers (deduplicated)
let bootstrap_addrs = MemberlistStack::load_bootstrap_addresses(&config);
// Use bootstrap addresses to join the cluster
stack.join(&bootstrap_addrs).await?;
Persistence File Format:
# Comments are supported
192.168.1.100:7946
192.168.1.101:7946
192.168.1.102:7946
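If you need to read this file yourself (outside of load_bootstrap_addresses), a minimal parser could look like the following. The helper is hypothetical and only reflects the format shown above:

```rust
use std::{fs, net::SocketAddr, path::Path};

/// Hypothetical helper: parse a peers.txt file in the format shown above,
/// skipping blank lines and `#` comments.
fn load_peers(path: &Path) -> std::io::Result<Vec<SocketAddr>> {
    let mut peers = Vec::new();
    for line in fs::read_to_string(path)?.lines() {
        let line = line.trim();
        if line.is_empty() || line.starts_with('#') {
            continue;
        }
        if let Ok(addr) = line.parse::<SocketAddr>() {
            peers.push(addr);
        }
    }
    Ok(peers)
}
```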
Important: Each node should use a unique persistence path to avoid conflicts:
let node_id = "node-42";
let path = PathBuf::from(format!("/var/lib/myapp/{}/peers.txt", node_id));
When to use each API:
| API | Use Case |
|---|---|
| `Plumtree` | Custom transport, fine-grained control |
| `PlumtreeMemberlist` | Manual peer management, testing |
| `MemberlistStack` | Production - full SWIM + Plumtree integration |
An interactive terminal-based chat with fault injection for testing:
cargo run --example chat
Features:
- F - Toggle node offline/online
- L - Toggle 20% packet loss
- E - Promote all peers to eager (triggers prunes)
- R - Reset metrics
- M - Toggle metrics panel

A web-based visualization with peer tree display:
cargo run --example web-chat
# Then open http://localhost:3000
Features:

- Static web assets served from examples/static/

Web UI Layout:
┌─────────────────┬─────────────────────┬─────────────────┐
│ Peer Tree │ Chat Messages │ Event Log │
│ │ │ │
│ [U3] │ [12:34:56] U1: Hi │ [00:15] GOSSIP │
│ / \ │ [12:34:57] U2: Hey │ [00:16] PRUNE │
│ [U2] [U4] │ │ [00:17] PROMOTE │
│ (eager) (lazy) │ > Type message... │ │
│ │ │ ┌─────────────┐ │
│ Legend: │ │ │ Metrics │ │
│ ● Current │ │ │ Sent: 5 │ │
│ ● Eager │ │ │ Recv: 12 │ │
│ ● Lazy │ │ │ Grafts: 2 │ │
└─────────────────┴─────────────────────┴─────────────────┘
A topic-based publish/subscribe system using PlumtreeMemberlist:
cargo run --example pubsub
This demonstrates:
- PlumtreeMemberlist for simplified broadcasting

Important: The pub/sub example uses application-layer filtering, not per-topic routing. This means:
┌─────────────────────────────────────────────────────────────────┐
│ How Pub/Sub Works │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Publisher ──broadcast──> ALL nodes via spanning tree │
│ │ │
│ ┌─────────┼─────────┐ │
│ ▼ ▼ ▼ │
│ Node A Node B Node C │
│ │ │ │ │
│ [subscribed] [not sub] [subscribed] │
│ │ │ │ │
│ PROCESS DISCARD PROCESS │
│ │
└─────────────────────────────────────────────────────────────────┘
- Every broadcast reaches every node; topic filtering happens in the on_deliver callback (application layer)

For true per-topic routing (a separate spanning tree per topic), you would need to run multiple PlumtreeMemberlist instances (one per topic).

The application-layer approach is suitable for clusters where every node can afford to receive all traffic and filter locally. A minimal filtering delegate is sketched below.
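The sketch shows one way such a filtering delegate could look. The framing (one length byte, then the topic, then the body) is an assumption of this sketch, not the format used by the bundled example, and it again relies on the delegate trait's default method implementations:

```rust
use std::{collections::HashSet, sync::RwLock};
use bytes::Bytes;
use memberlist_plumtree::{MessageId, PlumtreeDelegate};

struct TopicFilter {
    subscriptions: RwLock<HashSet<String>>,
}

impl PlumtreeDelegate for TopicFilter {
    fn on_deliver(&self, _id: MessageId, payload: Bytes) {
        // Assumed framing: [topic_len: u8][topic bytes][body].
        let Some((&len, rest)) = payload.split_first() else { return };
        let len = len as usize;
        if rest.len() < len {
            return;
        }
        let Ok(topic) = std::str::from_utf8(&rest[..len]) else { return };
        if self.subscriptions.read().unwrap().contains(topic) {
            let body = &rest[len..];
            println!("[{topic}] {} bytes", body.len());
        }
        // Messages for topics we are not subscribed to are simply discarded.
    }
}
```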
| Aspect | Complexity |
|---|---|
| Message broadcast | O(n) messages |
| Tree repair | O(1) per missing message |
| Memory per node | O(cache_size) |
| Latency | O(log n) hops typical |
This implementation includes several optimizations for large-scale deployments (10,000+ nodes), including the peer scoring, pooled transport, adaptive batching, adaptive cleanup tuning, and sharded seen-map deduplication described above.
Licensed under either of:
at your option.
Contributions are welcome! Please feel free to submit a Pull Request.