| Crates.io | batched-queue |
| lib.rs | batched-queue |
| version | 0.1.0-alpha.1 |
| created_at | 2025-05-11 04:52:14.876974+00 |
| updated_at | 2025-05-29 09:17:11.525743+00 |
| description | A high-performance, highly-concurrent batched queue implementation for Rust. |
| homepage | https://github.com/SeedyROM/batched-queue |
| repository | https://github.com/SeedyROM/batched-queue |
| max_upload_size | |
| id | 1669071 |
| size | 96,882 |
# batched-queue

A high-performance, highly-concurrent batched queue implementation for Rust.
batched-queue provides an efficient way to collect individual items into batches for processing, which can significantly improve throughput in high-volume systems. The library offers both synchronous and asynchronous implementations, making it suitable for a wide range of applications.
## Dependencies

- parking_lot and crossbeam-channel
- tokio (via feature flag)

## Installation

Add batched-queue to your Cargo.toml:
```toml
[dependencies]
batched-queue = "0.1.0-alpha.1"
```
## Feature Flags

- `sync` (default): Enables the synchronous implementation using parking_lot and crossbeam-channel
- `async`: Enables the asynchronous implementation using tokio

To use the async implementation:
```toml
[dependencies]
batched-queue = { version = "0.1.0-alpha.1", default-features = false, features = ["async"] }
```
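The README does not show async usage directly. The following is a minimal sketch only, assuming the async API mirrors the synchronous one shown below; the type name `AsyncBatchedQueue` and an awaitable `next_batch` method are assumptions, not confirmed against the crate's documentation:

```rust
// Sketch only: AsyncBatchedQueue and next_batch().await are assumed names
// that mirror the synchronous API below; check the crate docs for the real
// async interface.
use batched_queue::AsyncBatchedQueue; // hypothetical import

#[tokio::main]
async fn main() {
    let queue = AsyncBatchedQueue::new(10).expect("Failed to create queue");
    let sender = queue.create_sender();

    // 25 items with batch size 10 yields three batches after a flush.
    for i in 0..25 {
        sender.push(i).expect("Failed to push item");
    }
    sender.flush().expect("Failed to flush queue");

    // Whether next_batch() returns an Option or waits until the queue is
    // closed depends on the real API; this loop is purely illustrative.
    for _ in 0..3 {
        let batch = queue.next_batch().await;
        println!("Processing batch of {} items", batch.len());
    }
}
```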
## Basic Usage

```rust
use batched_queue::{BatchedQueue, BatchedQueueTrait};

fn main() {
    // Create a queue with batch size of 10
    let queue = BatchedQueue::new(10).expect("Failed to create queue");

    // Create a sender that can be shared across threads
    let sender = queue.create_sender();

    // Push items to the queue
    for i in 0..25 {
        sender.push(i).expect("Failed to push item");
    }

    // Flush any remaining items that haven't formed a complete batch
    sender.flush().expect("Failed to flush queue");

    // Process batches
    while let Ok(batch) = queue.try_next_batch() {
        println!("Processing batch of {} items", batch.len());
        for item in batch {
            println!("  Item: {}", item);
        }
    }
}
```
## Multi-threaded Usage

```rust
use batched_queue::{BatchedQueue, BatchedQueueTrait};
use std::thread;
use std::time::Duration;

fn main() {
    // Create a queue with batch size of 5
    let queue = BatchedQueue::new(5).expect("Failed to create queue");

    // Create a sender that can be shared across threads
    let sender = queue.create_sender();

    // Producer thread
    let producer = thread::spawn(move || {
        for i in 0..100 {
            sender.push(i).expect("Failed to push item");
            thread::sleep(Duration::from_millis(5));
        }
        // Send any remaining items
        sender.flush().expect("Failed to flush queue");
    });

    // Consumer thread
    let consumer = thread::spawn(move || {
        let mut all_items = Vec::new();
        // Process batches as they become available
        while all_items.len() < 100 {
            if let Some(batch) = queue.next_batch_timeout(Duration::from_millis(100)) {
                println!("Received batch of {} items", batch.len());
                all_items.extend(batch);
            }
        }
        all_items
    });

    // Wait for threads to complete
    producer.join().unwrap();
    let result = consumer.join().unwrap();
    println!("Processed {} items in total", result.len());
}
```
## Bounded Queues and Backpressure

```rust
use batched_queue::{BatchedQueue, BatchedQueueTrait};

fn main() {
    // Create a queue with batch size 10 and at most 5 batches in the channel
    let queue = BatchedQueue::new_bounded(10, 5).expect("Failed to create bounded queue");

    // When the channel is full, producers will block when attempting to send
    // a full batch. This provides automatic backpressure to control memory usage.
    let sender = queue.create_sender();

    // ... use queue as normal
}
```
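To see that backpressure in action, here is a small sketch, using only the calls shown above, that pairs a fast producer with a deliberately slow consumer; the bounded channel keeps the producer from running more than two batches ahead:

```rust
use batched_queue::{BatchedQueue, BatchedQueueTrait};
use std::thread;
use std::time::Duration;

fn main() {
    // Batch size 4, at most 2 batches buffered: the producer can only run
    // ahead of the consumer by roughly 8 items before push() blocks.
    let queue = BatchedQueue::new_bounded(4, 2).expect("Failed to create bounded queue");
    let sender = queue.create_sender();

    let producer = thread::spawn(move || {
        for i in 0..40 {
            // Blocks whenever the channel already holds 2 full batches.
            sender.push(i).expect("Failed to push item");
        }
        sender.flush().expect("Failed to flush queue");
    });

    let mut received = 0;
    while received < 40 {
        if let Some(batch) = queue.next_batch_timeout(Duration::from_millis(100)) {
            received += batch.len();
            // Simulate slow processing so the producer hits backpressure.
            thread::sleep(Duration::from_millis(20));
        }
    }

    producer.join().unwrap();
    println!("Consumed {} items with bounded memory", received);
}
```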
## Performance

batched-queue is designed for high performance in concurrent environments: moving items in batches rather than one at a time amortizes synchronization overhead across many items, which is what drives the throughput gains in high-volume systems.
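The README does not include benchmark numbers. As a rough starting point, a minimal throughput measurement sketch using only the API shown above might look like this (the batch size of 1024 and item count are arbitrary choices, and the consumer busy-spins for simplicity):

```rust
use batched_queue::{BatchedQueue, BatchedQueueTrait};
use std::thread;
use std::time::Instant;

fn main() {
    const ITEMS: usize = 1_000_000;
    let queue = BatchedQueue::new(1024).expect("Failed to create queue");
    let sender = queue.create_sender();

    let start = Instant::now();
    let producer = thread::spawn(move || {
        for i in 0..ITEMS {
            sender.push(i).expect("Failed to push item");
        }
        sender.flush().expect("Failed to flush queue");
    });

    // Busy-spin on try_next_batch(); fine for a benchmark, wasteful otherwise.
    let mut consumed = 0;
    while consumed < ITEMS {
        if let Ok(batch) = queue.try_next_batch() {
            consumed += batch.len();
        }
    }
    producer.join().unwrap();

    let elapsed = start.elapsed();
    println!(
        "Moved {} items in {:?} ({:.0} items/s)",
        ITEMS,
        elapsed,
        ITEMS as f64 / elapsed.as_secs_f64()
    );
}
```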
## License

This project is licensed under the MIT License - see the LICENSE file for details.

## Contributing

Contributions are welcome! Please feel free to submit a Pull Request.