Crates.io | sharded_ringbuf |
lib.rs | sharded_ringbuf |
version | 0.1.2 |
created_at | 2025-08-11 19:36:35.504894+00 |
updated_at | 2025-08-11 21:36:31.575018+00 |
description | A Tokio async, sharded SPSC/MPSC/MPMC ring buffer in Rust. |
homepage | https://github.com/asder8215/sharded_ringbuf |
repository | https://github.com/asder8215/sharded_ringbuf |
max_upload_size | |
id | 1790738 |
size | 719,827 |
A Tokio async, sharded SPSC/MPSC/MPMC ring buffer in Rust.
This buffer is composed of multiple simpler inner ring buffers (denoted as shards), each with capacity = requested capacity / requested number of shards (uneven shards are supported if the capacity is not divisible by the number of shards). It uses Tokio's task-local variables to hold a shard index reference and the shard acquisition policy (e.g. Sweep, ShiftBy, RandomAndSweep) that a task uses to acquire a shard to enqueue/dequeue on, and as a way to inform when a task should context switch.
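As a concrete illustration of the capacity split described above, here is a minimal sketch of how a total capacity might be divided into uneven shards. The division strategy (giving the first `capacity % shards` shards one extra slot) is an assumption for illustration, not the crate's actual code:

```rust
// Sketch: split a total capacity across `shards` inner ring buffers.
// The "first shards get the remainder" strategy is an assumption here;
// the crate's exact split may differ.
fn shard_capacities(capacity: usize, shards: usize) -> Vec<usize> {
    let base = capacity / shards;
    let extra = capacity % shards; // leftover slots when not evenly divisible
    (0..shards)
        .map(|i| if i < extra { base + 1 } else { base })
        .collect()
}

fn main() {
    // 10 slots over 3 shards -> uneven shards
    assert_eq!(shard_capacities(10, 3), vec![4, 3, 3]);
    // 1024 slots over 8 shards -> even shards of 128
    assert_eq!(shard_capacities(1024, 8), vec![128; 8]);
    println!("ok");
}
```

Either way, the total of the per-shard capacities equals the requested capacity.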
The essential goal of this buffer is to provide an effective MPMC queue on top of a Tokio async runtime, so that multiple threads can perform work independently on shards in a nonblocking manner. It can also be used in an SPSC/MPSC manner, but it is designed primarily with MPMC in mind.
The following are examples of how to use ShardedRingBuf:
If enqueuer and dequeuer tasks each perform a limited number of enqueue/dequeue operations:
use std::sync::Arc;
// Crate import paths below are assumed for illustration:
use sharded_ringbuf::{ShardedRingBuf, ShardPolicy, spawn_enqueuer_with_iterator, spawn_dequeuer_bounded};

let max_items: usize = 1024;
let shards = 8;
let task_count = 100;
let rb = Arc::new(ShardedRingBuf::new(max_items, shards));
let mut deq_tasks = Vec::with_capacity(task_count);
let mut enq_tasks = Vec::with_capacity(task_count);
// spawn enq tasks with shift by policy
for i in 0..task_count {
let handle = spawn_enqueuer_with_iterator(
rb.clone(),
ShardPolicy::ShiftBy {
initial_index: Some(i),
shift: task_count,
},
0..max_items,
);
enq_tasks.push(handle);
}
// spawn deq tasks with shift by policy
for i in 0..task_count {
let handle = spawn_dequeuer_bounded(
rb.clone(),
max_items,
ShardPolicy::ShiftBy {
initial_index: Some(i),
shift: shards,
},
|_| {
// can put a function here optionally
// if some work needs to be performed
// on dequeued item
},
);
deq_tasks.push(handle);
}
// Wait for enqueuers
for enq in enq_tasks {
enq.await.unwrap();
}
// Wait for dequeuers
for deq in deq_tasks {
deq.await.unwrap();
}
If dequeuer_full tasks run in a loop while enqueuer tasks perform a limited number of operations:
use std::sync::Arc;
// Crate import paths below are assumed for illustration:
use sharded_ringbuf::{ShardedRingBuf, ShardPolicy, spawn_enqueuer_with_iterator, spawn_dequeuer_full_unbounded};

let max_items: usize = 1024;
let shards = 8;
let task_count = 100;
let rb = Arc::new(ShardedRingBuf::new(max_items, shards));
let mut deq_tasks = Vec::with_capacity(shards);
let mut enq_tasks = Vec::with_capacity(task_count);
// spawn enq tasks with shift by policy
for i in 0..task_count {
let handle = spawn_enqueuer_with_iterator(
rb.clone(),
ShardPolicy::ShiftBy {
initial_index: Some(i),
shift: task_count,
},
0..max_items,
);
enq_tasks.push(handle);
}
// spawn deq_full tasks with shift by policy
for i in 0..shards {
let handle = spawn_dequeuer_full_unbounded(
rb.clone(),
ShardPolicy::ShiftBy {
initial_index: Some(i),
shift: shards,
},
|_| {
// can put a function here optionally
// if some work needs to be performed
// on dequeued item
},
);
deq_tasks.push(handle);
}
// Wait for enqueuers
for enq in enq_tasks {
enq.await.unwrap();
}
rb.poison(); // signal the looping dequeuer tasks to exit
// Wait for dequeuers
for deq in deq_tasks {
deq.await.unwrap();
}
You can also take a look at the tests/ or benches/ directories to see examples of how to use this structure.
Benchmark results and plots still need to be collected thoroughly to see how this structure performs across different numbers of shards, threads, enqueuer/dequeuer tasks, and capacities.
Using a Waker to minimize the amount of context switching performed within this structure is still to be explored.

All contributions (i.e. documentation, testing, providing feedback) are welcome! Check out CONTRIBUTING.md on how to start.
This project is licensed under both the MIT License and the Apache License.