| Crates.io | elasticq |
| lib.rs | elasticq |
| version | 0.3.0 |
| created_at | 2025-05-21 07:30:43.680743+00 |
| updated_at | 2025-12-11 10:33:40.60532+00 |
| description | Thread-safe, dynamically resizable queues with lock-based and lock-free implementations for high-throughput scenarios |
| homepage | https://github.com/jfabienke/elasticq |
| repository | https://github.com/jfabienke/elasticq |
| max_upload_size | |
| id | 1683006 |
| size | 434,834 |
A thread-safe, dynamically resizable circular buffer (queue) for Rust, designed for high-throughput scenarios. Now featuring both lock-based and lock-free implementations optimized for different use cases, plus advanced features like priority queues, async streams, persistence, and metrics.
Features:
- push_batch and pop_batch methods for high throughput
- async feature for tokio-based asynchronous methods
- parking_lot mutexes for synchronous operations; tokio::sync mutexes for asynchronous operations via the async feature
- Lock-free MPSC queue behind the lock_free feature flag
- Priority queues (priority feature)
- Async streams (streams feature): Stream and Sink trait implementations, BufferChannel for a channel-like send/recv API, and integration with the tokio-stream and futures ecosystem
- Persistence (persistent feature) with NoSync, Periodic, and EveryWrite sync modes
- Metrics (metrics feature) via the metrics crate (Prometheus-compatible)

Add elasticq to your Cargo.toml, enabling only the features you need:

# Default (lock-based) implementation
[dependencies]
elasticq = "0.3.0"

# Lock-free implementation
[dependencies]
elasticq = { version = "0.3.0", features = ["lock_free"] }

# Async support
[dependencies]
elasticq = { version = "0.3.0", features = ["async"] }
tokio = { version = "1", features = ["sync", "time"] }

# Priority queues
[dependencies]
elasticq = { version = "0.3.0", features = ["priority"] }

# Async streams
[dependencies]
elasticq = { version = "0.3.0", features = ["streams"] }
tokio = { version = "1", features = ["sync", "time", "rt"] }

# Persistence
[dependencies]
elasticq = { version = "0.3.0", features = ["persistent"] }

# Metrics
[dependencies]
elasticq = { version = "0.3.0", features = ["metrics"] }

# Everything
[dependencies]
elasticq = { version = "0.3.0", features = ["async", "lock_free", "priority", "streams", "persistent", "metrics"] }
tokio = { version = "1", features = ["sync", "time", "rt"] }

Basic usage with the default lock-based buffer:
use elasticq::{DynamicCircularBuffer, Config, BufferError};
fn main() -> Result<(), BufferError> {
// Create buffer with default configuration
let buffer = DynamicCircularBuffer::<i32>::new(Config::default())?;
// Push some items
buffer.push(10)?;
buffer.push(20)?;
println!("Buffer length: {}", buffer.len()); // Output: 2
// Pop an item
let item = buffer.pop()?;
assert_eq!(item, 10);
println!("Popped: {}", item);
// Batch operations for higher throughput
buffer.push_batch(vec![30, 40, 50])?;
let items = buffer.pop_batch(2)?;
assert_eq!(items, vec![20, 30]);
Ok(())
}
Perfect for MQTT proxy scenarios with multiple publishers and a single message processor:
use elasticq::{LockFreeMPSCQueue, Config, BufferError};
use std::sync::Arc;
use std::thread;
fn main() -> Result<(), BufferError> {
// Configure for MQTT proxy use case
let config = Config::default()
.with_initial_capacity(1024)
.with_max_capacity(1048576); // 1M messages max
let queue = Arc::new(LockFreeMPSCQueue::new(config)?);
// Multiple producers (MQTT publishers)
let mut producers = vec![];
for producer_id in 0..4 {
let queue_clone = Arc::clone(&queue);
let handle = thread::spawn(move || {
for i in 0..1000 {
let message = format!("msg_{}_{}", producer_id, i);
// Non-blocking enqueue with retry
while queue_clone.try_enqueue(message.clone()).is_err() {
thread::yield_now();
}
}
});
producers.push(handle);
}
// Single consumer (MQTT message processor)
let queue_clone = Arc::clone(&queue);
let consumer = thread::spawn(move || {
let mut processed = 0;
while processed < 4000 {
match queue_clone.try_dequeue() {
Ok(Some(message)) => {
// Process message
println!("Processing: {}", message);
processed += 1;
}
Ok(None) => thread::yield_now(), // Queue empty, yield
Err(_) => thread::yield_now(), // Resize in progress
}
}
});
// Wait for completion
for handle in producers {
handle.join().unwrap();
}
consumer.join().unwrap();
// Check statistics
let stats = queue.stats();
println!("Final stats: {:?}", stats);
Ok(())
}
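The producers above busy-wait with thread::yield_now() while the queue is momentarily full or resizing. A hedged variant that backs off exponentially instead, using only the try_enqueue(...).is_err() pattern shown in the example (the cap of 1 ms is an arbitrary choice for the sketch):

use elasticq::LockFreeMPSCQueue;
use std::thread;
use std::time::Duration;

// Sketch: bounded exponential backoff, capped at 1 ms, so a stalled
// consumer doesn't keep producers spinning hot on a core.
fn enqueue_with_backoff(queue: &LockFreeMPSCQueue<String>, msg: String) {
    let mut delay_us = 1u64;
    while queue.try_enqueue(msg.clone()).is_err() {
        thread::sleep(Duration::from_micros(delay_us));
        delay_us = (delay_us * 2).min(1_000);
    }
}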
Make sure you have enabled the async feature and have tokio as a dependency.
use elasticq::{DynamicCircularBuffer, Config, BufferError};
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<(), BufferError> {
let buffer = DynamicCircularBuffer::<String>::new(Config::default())?;
// Asynchronously push
buffer.push_async("hello".to_string()).await?;
buffer.push_async("world".to_string()).await?;
// Asynchronously pop with timeout
match buffer.pop_async_timeout(Duration::from_millis(100)).await {
Ok(item) => println!("Popped async: {}", item), // Expected: "hello"
Err(BufferError::Timeout(_)) => println!("Pop operation timed out"),
Err(e) => return Err(e),
}
// Async batch operations
let messages = vec!["batch1".to_string(), "batch2".to_string()];
buffer.push_batch_async(messages).await?;
let popped_batch = buffer.pop_batch_async(2).await?;
println!("Popped async batch: {:?}", popped_batch); // Expected: ["world", "batch1"]
// Drain the remaining item ("batch2") so the buffer is actually empty
let remaining = buffer.pop_async().await?;
assert_eq!(remaining, "batch2".to_string());
// Attempt to pop from an empty buffer, which should return BufferError::Empty quickly
// (pop_async and pop_batch_async don't wait if the buffer is empty)
match buffer.pop_batch_async_timeout(2, Duration::from_secs(1)).await {
Ok(items) if items.is_empty() => println!("Popped empty batch as expected after draining."),
// Ok(items) => println!("Unexpectedly popped items: {:?}", items), // This case might not be hit if Empty is preferred
Err(BufferError::Empty) => println!("Buffer empty as expected."),
Err(e) => return Err(e),
}
Ok(())
}
Perfect for MQTT QoS handling where messages have different priority levels:
use elasticq::priority::{PriorityCircularBuffer, PriorityConfig};
use elasticq::BufferError;
fn main() -> Result<(), BufferError> {
// Create a priority queue with 3 levels (matching MQTT QoS 0, 1, 2)
let config = PriorityConfig::default()
.with_priority_levels(3)
.with_fair_queuing(true) // Prevent low-priority starvation
.with_max_consecutive_per_priority(5); // Process max 5 messages per priority before switching
let queue = PriorityCircularBuffer::<String>::new(config)?;
// Push messages with different priorities
queue.push_with_priority("QoS 0 message".to_string(), 0)?; // Low priority
queue.push_with_priority("QoS 1 message".to_string(), 1)?; // Medium priority
queue.push_with_priority("QoS 2 message".to_string(), 2)?; // High priority (exactly once)
// Pop returns highest priority first
assert_eq!(queue.pop()?, "QoS 2 message".to_string());
assert_eq!(queue.pop()?, "QoS 1 message".to_string());
assert_eq!(queue.pop()?, "QoS 0 message".to_string());
// Check per-priority statistics
let stats = queue.stats();
println!("Priority stats: {:?}", stats);
Ok(())
}
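pop_from_priority can also drain a single level while leaving the others untouched. A minimal sketch, assuming its return type mirrors pop:

use elasticq::priority::{PriorityCircularBuffer, PriorityConfig};
use elasticq::BufferError;

fn main() -> Result<(), BufferError> {
    let config = PriorityConfig::default().with_priority_levels(3);
    let queue = PriorityCircularBuffer::<String>::new(config)?;
    queue.push_with_priority("qos0".to_string(), 0)?;
    queue.push_with_priority("qos2".to_string(), 2)?;
    // Take only from level 2; level 0 stays queued.
    let high = queue.pop_from_priority(2)?;
    assert_eq!(high, "qos2".to_string());
    assert_eq!(queue.pop()?, "qos0".to_string());
    Ok(())
}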
Integrate with the async Rust ecosystem using Stream and Sink traits:
use elasticq::{DynamicCircularBuffer, Config};
use elasticq::streams::{BufferStream, BufferSink, BufferChannel, BufferStreamExt};
use std::sync::Arc;
use std::time::Duration;
#[tokio::main]
async fn main() {
let buffer = Arc::new(DynamicCircularBuffer::<i32>::new(Config::default()).unwrap());
// Option 1: Use BufferChannel for channel-like API
let channel = BufferChannel::new(buffer.clone());
// Spawn a producer
let sender = channel.clone();
tokio::spawn(async move {
for i in 0..10 {
sender.send(i).await.unwrap();
}
});
// Consume messages
for _ in 0..10 {
let msg = channel.recv_timeout(Duration::from_secs(1)).await.unwrap();
println!("Received: {}", msg);
}
// Option 2: Use Stream/Sink pair with shared notify
let buffer2 = Arc::new(DynamicCircularBuffer::<i32>::new(Config::default()).unwrap());
let (stream, sink) = buffer2.stream_sink_pair();
// The sink notifies the stream when items are pushed
sink.send(42).await.unwrap();
sink.send_batch(vec![1, 2, 3]).await.unwrap();
}
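A follow-up sketch of actually consuming the stream half. This assumes BufferStream's futures_core::Stream implementation works with tokio_stream's StreamExt adapter, and adds tokio_stream (plus tokio's macros/rt features for #[tokio::main]) as dependencies for the sketch:

use elasticq::{DynamicCircularBuffer, Config};
use elasticq::streams::BufferStreamExt;
use std::sync::Arc;
use tokio_stream::StreamExt;

#[tokio::main]
async fn main() {
    let buffer = Arc::new(DynamicCircularBuffer::<i32>::new(Config::default()).unwrap());
    let (stream, sink) = buffer.stream_sink_pair();
    sink.send_batch(vec![1, 2, 3]).await.unwrap();
    // Pin the stream so next() can be called without assuming Unpin.
    tokio::pin!(stream);
    let mut received = Vec::new();
    while let Some(item) = stream.next().await {
        received.push(item);
        if received.len() == 3 {
            break; // stop once the batch is drained
        }
    }
    assert_eq!(received, vec![1, 2, 3]);
}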
Enable crash recovery with write-ahead logging:
use elasticq::persistent::{PersistentCircularBuffer, PersistentConfig, SyncMode};
use elasticq::BufferError;
fn main() -> Result<(), BufferError> {
let config = PersistentConfig::default()
.with_file_path("/tmp/queue.dat")
.with_sync_mode(SyncMode::Periodic(std::time::Duration::from_secs(1)))
.with_max_log_entries(10000);
// Create persistent buffer (recovers data if file exists)
let buffer = PersistentCircularBuffer::<String>::new(config)?;
// Push messages (persisted to disk)
buffer.push("message 1".to_string())?;
buffer.push("message 2".to_string())?;
// Pop messages
let msg = buffer.pop()?;
println!("Got: {}", msg);
// Force sync to disk
buffer.sync()?;
// Compact the log file (removes processed entries)
buffer.compact()?;
// Check persistence stats
let stats = buffer.stats();
println!("Persistence stats: {:?}", stats);
Ok(())
}
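Crash recovery itself can be sketched by reopening the same file path. A hedged sketch, assuming new() replays the write-ahead log as described above (the config is built twice to avoid assuming PersistentConfig implements Clone):

use elasticq::persistent::{PersistentCircularBuffer, PersistentConfig, SyncMode};
use elasticq::BufferError;

fn main() -> Result<(), BufferError> {
    let make_config = || {
        PersistentConfig::default()
            .with_file_path("/tmp/recovery_demo.dat")
            .with_sync_mode(SyncMode::EveryWrite) // maximize durability for the demo
    };
    {
        let buffer = PersistentCircularBuffer::<String>::new(make_config())?;
        buffer.push("survives restarts".to_string())?;
    } // buffer dropped here; a crash at this point would leave the log intact
    // Reopening at the same path recovers the persisted messages.
    let recovered = PersistentCircularBuffer::<String>::new(make_config())?;
    assert_eq!(recovered.pop()?, "survives restarts".to_string());
    Ok(())
}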
Monitor your queues with Prometheus-compatible metrics:
use elasticq::{DynamicCircularBuffer, Config};
use elasticq::metrics::{MetricsRecorder, InstrumentedBuffer};
use std::sync::Arc;
fn main() {
// Create a metrics recorder
let recorder = MetricsRecorder::new("mqtt_broker");
// Create an instrumented buffer
let buffer = Arc::new(DynamicCircularBuffer::<String>::new(Config::default()).unwrap());
let instrumented = InstrumentedBuffer::new(buffer, recorder);
// All operations are automatically recorded
instrumented.push("message".to_string()).unwrap();
let _ = instrumented.pop().unwrap();
// Or wrap an existing buffer reference
let buffer2 = DynamicCircularBuffer::<i32>::new(Config::default()).unwrap();
let recorder2 = MetricsRecorder::new("events");
let instrumented_ref = recorder2.instrument(&buffer2);
instrumented_ref.push(42).unwrap();
// Metrics are exported via the metrics crate facade
// Use metrics-exporter-prometheus or similar to expose them
}
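To actually expose those metrics, a minimal sketch using the separate metrics-exporter-prometheus crate, whose install_recorder() registers a global recorder and whose render() returns the Prometheus text exposition format:

use metrics_exporter_prometheus::PrometheusBuilder;

fn main() {
    // Install the exporter first so subsequent recordings are captured.
    let handle = PrometheusBuilder::new()
        .install_recorder()
        .expect("failed to install Prometheus recorder");
    // ... create the InstrumentedBuffer and run the workload shown above ...
    // Dump the current metrics in Prometheus text format.
    println!("{}", handle.render());
}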
The buffer's behavior can be customized using the Config struct:
use elasticq::Config;
use std::time::Duration;
let config = Config::default()
.with_initial_capacity(512) // Initial number of elements the buffer can hold
.with_min_capacity(256) // Minimum capacity the buffer will shrink to
.with_max_capacity(8192) // Maximum capacity the buffer will grow to
.with_growth_factor(1.5) // Factor by which capacity increases (e.g., 1.5 = 50% increase)
.with_shrink_threshold(0.3) // Shrink if usage is <= 30% of current capacity
.with_pop_timeout(Duration::from_secs(5)) // Default pop timeout (currently not auto-used by methods)
.with_push_timeout(Duration::from_secs(5)); // Default push timeout (currently not auto-used by methods)
// Important: Ensure config is valid before creating the buffer!
// `DynamicCircularBuffer::new(config)` will validate it and return `Err(BufferError::InvalidConfiguration)` if not.
// Key rules:
// - initial_capacity must be between min_capacity and max_capacity.
// - min_capacity cannot be greater than max_capacity.
// - Capacities must be > 0.
// - growth_factor must be > 1.0.
// - shrink_threshold must be between 0.0 and 1.0 (exclusive).
The push_timeout and pop_timeout fields in Config are placeholders for potential future enhancements; currently, timeout methods require an explicit Duration argument.
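The validation rules can be exercised directly. A minimal sketch (the exact shape of the error payload is an assumption; only the is_err outcome follows from the rules above):

use elasticq::{Config, DynamicCircularBuffer};

fn main() {
    // Invalid: min_capacity (4096) exceeds max_capacity (1024).
    let bad = Config::default()
        .with_min_capacity(4096)
        .with_max_capacity(1024);
    let result = DynamicCircularBuffer::<u8>::new(bad);
    // Expect Err(BufferError::InvalidConfiguration) per the rules above.
    assert!(result.is_err());
}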
API overview

Core buffer (DynamicCircularBuffer<T>):
- new(config: Config) -> Result<Self, BufferError>: creates a new buffer
- push(&self, item: T) -> Result<(), BufferError>
- pop(&self) -> Result<T, BufferError>
- push_batch(&self, items: Vec<T>) -> Result<(), BufferError>
- pop_batch(&self, max_items: usize) -> Result<Vec<T>, BufferError>
- With the async feature enabled: push_async, pop_async, push_batch_async, pop_batch_async, and *_timeout versions
- Utilities: len(), is_empty(), capacity(), clear(), iter() -> Vec<T> (clones items), drain() -> Vec<T> (consumes items)

Priority queues (priority feature):
- PriorityCircularBuffer<T>: multi-level priority queue
- PriorityConfig: configuration with with_priority_levels(), with_fair_queuing(), with_max_consecutive_per_priority()
- push_with_priority(&self, item: T, priority: usize): push with a specific priority
- pop(&self): pop the highest-priority item (with fair queuing if enabled)
- pop_from_priority(&self, priority: usize): pop from a specific priority level
- stats(&self) -> PriorityStats: per-priority statistics

Async streams (streams feature):
- BufferStream<T>: implements futures_core::Stream
- BufferSink<T>: for sending items with send() and send_batch()
- BufferChannel<T>: channel-like API with send(), recv(), recv_timeout()
- BufferStreamExt: extension trait adding stream_sink_pair() to buffers

Persistence (persistent feature):
- PersistentCircularBuffer<T>: file-backed buffer with crash recovery
- PersistentConfig: configuration with with_file_path(), with_sync_mode(), with_max_log_entries()
- SyncMode: NoSync, Periodic(Duration), EveryWrite
- sync(&self): force sync to disk
- compact(&self): compact the write-ahead log
- stats(&self) -> PersistentStats: persistence statistics

Metrics (metrics feature):
- MetricsRecorder: records queue metrics with a configurable queue name
- InstrumentedBuffer<T>: wrapper that auto-records all operations
- InstrumentedBufferRef<T>: wrapper for borrowed buffer references
- instrument(&self, buffer: &B): wrap a buffer for metrics recording
- Exported metrics: messages_enqueued, messages_dequeued, queue_depth, queue_capacity, operation_duration_seconds

Performance benchmarks were conducted on a Mac Studio with an M1 Ultra (20 CPU cores). Results demonstrate significant improvements with the lock-free implementation.
| Implementation | Single-Threaded | 4 Producers | Advantages |
|---|---|---|---|
| Lock-Free | 46.6M msg/sec | Varies | Wait-free operations, no deadlocks |
| Lock-Based | 22.0M msg/sec | Stable | Predictable under high contention |
| Speedup | 2.1x | Scenario-dependent | Lock-free wins for MPSC patterns |
Recent benchmarks (1-20 producers, single consumer) demonstrate excellent scalability characteristics:
(Chart omitted: throughput in K msg/sec vs. producer count, rising through the low producer counts to a peak near 480K msg/sec around 5-9 producers, then tapering toward roughly 200-260K at 17-20 producers; exact figures in the table below.)
| Producers | Zone | Throughput | Success Rate | Use Case |
|---|---|---|---|---|
| 1-4 | Linear Scale | 68K - 260K msg/sec | 100.0% | Real-time systems |
| 5-9 | Peak Zone | 319K - 479K msg/sec | 99.9% - 100% | MQTT proxy optimal |
| 10-16 | Plateau | 285K - 471K msg/sec | 97.2% - 99.9% | High-load scenarios |
| 17-20 | Decline | 205K - 259K msg/sec | 94.9% - 96.8% | Consider sharding |
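For the 17-20 producer range where the table suggests sharding, one hypothetical approach (not part of elasticq) is to spread producers across several independent MPSC queues, each with its own single consumer:

use elasticq::{LockFreeMPSCQueue, Config, BufferError};
use std::sync::Arc;

// Hypothetical sharding helper: N independent MPSC queues.
// Each shard must keep its own single consumer to preserve the MPSC contract.
struct ShardedQueues {
    shards: Vec<Arc<LockFreeMPSCQueue<String>>>,
}

impl ShardedQueues {
    fn new(n: usize) -> Result<Self, BufferError> {
        let shards = (0..n)
            .map(|_| Ok(Arc::new(LockFreeMPSCQueue::new(Config::default())?)))
            .collect::<Result<Vec<_>, BufferError>>()?;
        Ok(Self { shards })
    }

    // Pin each producer to a fixed shard so per-producer ordering is preserved.
    fn shard_for(&self, producer_id: usize) -> Arc<LockFreeMPSCQueue<String>> {
        Arc::clone(&self.shards[producer_id % self.shards.len()])
    }
}

fn main() -> Result<(), BufferError> {
    let sharded = ShardedQueues::new(4)?;
    let q = sharded.shard_for(17); // producer 17 -> shard 1
    while q.try_enqueue("hello".to_string()).is_err() {
        std::thread::yield_now();
    }
    Ok(())
}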
Real-world MQTT proxy simulation (4 publishers → 1 processor):
| Benchmark | Time | Throughput |
|---|---|---|
| push_pop_high_priority | 89 ns | ~11.2M ops/sec |
| push_pop_low_priority | 115 ns | ~8.7M ops/sec |
| mixed_priorities_strict | 310 ns | ~3.2M ops/sec |
| mixed_priorities_fair | 341 ns | ~2.9M ops/sec |
| batch_10 | 659 ns | 15.2M elem/sec |
| batch_100 | 5.96 µs | 16.8M elem/sec |
| batch_1000 | 59.6 µs | 16.8M elem/sec |
| 1000_ops_mixed | 110 µs | 9.1M elem/sec |
Persistence throughput by sync mode:
| Sync Mode | Single Op | Batch 100 | Notes |
|---|---|---|---|
| NoSync | 19.9 µs | 2.1 ms | Fastest - no fsync |
| Periodic | 22.0 µs | - | Background sync every 100ms |
| EveryWrite | 2.3 ms | - | Full durability guarantee |
Metrics instrumentation overhead:
| Operation | Baseline | Instrumented | Overhead |
|---|---|---|---|
| Single push/pop | 120 ns | 670 ns | ~5.6x |
| Batch 10 | 153 ns | 720 ns | ~4.7x |
| Batch 100 | 270 ns | 849 ns | ~3.1x |
| Batch 1000 | 1.57 µs | 2.17 µs | ~1.4x |
ElasticQ includes an extensive test suite that validates correctness, performance, and safety:
The suite combines property-based testing (proptest) with concurrency model checking (loom).
✅ Zero Critical Bugs - All race conditions and data corruption issues resolved
✅ Perfect Message Conservation - Mathematical guarantee of no phantom messages
✅ Memory Safety - Comprehensive epoch-based garbage collection testing
✅ Performance Validated - 2.1x improvement over lock-based implementation verified
✅ Warning-Free Compilation - Clean codebase with zero compiler warnings
The lock-free implementation includes TLA+ formal specifications in the tla+/ directory:
LockFreeMPSCQueue.tla - Complete formal model of the lock-free algorithm
To run verification:
# Requires TLA+ tools installation
tlc LockFreeMPSCQueue.tla -config LockFreeMPSCQueue.cfg
Example configuration for an MQTT proxy workload:
let config = Config::default()
.with_initial_capacity(1024) // Start with 1K messages
.with_max_capacity(1048576) // Allow up to 1M messages
.with_growth_factor(2.0) // Double capacity when full
.with_min_capacity(512); // Shrink to 512 minimum
Example configuration for sustained high-load workloads:
let config = Config::default()
.with_initial_capacity(8192) // Larger initial buffer
.with_max_capacity(16777216) // 16M message capacity
.with_growth_factor(1.5) // Moderate growth
.with_shrink_threshold(0.25); // Shrink when 25% utilized
Design notes:
- Locking: the lock-based implementation uses a Mutex around the internal VecDeque and an RwLock for its logical capacity. Additionally, push_lock: Mutex<()> and pop_lock: Mutex<()> serialize all push operations against each other and all pop operations against each other. This design prioritizes correctness: complex sequences such as resize/shrink decisions and actions stay atomic with respect to other operations of the same kind.
- Scalability: push_lock and pop_lock are the primary reason for limited scalability beyond a few concurrent threads for single-item operations.
- Blocking accessors: len(), is_empty(), and capacity() are synchronous. When the async feature is enabled (and thus tokio::sync locks are used internally), these methods use blocking_lock() (or equivalent), so they can block an async runtime if called from one while the lock is heavily contended. Use with awareness on critical async paths.
- iter() performance: iter() clones all items in the buffer, which can be costly for large buffers or items that are expensive to clone; drain() is more efficient when items are to be consumed and removed. See the sketch below.
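A minimal sketch of the iter()/drain() distinction described above, using only the documented accessors on the default lock-based buffer:

use elasticq::{Config, DynamicCircularBuffer, BufferError};

fn main() -> Result<(), BufferError> {
    let buffer = DynamicCircularBuffer::<String>::new(Config::default())?;
    buffer.push_batch(vec!["a".to_string(), "b".to_string()])?;
    // iter() clones every item; the buffer still holds them afterwards.
    let snapshot = buffer.iter();
    assert_eq!(buffer.len(), 2);
    // drain() moves the items out; the buffer is empty afterwards.
    let consumed = buffer.drain();
    assert!(buffer.is_empty());
    println!("snapshot: {:?}, consumed: {:?}", snapshot, consumed);
    Ok(())
}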
Contributions are welcome! Please feel free to submit issues or pull requests. For major changes, please open an issue first to discuss your proposed changes.
# Run all tests
cargo test
# Run with lock-free feature
cargo test --features lock_free
# Run tests for new v0.3.0 features
cargo test --features priority
cargo test --features streams
cargo test --features persistent
cargo test --features metrics
# Run all feature tests
cargo test --all-features
# Run benchmarks
cargo bench
# Run lock-free vs lock-based benchmarks
cargo bench --features lock_free
# Run TLA+ verification (requires TLA+ tools)
cd tla+ && tlc LockFreeMPSCQueue.tla -config LockFreeMPSCQueue.cfg
# Run examples
cargo run --example lock_free_demo --features lock_free
cargo run --example performance_summary --features lock_free
This project is licensed under the MIT License. Please see the LICENSE file in the repository for the full license text.