swap-queue

- Version: 1.1.0
- Description: A lock-free thread-owned queue whereby tasks are taken by stealers in entirety via buffer swapping
- Repository: https://github.com/Bajix/swap-queue-rs/
- Author: Thomas Sieverding (Bajix)
- Created: 2021-10-02
- Updated: 2021-11-21

README

Swap Queue


A lock-free thread-owned queue whereby tasks are taken by stealers in entirety via buffer swapping. For batching use cases, this has the advantage that all tasks can be taken as a single batch in constant time, regardless of batch size. Alternatives built on crossbeam_deque::Worker or tokio::sync::mpsc must collect each task separately and can lack a clear cutoff point between batches. This design ensures that if you are waiting on a resource, such as a connection, to become available, there is no further delay before a task batch can be processed once it is. While push performance alone is slower than crossbeam_deque::Worker and faster than tokio::sync::mpsc, overall batching performance on ARM is roughly 11-19% faster than crossbeam_deque::Worker and 28-45% faster than tokio::sync::mpsc, and there is never a slow cutoff between batches.

Example

use swap_queue::Worker;
use tokio::{
  runtime::Handle,
  sync::oneshot::{channel, Sender},
};

// Jemalloc makes this library substantially faster
#[global_allocator]
static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc;

// Worker needs to be thread local because it is !Sync
thread_local! {
  static QUEUE: Worker<(u64, Sender<u64>)> = Worker::new();
}

// Within an async context this batches optimally with no added overhead,
// because the spawn runs only after already-scheduled tasks have pushed
async fn push_echo(i: u64) -> u64 {
  {
    let (tx, rx) = channel();

    QUEUE.with(|queue| {
      // A new stealer is returned whenever the buffer is new or was empty
      if let Some(stealer) = queue.push((i, tx)) {
        Handle::current().spawn(async move {
          // Take the underlying buffer in entirety; the next push will return a new Stealer
          let batch = stealer.take().await;

          // Some sort of batched operation, such as a database query

          batch.into_iter().for_each(|(i, tx)| {
            tx.send(i).ok();
          });
        });
      }
    });

    rx
  }
  .await
  .unwrap()
}

Benchmarks

Benchmarks were run on an AWS t4g.medium instance using ami-06391d741144b83c2.

Async Batching

(Benchmark charts: async batching with 64, 128, 256, 512, and 1024 tasks)

Push

(Benchmark chart: push, 1024 tasks)

Batch collecting

(Benchmark chart: batch collecting, 1024 tasks)

CI tested under ThreadSanitizer, LeakSanitizer, Miri and Loom.
