Crates.io | deadqueue |
lib.rs | deadqueue |
version | 0.2.4 |
source | src |
created_at | 2020-01-21 02:09:04.122019 |
updated_at | 2022-11-07 17:14:06.927716 |
description | Dead simple async queue |
homepage | |
repository | https://github.com/bikeshedder/deadqueue |
max_upload_size | |
id | 200620 |
size | 47,195 |
Deadqueue is a dead simple async queue with back pressure support.
This crate provides three implementations:
Unlimited (deadqueue::unlimited::Queue
)
crossbeam_queue::SegQueue
unlimited
feature in your Cargo.toml
Resizable (deadqueue::resizable::Queue
)
deadqueue::unlimited::Queue
resizable
feature in your Cargo.toml
Limited (deadqueue::limited::Queue
)
crossbeam_queue::ArrayQueue
limited
feature in your Cargo.toml
Feature | Description | Extra dependencies | Default |
---|---|---|---|
unlimited |
Enable unlimited queue implementation | – | yes |
resizable |
Enable resizable queue implementation | deadqueue/unlimited |
yes |
limited |
Enable limited queue implementation | – | yes |
use std::sync::Arc;
use tokio::time::{sleep, Duration};
const TASK_COUNT: usize = 1000;
const WORKER_COUNT: usize = 10;
type TaskQueue = deadqueue::limited::Queue<usize>;
#[tokio::main]
async fn main() {
let queue = Arc::new(TaskQueue::new(TASK_COUNT));
for i in 0..TASK_COUNT {
queue.try_push(i).unwrap();
}
for worker in 0..WORKER_COUNT {
let queue = queue.clone();
tokio::spawn(async move {
loop {
let task = queue.pop().await;
println!("worker[{}] processing task[{}] ...", worker, task);
}
});
}
while queue.len() > 0 {
println!("Waiting for workers to finish...");
sleep(Duration::from_millis(100)).await;
}
println!("All tasks done. :-)");
}
Deadqueue is by no means the only queue implementation available. It does things a little different and provides features that other implementations are lacking:
Resizable queue. Usually you have to pick between limited
and unlimited
queues. This crate features a resizable
Queue which can be resized as needed. This is probably a big unique selling point of this crate.
Introspection support. The methods .len()
, .capacity()
and .available()
provide access the current state of the queue.
Fair scheduling. Tasks calling pop
will receive items in a first-come-first-serve fashion. This is mainly due to the use of tokio::sync::Semaphore
which is fair by nature.
One struct, not two. The channels of tokio
, async_std
and futures-intrusive
split the queue in two structs (Sender
and Receiver
) which makes the usage sligthly more complicated.
Bring your own Arc
. Since there is no separation between Sender
and Receiver
there is also no need for an internal Arc
. (All implementations that split the channel into a Sender
and Receiver
need some kind of Arc
internally.)
Fully concurrent access. No need to wrap the Receiver
part in a Mutex
. All methods support concurrent accesswithout the need for an additional synchronization primitive.
Support for try__
methods. The methods try_push
and try_pop
can be used to access the queue from non-blocking synchroneous code.
Crate | Limitations | Documentation |
---|---|---|
tokio |
No resizable queue. No introspection support. Synchronization of Receiver needed. |
tokio::sync::mpsc::channel , tokio::sync::mpsc::unbounded_channel |
async-std |
No resizable or unlimited queue. No introspection support. No try_send or try_recv methods. |
async_std::sync::channel |
futures |
No resizable queue. No introspection support. | futures::channel::mpsc::channel , futures::channel::mpsc::unbounded |
Licensed under either of
at your option.