| Crates.io | concurrent-pqueue |
| lib.rs | concurrent-pqueue |
| version | 0.4.0 |
| created_at | 2025-09-06 20:57:08.050761+00 |
| updated_at | 2025-09-06 20:57:08.050761+00 |
| description | A high-performance, thread-safe priority queue with dynamic priority updates |
| homepage | |
| repository | https://github.com/dwayn/pqueue |
| max_upload_size | |
| id | 1827484 |
| size | 24,571 |
A high-performance, thread-safe priority queue implementation in Rust with support for dynamic priority updates. Designed for scenarios where you need to efficiently manage prioritized items with the ability to update priorities after insertion.
- Arc<Mutex<T>> for safe concurrent access across multiple threads
- BTreeMap for ordered access and HashMap for O(1) lookups
- Arc<T> to avoid unnecessary item cloning

Add this to your Cargo.toml:
[dependencies]
concurrent-pqueue = "0.4.0"
use concurrent_pqueue::PQueue;

fn main() {
    let queue = PQueue::<String>::new();

    // Add items with priorities
    queue.update("task_1".to_string(), 10);
    queue.update("task_2".to_string(), 20);
    queue.update("urgent_task".to_string(), 30);

    // Process highest priority item
    if let Some(item) = queue.next() {
        println!("Processing: {}", item); // "urgent_task"
    }

    // Peek without removing
    if let Some(item) = queue.peek() {
        println!("Next item: {}", item); // "task_2"
    }

    // Update priority (additive)
    queue.update("task_1".to_string(), 15); // Now has priority 25

    // Check current priority
    if let Some(score) = queue.score(&"task_1".to_string()) {
        println!("Current priority: {}", score); // 25
    }
}
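The additive behaviour of `update` can be modelled with the standard library alone. The sketch below is not the crate's code: `additive_update` is a hypothetical helper, and reading the `(Option<i64>, i64)` return value as `(previous score, new score)` is an assumption based on the signature, not something the documentation confirms.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for an additive update: adds `delta` to the score
/// stored for `item` and returns `(previous_score, new_score)`.
/// The meaning of the tuple is an assumption, not the crate's documented contract.
fn additive_update(
    scores: &mut HashMap<String, i64>,
    item: &str,
    delta: i64,
) -> (Option<i64>, i64) {
    let previous = scores.get(item).copied();
    // A missing item is treated as starting from zero.
    let new_score = previous.unwrap_or(0) + delta;
    scores.insert(item.to_string(), new_score);
    (previous, new_score)
}

fn main() {
    let mut scores = HashMap::new();
    let first = additive_update(&mut scores, "task_1", 10);
    let second = additive_update(&mut scores, "task_1", 15);
    println!("{:?} {:?}", first, second); // (None, 10) (Some(10), 25)
}
```

Under this reading, a caller can tell a first insertion (`None`) from a later adjustment (`Some(old)`) without a separate lookup.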
PQueue works with any type that implements Eq, Hash, and Clone. Here's an example with a custom struct:
use std::hash::Hash;
use concurrent_pqueue::PQueue;

#[derive(Clone, Debug, Eq)]
pub struct Task {
    id: u64,
    name: String,
    category: String,
}

impl Hash for Task {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.id.hash(state); // Use ID as the unique identifier
    }
}

impl PartialEq for Task {
    fn eq(&self, other: &Self) -> bool {
        self.id == other.id
    }
}

fn main() {
    let queue = PQueue::<Task>::new();

    let task = Task {
        id: 1,
        name: "Process data".to_string(),
        category: "computation".to_string(),
    };
    queue.update(task, 100);

    if let Some(next_task) = queue.next() {
        println!("Processing task: {}", next_task.name);
    }
}
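Because `Hash` and `PartialEq` above look only at `id`, two `Task` values with the same `id` are the same key to any hash-based container, whatever their other fields hold. The sketch below shows that consequence with a plain `std::collections::HashMap`. It does not use the crate; `bump` is a hypothetical helper, and `Task` is trimmed to two fields for brevity.

```rust
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

#[derive(Clone, Debug, Eq)]
struct Task {
    id: u64,
    name: String,
}

// Identity is the `id` alone, mirroring the example above.
impl PartialEq for Task {
    fn eq(&self, other: &Self) -> bool {
        self.id == other.id
    }
}

impl Hash for Task {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.id.hash(state);
    }
}

/// Hypothetical helper: adds `delta` to the score stored for `task`
/// and returns the resulting total.
fn bump(scores: &mut HashMap<Task, i64>, task: Task, delta: i64) -> i64 {
    let entry = scores.entry(task).or_insert(0);
    *entry += delta;
    *entry
}

fn main() {
    let mut scores = HashMap::new();
    let original = Task { id: 1, name: "Process data".to_string() };
    let renamed = Task { id: 1, name: "Process data (retry)".to_string() };

    bump(&mut scores, original, 100);
    let total = bump(&mut scores, renamed, 50);

    // Both values share id 1, so they land on a single entry.
    let stored_name = &scores.keys().next().unwrap().name;
    println!("entries: {}, score: {}, stored name: {}", scores.len(), total, stored_name);
    // entries: 1, score: 150, stored name: Process data
}
```

With the standard `HashMap`, the first key inserted is the one that stays stored. Whether PQueue keeps the first or the latest value for an existing identity is not stated in this README, so avoid relying on non-identity fields being refreshed by a later `update`.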
PQueue is designed for concurrent use. Simply clone the queue to share it across threads:
use concurrent_pqueue::PQueue;

#[tokio::main]
async fn main() {
    let queue = PQueue::<String>::new();

    // Producer task
    let producer_queue = queue.clone();
    tokio::spawn(async move {
        for i in 0..10 {
            producer_queue.update(format!("task_{}", i), i * 10);
        }
    });

    // Consumer task: stops as soon as the queue is empty
    let consumer_queue = queue.clone();
    tokio::spawn(async move {
        while let Some(item) = consumer_queue.next() {
            println!("Processing: {}", item);
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        }
    });

    // Give both tasks time to finish before main returns
    tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
}
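The same clone-and-share pattern works with plain OS threads. The sketch below illustrates it with a stand-in type built on `Arc<Mutex<...>>`, the wrapper this README names for the crate's thread safety. `SharedScores` and `run_workers` are hypothetical names, not part of concurrent-pqueue.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

/// Stand-in for a cloneable queue handle (NOT the crate's PQueue).
/// Cloning copies the Arc, so every clone points at the same map.
#[derive(Clone, Default)]
struct SharedScores {
    inner: Arc<Mutex<HashMap<String, i64>>>,
}

impl SharedScores {
    /// Additive update performed under the lock; returns the new score.
    fn update(&self, item: &str, delta: i64) -> i64 {
        let mut map = self.inner.lock().unwrap();
        let score = map.entry(item.to_string()).or_insert(0);
        *score += delta;
        *score
    }

    fn score(&self, item: &str) -> Option<i64> {
        self.inner.lock().unwrap().get(item).copied()
    }
}

/// Spawns `workers` threads that each add 1 to "shared_task" `rounds` times,
/// then waits for all of them to finish.
fn run_workers(workers: usize, rounds: i64) -> SharedScores {
    let queue = SharedScores::default();
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let handle = queue.clone(); // cheap: clones the Arc only
            thread::spawn(move || {
                for _ in 0..rounds {
                    handle.update("shared_task", 1);
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    queue
}

fn main() {
    let queue = run_workers(4, 1000);
    println!("{:?}", queue.score("shared_task")); // Some(4000)
}
```

Joining the worker threads before reading the result avoids relying on a fixed sleep to let background work complete.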
Track queue performance with built-in statistics:
use concurrent_pqueue::PQueue;

fn main() {
    let queue = PQueue::<String>::new();

    queue.update("item1".to_string(), 10);
    queue.update("item2".to_string(), 20);
    queue.next();

    let stats = queue.stats();
    println!("Queue Statistics:");
    println!("  Uptime: {} seconds", stats.uptime.num_seconds());
    println!("  Total updates: {}", stats.updates);
    println!("  Items in queue: {}", stats.items);
    println!("  Priority pools: {}", stats.pools);
    println!("  Version: {}", stats.version);
}
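The `items` and `pools` counters are easier to interpret with a model of the storage in mind. The architecture notes in this README describe a `BTreeMap<i64, VecDeque<Arc<T>>>` paired with a `HashMap<Arc<T>, i64>`. The sketch below builds a small model along those lines. It is not the crate's implementation: `ModelQueue` and its methods are hypothetical, as are the assumptions that a "pool" is one distinct priority level, that equal priorities are served first-in first-out, and that `next` hands back the `Arc<T>` rather than a `T`.

```rust
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::hash::Hash;
use std::sync::Arc;

/// Hypothetical two-index model: `pools` orders items by score,
/// `scores` gives O(1) lookup of an item's current score.
struct ModelQueue<T: Eq + Hash> {
    pools: BTreeMap<i64, VecDeque<Arc<T>>>,
    scores: HashMap<Arc<T>, i64>,
}

impl<T: Eq + Hash> ModelQueue<T> {
    fn new() -> Self {
        ModelQueue { pools: BTreeMap::new(), scores: HashMap::new() }
    }

    /// Additive update; returns (previous score, new score).
    fn update(&mut self, item: T, delta: i64) -> (Option<i64>, i64) {
        let previous = self.scores.get(&item).copied();
        let shared = match previous {
            Some(old) => self.remove_from_pool(&item, old),
            None => Arc::new(item),
        };
        let new_score = previous.unwrap_or(0) + delta;
        self.pools.entry(new_score).or_default().push_back(Arc::clone(&shared));
        self.scores.insert(shared, new_score);
        (previous, new_score)
    }

    /// Takes `item` out of the pool for `score`, dropping the pool if it empties.
    fn remove_from_pool(&mut self, item: &T, score: i64) -> Arc<T> {
        let pool = self.pools.get_mut(&score).expect("indexes out of sync");
        let pos = pool.iter().position(|p| **p == *item).expect("item missing from pool");
        let shared = pool.remove(pos).expect("position is in range");
        if pool.is_empty() {
            self.pools.remove(&score);
        }
        shared
    }

    /// Removes and returns the item with the highest score, if any.
    fn next(&mut self) -> Option<Arc<T>> {
        let top = *self.pools.keys().next_back()?;
        let pool = self.pools.get_mut(&top)?;
        let shared = pool.pop_front()?;
        if pool.is_empty() {
            self.pools.remove(&top);
        }
        self.scores.remove(&shared);
        Some(shared)
    }

    fn score(&self, item: &T) -> Option<i64> {
        self.scores.get(item).copied()
    }

    fn item_count(&self) -> usize {
        self.scores.len()
    }

    fn pool_count(&self) -> usize {
        self.pools.len()
    }
}

fn main() {
    let mut q = ModelQueue::new();
    q.update("item1".to_string(), 10);
    q.update("item2".to_string(), 20);
    q.update("item3".to_string(), 20);
    // Three items spread over two distinct priority levels.
    println!("items: {}, pools: {}", q.item_count(), q.pool_count()); // items: 3, pools: 2
    println!("score of item1: {:?}", q.score(&"item1".to_string())); // Some(10)
    println!("next: {:?}", q.next()); // Some("item2")
}
```

In this model, moving an item between priority levels costs a scan of its old pool, while score lookups stay constant time through the hash map.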
- new() -> PQueue<T> - Creates a new empty priority queue
- update(item: T, score: i64) -> (Option<i64>, i64) - Updates item priority (additive)
- next() -> Option<T> - Removes and returns highest priority item
- peek() -> Option<T> - Returns highest priority item without removing
- score(item: &T) -> Option<i64> - Gets current priority for an item
- stats() -> PQueueStats - Returns queue statistics

This repository includes a complete TCP server/client implementation demonstrating PQueue usage in a networked environment.
cargo run --bin pqueue_server -- --host 0.0.0.0 --port 8002 --debug
cargo run --bin pqueue_client -- --host localhost --port 8002 --debug
UPDATE <identifier> <score> # Updates priority (additive)
NEXT # Pops highest priority item
PEEK # Views highest priority item
SCORE <identifier> # Gets current priority
INFO # Server statistics
HELP # Command help
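A line-oriented protocol like the one listed above maps naturally onto an enum plus a small parser. The sketch below is illustrative only and is not taken from the repository's server: the `Command` enum and `parse_command` function are hypothetical, and it assumes whitespace-separated tokens, upper-case verbs, and an `i64` score.

```rust
/// Hypothetical representation of the commands listed above.
#[derive(Debug, PartialEq)]
enum Command {
    Update { identifier: String, score: i64 },
    Next,
    Peek,
    Score { identifier: String },
    Info,
    Help,
}

/// Parses one protocol line. Assumes whitespace-separated tokens and
/// upper-case verbs; the real server may be more lenient or stricter.
fn parse_command(line: &str) -> Result<Command, String> {
    let mut parts = line.split_whitespace();
    let verb = parts.next().ok_or("empty command")?;
    let command = match verb {
        "UPDATE" => {
            let identifier = parts.next().ok_or("UPDATE needs an identifier")?.to_string();
            let score = parts
                .next()
                .ok_or("UPDATE needs a score")?
                .parse::<i64>()
                .map_err(|e| format!("invalid score: {}", e))?;
            Command::Update { identifier, score }
        }
        "NEXT" => Command::Next,
        "PEEK" => Command::Peek,
        "SCORE" => {
            let identifier = parts.next().ok_or("SCORE needs an identifier")?.to_string();
            Command::Score { identifier }
        }
        "INFO" => Command::Info,
        "HELP" => Command::Help,
        other => return Err(format!("unknown command: {}", other)),
    };
    // Reject leftover tokens so typos do not pass silently.
    if parts.next().is_some() {
        return Err("unexpected trailing argument".to_string());
    }
    Ok(command)
}

fn main() {
    for line in ["UPDATE job42 15", "NEXT", "SCORE job42", "UPDATE job42 abc"] {
        println!("{:?}", parse_command(line));
    }
}
```

Keeping parsing separate from queue access lets malformed input be rejected before any lock on the queue is taken.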
- BTreeMap<i64, VecDeque<Arc<T>>> for ordered priority access + HashMap<Arc<T>, i64> for O(1) lookups
- Arc<Mutex<PriorityQueue<T>>> wrapper for safe concurrent access
- Arc<T> to minimize cloning
- chrono for timestamp handling

Run the test suite:
cargo test --workspace
The library includes comprehensive unit tests.

- Create a feature branch (git checkout -b feature/amazing-feature)
- Commit your changes (git commit -m 'Add some amazing feature')
- Push to the branch (git push origin feature/amazing-feature)

This project is licensed under the MIT License - see the LICENSE.txt file for details.