tokio-sticky-channel

crates.io: tokio-sticky-channel
lib.rs: tokio-sticky-channel
version: 0.1.3
created_at: 2025-07-17 05:58:19.278558+00
updated_at: 2025-07-17 08:28:56.519232+00
description: Sticky channel pattern for Tokio - routes messages to specific receivers based on ID hash for consistent message delivery
homepage: https://github.com/devashishdxt/tokio-sticky-channel
repository: https://github.com/devashishdxt/tokio-sticky-channel
id: 1757104
size: 77,731
Devashish Dixit (devashishdxt)
documentation: https://docs.rs/tokio-sticky-channel

README

tokio-sticky-channel

A sticky channel implementation for Tokio that routes messages to specific receivers based on ID hashing.

This crate provides message passing channels where messages with the same ID are consistently delivered to the same receiver. This is useful for workload distribution scenarios where you need to ensure that related messages are processed by the same consumer for ordering guarantees or stateful processing.

Key Features

  • Deterministic routing: Messages with the same ID always go to the same receiver
  • Multiple producers: Senders can be cloned and used from multiple threads
  • Async and sync receiving: Support for both async and non-blocking receive operations
  • Cancel-safe: All operations work correctly with tokio::select!
  • Bounded and unbounded channels: Choose between memory-bounded or unbounded channels

Channel Types

Unbounded Sticky Channel

use tokio_sticky_channel::unbounded_sticky_channel;
use std::num::NonZeroUsize;

#[tokio::main]
async fn main() {
    // Create an unbounded sticky channel with 3 consumers
    let (sender, mut receivers) = unbounded_sticky_channel::<&str, i32>(
        NonZeroUsize::new(3).unwrap()
    );

    // Send messages with IDs - same ID always goes to same receiver
    sender.send("user-123", 42).unwrap();
    sender.send("user-456", 24).unwrap();
    sender.send("user-123", 84).unwrap(); // Same receiver as first message

    // Drain whatever each consumer has received so far
    for receiver in &mut receivers {
        while let Ok(message) = receiver.try_recv() {
            println!("Received: {}", message);
        }
    }
}

Bounded Sticky Channel

use tokio_sticky_channel::sticky_channel;
use std::num::NonZeroUsize;

#[tokio::main]
async fn main() {
    // Create a bounded sticky channel with 3 consumers and capacity of 100 per channel
    let (sender, mut receivers) = sticky_channel::<&str, i32>(
        NonZeroUsize::new(3).unwrap(),
        100
    );

    // Send messages with IDs - waits asynchronously if the target channel is full
    sender.send("user-123", 42).await.unwrap();
    sender.send("user-456", 24).await.unwrap();

    // Try to send without waiting - returns an error if the target channel is full
    match sender.try_send("user-789", 99) {
        Ok(_) => println!("Message sent successfully"),
        Err(e) => println!("Failed to send: {}", e),
    }

    // Drop sender to close channels
    drop(sender);

    // Receive messages from all receivers
    for receiver in &mut receivers {
        while let Some(message) = receiver.recv().await {
            println!("Received: {}", message);
        }
    }
}
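
The "channel is full" behaviour used above can be explored without Tokio. The following is a minimal sketch that uses the standard library's `std::sync::mpsc::sync_channel` as a stand-in for one bounded inner channel; it is not this crate's implementation, and `fill_until_full` and `slot_freed_by_recv` are hypothetical helper names introduced only for illustration.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Fills a bounded channel of the given capacity and reports how many
/// sends were accepted before the channel reported that it was full.
pub fn fill_until_full(capacity: usize) -> usize {
    let (tx, rx) = sync_channel::<usize>(capacity);
    let mut accepted = 0;
    loop {
        match tx.try_send(accepted) {
            Ok(()) => accepted += 1,
            // A full buffer is reported as an error instead of waiting.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => unreachable!("receiver is still alive"),
        }
    }
    drop(rx); // keep the receiver alive until the loop has finished
    accepted
}

/// For capacity >= 1: once the consumer takes one message, exactly one
/// more non-blocking send fits before the channel is full again.
pub fn slot_freed_by_recv(capacity: usize) -> bool {
    let (tx, rx) = sync_channel::<u8>(capacity);
    while tx.try_send(0).is_ok() {}
    rx.try_recv().is_ok() && tx.try_send(0).is_ok() && tx.try_send(0).is_err()
}
```

Calling `fill_until_full(100)` accepts 100 messages and then stops, which mirrors the per-channel capacity of 100 in the example above. Because each ID maps to a single inner channel, one slow consumer only applies backpressure to the IDs routed to it.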

Architecture

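The sticky channel routes each message by hashing its ID and taking the result modulo the number of consumers. The mapping stays fixed for the lifetime of a channel because the consumer count is set at creation: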

  1. Senders: Compute hash(id) % num_consumers to determine the target receiver
  2. Internal channels: Each consumer has its own MPSC channel (bounded or unbounded)
  3. Receivers: Wrap Tokio's receivers with additional convenience methods
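
The three steps above can be sketched with the standard library alone. This is an illustration under stated assumptions, not the crate's code: the hasher the crate uses is not documented here, so `DefaultHasher` is an assumption, `std::sync::mpsc` stands in for Tokio's channels, and `StickySender`, `route` and `sticky` are hypothetical names.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::num::NonZeroUsize;
use std::sync::mpsc::{channel, Receiver, SendError, Sender};

/// Hypothetical stand-in for a sticky sender: one inner channel per consumer.
pub struct StickySender<T> {
    lanes: Vec<Sender<T>>,
}

/// Step 1: map an ID to a consumer index via hash(id) % num_consumers.
pub fn route<I: Hash + ?Sized>(id: &I, num_consumers: NonZeroUsize) -> usize {
    let mut hasher = DefaultHasher::new();
    id.hash(&mut hasher);
    (hasher.finish() % num_consumers.get() as u64) as usize
}

/// Step 2: build one unbounded std channel per consumer.
pub fn sticky<T>(num_consumers: NonZeroUsize) -> (StickySender<T>, Vec<Receiver<T>>) {
    let (lanes, receivers): (Vec<_>, Vec<_>) =
        (0..num_consumers.get()).map(|_| channel::<T>()).unzip();
    (StickySender { lanes }, receivers)
}

impl<T> StickySender<T> {
    /// Sends `value` on the lane selected by `id`; equal IDs share a lane.
    pub fn send<I: Hash + ?Sized>(&self, id: &I, value: T) -> Result<(), SendError<T>> {
        // `lanes` is never empty because it was built from a NonZeroUsize.
        let n = NonZeroUsize::new(self.lanes.len()).expect("at least one lane");
        self.lanes[route(id, n)].send(value)
    }
}
```

With `let (tx, rxs) = sticky::<i32>(NonZeroUsize::new(3).unwrap());`, two calls to `tx.send("user-123", ...)` arrive in order on the receiver at index `route("user-123", ...)`. Since the index depends on the consumer count, creating a channel with a different count may place the same ID on a different consumer.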

Performance Considerations

  • Unbounded channels: Memory usage can grow if consumers can't keep up
  • Bounded channels: Provide backpressure; when the target channel is full, send waits and try_send returns an error
  • Hashing overhead: Each send operation computes a hash of the ID
  • Load distribution: Hashing may spread IDs unevenly across consumers, and all traffic for a single ID always lands on one consumer
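
To get a feel for the load distribution point, it can help to hash a sample of real IDs before choosing a consumer count. The sketch below assumes `DefaultHasher` (the crate's actual hasher may differ), and `lane_counts` and `skew` are hypothetical helpers, not part of this crate.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Counts how many of the given IDs land on each consumer under
/// hash(id) % num_consumers.
pub fn lane_counts<I: Hash>(ids: impl IntoIterator<Item = I>, num_consumers: usize) -> Vec<usize> {
    assert!(num_consumers > 0, "need at least one consumer");
    let mut counts = vec![0usize; num_consumers];
    for id in ids {
        let mut hasher = DefaultHasher::new();
        id.hash(&mut hasher);
        counts[(hasher.finish() % num_consumers as u64) as usize] += 1;
    }
    counts
}

/// Ratio of the busiest lane to the ideal even share.
/// 1.0 means perfectly even; larger values mean more skew.
pub fn skew(counts: &[usize]) -> f64 {
    let total: usize = counts.iter().sum();
    if total == 0 {
        return 1.0;
    }
    let max = counts.iter().copied().max().unwrap_or(0);
    max as f64 * counts.len() as f64 / total as f64
}
```

For example, `skew(&lane_counts(sample_ids, 3))` gives a rough measure of imbalance for a hypothetical `sample_ids` collection. Counting distinct IDs does not capture message volume: a single very active ID still sends all of its traffic to one consumer.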

License

Licensed under either of

at your option.

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.
