| Crates.io | disk_backed_queue |
| lib.rs | disk_backed_queue |
| version | 0.1.1 |
| created_at | 2025-12-01 12:06:24.611687+00 |
| updated_at | 2025-12-01 12:06:24.611687+00 |
| description | A robust, crash-resistant queue implementation that persists all data to disk using SQLite |
| homepage | |
| repository | https://github.com/12932/disk-backed-queue |
| max_upload_size | |
| id | 1959709 |
| size | 120,627 |
A robust, crash-resistant queue implementation for Rust that persists all data to disk using SQLite. Provides an mpsc-like channel API while ensuring messages survive application restarts and system failures.
Perfect for scenarios where message loss is unacceptable.

Add to your `Cargo.toml`:
[dependencies]
disk_backed_queue = "0.1"
tokio = { version = "1", features = ["rt-multi-thread", "macros"] } # For async runtime
serde = { version = "1", features = ["derive"] } # For message serialization
use disk_backed_queue::disk_backed_channel;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug)]
struct MyMessage {
id: u64,
content: String,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create a disk-backed channel
let (tx, mut rx) = disk_backed_channel::<MyMessage, _>(
"my_queue.db",
"messages".to_string(),
None // No size limit (or Some(100_000) for max 100k messages)
).await?;
// Send messages
tx.send(MyMessage {
id: 1,
content: "Hello, world!".to_string(),
}).await?;
// Receive messages
if let Some(msg) = rx.recv().await? {
println!("Received: {:?}", msg);
}
Ok(())
}
For high-throughput scenarios, use batch operations for massive performance gains:
// Send batch (dramatically faster than individual sends; measure on your hardware)
let messages: Vec<MyMessage> = (0..1000)
.map(|i| MyMessage { id: i, content: format!("Message {}", i) })
.collect();
tx.send_batch(messages).await?;
// Receive batch
let received = rx.recv_batch(100).await?; // Get up to 100 messages
println!("Received {} messages", received.len());
Performance: Batch operations are dramatically faster than single operations. Run `cargo run --example throughput_demo --release` to benchmark on your hardware.
Corrupted or undeserializable messages are diverted to a separate `.dlq.db` file. On-disk layout:

my_queue.db # Main queue database
├── messages table
│ ├── id (PRIMARY KEY)
│ ├── data (BLOB) # Bincode-serialized message
│ └── created_at (INT) # Unix timestamp
my_queue.dlq.db # Dead letter queue
└── messages_dlq table
├── id (PRIMARY KEY)
├── original_id (INT)
├── data (BLOB)
├── error_message (TEXT)
├── created_at (INT)
└── moved_at (INT)
When messages fail to deserialize (schema changes, corruption, etc.), they're automatically moved to the DLQ instead of blocking the queue.
# Open the DLQ database
sqlite3 my_queue.dlq.db
# View corrupted messages
SELECT * FROM messages_dlq ORDER BY moved_at DESC;
# Count corrupted messages
SELECT COUNT(*) FROM messages_dlq;
# View error patterns
SELECT error_message, COUNT(*) as count
FROM messages_dlq
GROUP BY error_message
ORDER BY count DESC;
# Export corrupted messages for analysis
sqlite3 my_queue.dlq.db "SELECT data FROM messages_dlq" > corrupted.bin
# Clear old DLQ entries after investigation
sqlite3 my_queue.dlq.db "DELETE FROM messages_dlq WHERE moved_at < strftime('%s', 'now', '-7 days')"
# Clear entire DLQ
sqlite3 my_queue.dlq.db "DELETE FROM messages_dlq"
pub async fn disk_backed_channel<T, P: AsRef<Path>>(
db_path: P,
table_name: String,
max_size: Option<usize>,
) -> Result<(DiskBackedSender<T>, DiskBackedReceiver<T>)>
// Send single message
async fn send(&self, item: T) -> Result<()>
// Send batch (much faster)
async fn send_batch(&self, items: Vec<T>) -> Result<()>
// Blocking send (for sync contexts)
fn blocking_send(&self, item: T) -> Result<()>
// Receive single message (returns None if empty)
async fn recv(&mut self) -> Result<Option<T>>
// Receive batch (returns empty Vec if no messages)
async fn recv_batch(&mut self, limit: usize) -> Result<Vec<T>>
// Queue status
async fn len(&self) -> Result<usize>
async fn is_empty(&self) -> Result<bool>
For advanced use cases, you can use DiskBackedQueue directly:
use disk_backed_queue::DiskBackedQueue;
let queue = DiskBackedQueue::new("queue.db", "table".to_string(), None).await?;
// All sender/receiver methods available
queue.send(message).await?;
queue.clear().await?; // Clear all messages
// Unlimited queue (default)
let (tx, rx) = disk_backed_channel("queue.db", "messages".to_string(), None).await?;
// Limited queue (blocks senders when full)
let (tx, rx) = disk_backed_channel("queue.db", "messages".to_string(), Some(100_000)).await?;
When the queue reaches `max_size`, `send()` blocks with exponential backoff until space becomes available.
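The backoff schedule itself is internal to the crate. As a rough sketch of how such a capped exponential delay can be computed (the 1 ms base and 100 ms cap here are illustrative assumptions, not the crate's actual constants):

```rust
use std::time::Duration;

/// Hypothetical capped exponential backoff: 1 ms doubling per attempt,
/// clamped to 100 ms. The real crate's constants are not documented here.
fn backoff_delay(attempt: u32) -> Duration {
    let base_ms: u64 = 1;
    let cap_ms: u64 = 100;
    // `min(16)` keeps the shift well inside u64 range.
    let ms = base_ms << attempt.min(16);
    Duration::from_millis(ms.min(cap_ms))
}

fn main() {
    for attempt in 0..8 {
        println!("attempt {attempt}: wait {:?}", backoff_delay(attempt));
    }
}
```

A full sender would sleep for `backoff_delay(attempt)` after each failed size check and reset `attempt` to zero once an insert succeeds.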
Control the trade-off between performance and data safety:
use disk_backed_queue::{disk_backed_channel_with_durability, DurabilityLevel};
// Maximum safety (default) - survives power loss
let (tx, rx) = disk_backed_channel_with_durability(
"queue.db",
"messages".to_string(),
None,
DurabilityLevel::Full,
).await?;
// Balanced performance and safety
let (tx, rx) = disk_backed_channel_with_durability(
"queue.db",
"messages".to_string(),
None,
DurabilityLevel::Normal,
).await?;
// Maximum performance - no power loss protection
let (tx, rx) = disk_backed_channel_with_durability(
"queue.db",
"messages".to_string(),
None,
DurabilityLevel::Off,
).await?;
| Durability Level | Performance Impact | Power Loss Safety | SQLite Mode | Use Case |
|---|---|---|---|---|
| `Full` (default) | Baseline | ✅ Maximum | `SYNCHRONOUS=FULL` | Critical data, financial transactions |
| `Normal` | (untested) | ⚠️ Good | `SYNCHRONOUS=NORMAL` | Balanced performance, high-throughput caching |
| `Off` | (untested) | ❌ None | `SYNCHRONOUS=OFF` | Cache/ephemeral data only |
| `Extra` | (untested) | ✅✅ Paranoid | `SYNCHRONOUS=EXTRA` | Regulatory compliance |
Note: Batch operations have similar throughput across all durability levels since fsync cost is amortized. Run the throughput demo to measure actual performance differences on your system.
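The amortization argument can be demonstrated outside SQLite with nothing but `std::fs`: syncing after every record models the per-send fsync cost, while one sync at the end models a batch. File names here are throwaway; the crate itself batches inside a single SQLite transaction rather than a flat file.

```rust
use std::fs::File;
use std::io::Write;
use std::time::{Duration, Instant};

/// Write `n` records, either fsyncing after each one (`sync_each = true`,
/// the single-send cost model) or once at the end (the batch model).
fn write_records(path: &str, sync_each: bool, n: usize) -> std::io::Result<Duration> {
    let mut f = File::create(path)?;
    let start = Instant::now();
    for i in 0..n {
        writeln!(f, "record {i}")?;
        if sync_each {
            f.sync_all()?; // one fsync per record
        }
    }
    if !sync_each {
        f.sync_all()?; // one fsync for the whole batch
    }
    Ok(start.elapsed())
}

fn main() -> std::io::Result<()> {
    let n = 200;
    let per_record = write_records("per_record.tmp", true, n)?;
    let per_batch = write_records("per_batch.tmp", false, n)?;
    println!("fsync per record: {per_record:?}");
    println!("fsync per batch:  {per_batch:?}");
    std::fs::remove_file("per_record.tmp")?;
    std::fs::remove_file("per_batch.tmp")?;
    Ok(())
}
```

On real disks the per-record variant is typically orders of magnitude slower; on RAM-backed filesystems the gap shrinks, which is exactly why the throughput demo should be run on your own hardware.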
Table names must be valid SQL identifiers; invalid names are rejected with an `InvalidTableName` error.
use disk_backed_queue::DiskQueueError;
match rx.recv().await {
Ok(Some(msg)) => {
// Process message
},
Ok(None) => {
// Queue is empty
},
Err(DiskQueueError::Deserialization(e)) => {
// Message was corrupted and moved to DLQ
eprintln!("Corrupted message moved to DLQ: {}", e);
// Continue processing - queue is not blocked
},
Err(e) => {
// Other errors (database, I/O, etc.)
eprintln!("Queue error: {}", e);
}
}
- `Database(rusqlite::Error)` - SQLite database errors
- `Serialization(bincode::error::EncodeError)` - Failed to serialize message
- `Deserialization(bincode::error::DecodeError)` - Corrupted message (moved to DLQ)
- `InvalidTableName(String)` - Invalid table name provided
- `TaskJoin(String)` - Internal async task error
- `QueueClosed` - Queue was closed
- `UnexpectedRowCount(String)` - Database consistency error
- `QueueFull(usize)` - Queue reached size limit (only with `max_size`)

Performance varies significantly based on your hardware (SSD vs HDD), operating system, and filesystem.
Key Insight: Single operations are limited by fsync overhead. Batch operations use a single transaction, achieving 50-500x speedup depending on your system.
To measure on your hardware:
cargo run --example throughput_demo --release
This will show you actual throughput for single and batch operations with different durability levels on your specific system.
# Run tests
cargo test
# Run benchmarks
cargo bench
# Run throughput demo
cargo run --example throughput_demo --release
- Sender: `Clone + Send + Sync` - safe to clone and share across threads
- Receiver: `Send` but not `Sync` - use from a single async task
- Internal: all SQLite operations run via `spawn_blocking` to avoid blocking the async executor
MIT
Contributions welcome! Areas for improvement: