Crates.io | lf-shardedringbuf |
lib.rs | lf-shardedringbuf |
version | 4.1.0 |
created_at | 2025-07-06 14:44:35.402163+00 |
updated_at | 2025-08-11 18:15:10.885481+00 |
description | An async, lock-free, sharded, cache-aware SPSC/MPSC/MPMC ring buffer in Rust. |
homepage | https://github.com/asder8215/lf-shardedringbuf |
repository | https://github.com/asder8215/lf-shardedringbuf |
max_upload_size | |
id | 1740181 |
size | 635,709 |
Note: use sharded_ringbuf instead: https://crates.io/crates/sharded_ringbuf
An async, lock-free, sharded, cache-aware SPSC/MPSC/MPMC ring buffer in Rust.
Tasks acquire shards according to a shard policy: Sweep, RandomAndSweep, or ShiftBy (see src/shard_policies.rs for more info). Note that tokio::time::sleep() works on a millisecond granularity, which is suboptimal for this buffer.
The following are examples of how to use LFShardedRingBuf:
If enqueuer and dequeuer tasks each perform a limited number of enqueue/dequeue operations:
// Assumed imports; exact paths may differ from the crate's actual layout.
use std::sync::Arc;
use lf_shardedringbuf::{LFShardedRingBuf, ShardPolicy, spawn_buffer_task};

// Illustrative constants (not defined in the original snippet).
const MAX_TASKS: usize = 8;
const ITEM_PER_TASK: usize = 1024;

// Inside an async context (e.g. #[tokio::main] async fn main()):
let max_items = 1024;
let shards = 8;
let rb: Arc<LFShardedRingBuf<usize>> = Arc::new(LFShardedRingBuf::new(max_items, shards));
let mut deq_threads = Vec::with_capacity(MAX_TASKS);
let mut enq_threads = Vec::with_capacity(MAX_TASKS);
// Spawn enqueuer tasks with ShiftBy policy
for i in 0..MAX_TASKS {
let rb = Arc::clone(&rb);
let handler: tokio::task::JoinHandle<()> = spawn_buffer_task(
ShardPolicy::ShiftBy {
initial_index: Some(i),
shift: MAX_TASKS,
},
async move {
for i in 0..ITEM_PER_TASK {
rb.enqueue(i).await;
}
},
);
enq_threads.push(handler);
}
// Spawn dequeuer tasks with ShiftBy policy
for i in 0..MAX_TASKS {
let rb = Arc::clone(&rb);
let handler: tokio::task::JoinHandle<usize> = spawn_buffer_task(
ShardPolicy::ShiftBy {
initial_index: Some(i),
shift: MAX_TASKS,
},
async move {
let mut counter: usize = 0;
for _i in 0..ITEM_PER_TASK {
let item = rb.dequeue().await;
match item {
Some(_) => counter += 1,
None => break,
}
}
counter
},
);
deq_threads.push(handler);
}
// Wait for enqueuers
for enq in enq_threads {
enq.await.unwrap();
}
// Wait for dequeuers
for deq in deq_threads {
deq.await.unwrap();
}
If dequeuer tasks run in a loop while the enqueuer task performs a limited number of operations:
const MAX_ITEMS: usize = 100;
const MAX_SHARDS: usize = 10;
const MAX_TASKS: usize = 5;
let rb: Arc<LFShardedRingBuf<usize>> = Arc::new(LFShardedRingBuf::new(MAX_ITEMS, MAX_SHARDS));
let mut deq_threads = Vec::with_capacity(MAX_TASKS);
let mut enq_threads = Vec::new();
// Spawn MAX_TASKS dequeuer tasks
for i in 0..MAX_TASKS {
let rb = Arc::clone(&rb);
let handler = spawn_buffer_task(
ShardPolicy::ShiftBy {
initial_index: Some(i),
shift: MAX_TASKS,
},
async move {
let mut counter: usize = 0;
loop {
let item = rb.dequeue().await;
match item {
Some(_) => counter += 1,
None => break,
}
}
counter
},
);
deq_threads.push(handler);
}
// Spawn an enqueuer task with Sweep policy
{
let rb = Arc::clone(&rb);
let enq_handler = spawn_buffer_task(
ShardPolicy::Sweep {
initial_index: None,
},
async move {
for _i in 0..2 * MAX_ITEMS {
rb.enqueue(20).await;
}
},
);
enq_threads.push(enq_handler);
}
// Wait for enqueuer tasks to complete first
for enq in enq_threads {
enq.await.unwrap();
}
// Poison for dequeuer tasks to exit gracefully, completing any remaining jobs
// on the buffer
rb.poison();
// Wait for dequeuers
for deq in deq_threads {
deq.await.unwrap();
}
If enqueuer tasks need to run in a loop, you can use the async_stream crate and hook the enqueuer tasks up to a stream, treating a None value returned by the stream as the signal that enqueuing is done.
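For example, here is a minimal sketch of that pattern. It assumes the async_stream and futures-util crates; rb, ShardPolicy, and spawn_buffer_task are the same names used in the examples above, and the stream contents are purely illustrative.

use async_stream::stream;
use futures_util::{pin_mut, StreamExt};

let rb = Arc::clone(&rb);
let enq_handler = spawn_buffer_task(
    ShardPolicy::Sweep {
        initial_index: None,
    },
    async move {
        // Any async source can back this stream; a simple counter is used here.
        let items = stream! {
            for i in 0..100usize {
                yield i;
            }
        };
        pin_mut!(items);
        // `None` from the stream means there is nothing left to enqueue,
        // so the task simply falls out of the loop when the stream ends.
        while let Some(item) = items.next().await {
            rb.enqueue(item).await;
        }
    },
);
// Later: enq_handler.await.unwrap();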
I benchmarked this ring buffer against kanal's async channel at varying shard counts with a buffer capacity of 1024 items.
Note: these results were gathered on the following setup:
Machine: AMD Ryzen 7 5800X 3.8 GHz 8-Core Processor
Rust: rustc 1.87.0 (17067e9ac 2025-05-09)
OS: Windows 10
Date: July 11, 2025
The cargo benchmarking in GitHub Actions may not reflect the full potential of this buffer due to the limited number of threads and small cache size there. To check out previous cargo benchmarking results, you can look at benchmark_res.
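For reference, a benchmark like the ones below could be driven by a Criterion harness along these lines. This is only a hedged sketch, not the crate's actual bench code: it assumes criterion with the async_tokio feature enabled and reuses the spawn_buffer_task/ShardPolicy API from the examples above with illustrative task and item counts.

use std::sync::Arc;
use criterion::{criterion_group, criterion_main, Criterion};
use lf_shardedringbuf::{LFShardedRingBuf, ShardPolicy, spawn_buffer_task};
use tokio::runtime::Runtime;

const TASKS: usize = 8;              // illustrative
const ITEMS_PER_TASK: usize = 1_000; // illustrative

fn bench_sharded_buffer(c: &mut Criterion) {
    let rt = Runtime::new().unwrap();
    c.bench_function("8shard_buffer/1024", |b| {
        b.to_async(&rt).iter(|| async {
            let rb: Arc<LFShardedRingBuf<usize>> =
                Arc::new(LFShardedRingBuf::new(1024, 8));
            let mut handles = Vec::with_capacity(2 * TASKS);
            for i in 0..TASKS {
                // One enqueuer and one dequeuer per index, offset via ShiftBy.
                let erb = Arc::clone(&rb);
                handles.push(spawn_buffer_task(
                    ShardPolicy::ShiftBy { initial_index: Some(i), shift: TASKS },
                    async move {
                        for item in 0..ITEMS_PER_TASK {
                            erb.enqueue(item).await;
                        }
                    },
                ));
                let drb = Arc::clone(&rb);
                handles.push(spawn_buffer_task(
                    ShardPolicy::ShiftBy { initial_index: Some(i), shift: TASKS },
                    async move {
                        for _ in 0..ITEMS_PER_TASK {
                            drb.dequeue().await;
                        }
                    },
                ));
            }
            // Enqueue and dequeue counts match, so every task completes.
            for h in handles {
                h.await.unwrap();
            }
        });
    });
}

criterion_group!(benches, bench_sharded_buffer);
criterion_main!(benches);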
The following are timing results from cargo bench at shard counts of 4, 8, 16, 32, 64, 128, and 256:
Without barrier synchronization:
kanal_async/1024 time: [22.742 ms 22.978 ms 23.212 ms]
4shard_buffer/1024 time: [12.270 ms 12.300 ms 12.336 ms]
8shard_buffer/1024 time: [14.272 ms 14.308 ms 14.355 ms]
16shard_buffer/1024 time: [18.579 ms 18.605 ms 18.632 ms]
32shard_buffer/1024 time: [23.438 ms 23.517 ms 23.600 ms]
64shard_buffer/1024 time: [33.723 ms 34.235 ms 34.768 ms]
128shard_buffer/1024 time: [25.435 ms 25.763 ms 26.098 ms]
256shard_buffer/1024 time: [21.358 ms 21.715 ms 22.081 ms]
With barrier synchronization on all tasks:
kanal_async/1024 time: [23.162 ms 23.440 ms 23.713 ms]
4shard_buffer/1024 time: [13.580 ms 13.949 ms 14.343 ms]
8shard_buffer/1024 time: [14.133 ms 14.173 ms 14.220 ms]
16shard_buffer/1024 time: [19.156 ms 19.262 ms 19.367 ms]
32shard_buffer/1024 time: [25.103 ms 25.231 ms 25.367 ms]
64shard_buffer/1024 time: [28.810 ms 29.260 ms 29.753 ms]
128shard_buffer/1024 time: [23.246 ms 23.786 ms 24.362 ms]
256shard_buffer/1024 time: [22.845 ms 23.274 ms 23.727 ms]
Assume we have X enqueuer tasks and Y dequeuer tasks. Now let's assume the following (which is more likely in a real-world Tokio project): 1000+ enqueuer tasks and 1000+ dequeuer tasks. Each task claims a shard through
self.shard_jobs[current].occupied.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_ok()
so with that many tasks contending, a task can burn through many failed compare_exchange attempts before it either acquires a shard or yields. For example, SweepBy and SweepAndShiftBy policies could be introduced so that the task yields after fewer attempts at acquiring a shard.
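To make the contention concrete, here is a hypothetical sketch of an acquisition loop in the spirit of the snippet above. The ShardJob struct, try_acquire helper, and attempt budget are illustrative assumptions rather than the crate's actual internals, but they show where a SweepBy-style policy would bound the number of failed attempts before yielding.

use std::sync::atomic::{AtomicBool, Ordering};

// Hypothetical stand-in for the crate's per-shard state.
struct ShardJob {
    occupied: AtomicBool,
    // ... the shard's ring of items would live here
}

// Mirrors the compare_exchange shown above: claim the shard if it is free.
fn try_acquire(shard_jobs: &[ShardJob], current: usize) -> bool {
    shard_jobs[current]
        .occupied
        .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
        .is_ok()
}

// A SweepBy-style policy could bound the attempts per scheduling slice:
// sweep over shards up to `budget` times, then yield back to Tokio.
async fn acquire_with_budget(shard_jobs: &[ShardJob], start: usize, budget: usize) -> usize {
    let mut current = start;
    loop {
        for _ in 0..budget {
            if try_acquire(shard_jobs, current) {
                return current;
            }
            current = (current + 1) % shard_jobs.len();
        }
        tokio::task::yield_now().await;
    }
}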
All contributions (i.e. documentation, testing, providing feedback) are welcome! Check out CONTRIBUTING.md on how to start.
This project is dual-licensed under the MIT License and the Apache License.