qoxide

Crate: qoxide v1.3.0 on crates.io
Description: A lightweight local job queue backed by SQLite
Repository: https://github.com/Haizzz/qoxide
Author: Anh Le (Haizzz)


Qoxide

A lightweight local job queue built in Rust, backed by SQLite.

Guiding Principles

  • Simple - Minimal API surface, easy to understand
  • Fast - SQLite with WAL mode, indexed queries
  • Predictable - FIFO ordering, atomic operations

Features

  • In-memory or file-based persistence
  • SQLite backend with WAL mode for file-based queues
  • Binary payload support (arbitrary Vec<u8>)
  • Atomic reserve-complete/fail workflow
  • Configurable max attempts with dead letter queue

Installation

Add to your Cargo.toml:

[dependencies]
qoxide = "1.3"

Usage

Basic Example

use qoxide::QoxideQueue;

// Create an in-memory queue
let mut queue = QoxideQueue::new();

// Add a message
let id = queue.add(b"my job data".to_vec())?;

// Reserve the next pending message (atomic)
let (id, payload) = queue.reserve()?;

// Process the job...

// Mark as complete on success
queue.complete(id)?;

// Or mark as failed to return to pending state
queue.fail(id)?;

With Builder

use qoxide::QoxideQueue;

// Configure with builder pattern
let mut queue = QoxideQueue::builder()
    .path("./my_queue.db")  // optional: file-backed persistence
    .max_attempts(3)         // optional: move to DLQ after 3 failed attempts
    .build();

let id = queue.add(b"job".to_vec())?;
let (id, _) = queue.reserve()?;

// After 3 failed attempts, message moves to DLQ
let state = queue.fail(id)?;

// Inspect dead letters
let dead_ids = queue.dead_letters()?;
for id in &dead_ids {
    let payload = queue.get(*id)?;
    // Process dead letter...
}

// Requeue or remove dead letters
queue.requeue_dead_letters(&dead_ids)?;
for id in dead_ids {
    queue.remove(id)?;
}

Queue Inspection

let sizes = queue.size()?;
println!("Total: {}", sizes.total);
println!("Pending: {}", sizes.pending);
println!("Reserved: {}", sizes.reserved);
println!("Completed: {}", sizes.completed);
println!("Dead: {}", sizes.dead);

API Reference

Method                          Description
------                          -----------
QoxideQueue::new()              Create an in-memory queue
QoxideQueue::builder()          Create a queue with the builder pattern
builder.path(path)              Set the file path for persistence
builder.max_attempts(n)         Set max attempts before a message is dead-lettered
builder.build()                 Build the queue
add(payload)                    Add a message; returns the message ID
reserve()                       Atomically reserve the next pending message
complete(id)                    Mark a message as completed
fail(id)                        Fail a message (requeue, or move to DLQ)
get(id)                         Get a payload by message ID
remove(id)                      Permanently remove a message
size()                          Get the queue size breakdown by state
dead_letters()                  Get the IDs of all dead-letter messages
requeue_dead_letters(&[ids])    Move dead letters back to pending

Message States

PENDING → RESERVED → COMPLETED
            ↓
         (fail)
            ↓
    ┌───────┴───────┐
    ↓               ↓
 PENDING          DEAD
(retry)      (max retries)
  • Pending: Message is waiting to be processed
  • Reserved: Message is being processed by a worker
  • Completed: Message has been successfully processed
  • Dead: Message exceeded max attempts (dead letter queue)
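The lifecycle above can be sketched as a small state machine. This is an illustrative model only, not qoxide's internal representation; the `attempts` counter and `max_attempts` parameter here simply mirror the builder option described earlier:

```rust
// Toy model of the message lifecycle - for illustration only,
// not qoxide's actual implementation.
#[derive(Debug, PartialEq)]
enum State {
    Pending,
    Reserved,
    Completed,
    Dead,
}

// On fail(), a reserved message either returns to Pending (retry)
// or moves to Dead once it has used up its allowed attempts.
fn on_fail(attempts: u32, max_attempts: Option<u32>) -> State {
    match max_attempts {
        Some(max) if attempts >= max => State::Dead,
        _ => State::Pending, // no limit, or attempts remaining: retry
    }
}

fn main() {
    // With max_attempts = 3, the third failure dead-letters the message.
    assert_eq!(on_fail(1, Some(3)), State::Pending);
    assert_eq!(on_fail(3, Some(3)), State::Dead);
    // With no limit (the default), fail() always requeues.
    assert_eq!(on_fail(100, None), State::Pending);
}
```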

Behaviour

Ordering

Messages are processed in FIFO order. reserve() always returns the oldest pending message.
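As a minimal sketch of the FIFO guarantee, a plain `VecDeque` can stand in for the SQLite-backed queue (illustrative only, not the crate's implementation):

```rust
use std::collections::VecDeque;

// Stand-in for the queue: reserving always takes the oldest pending message.
fn reserve(pending: &mut VecDeque<(u64, Vec<u8>)>) -> Option<(u64, Vec<u8>)> {
    pending.pop_front()
}

fn main() {
    let mut pending = VecDeque::new();
    pending.push_back((1, b"first".to_vec()));
    pending.push_back((2, b"second".to_vec()));

    // The oldest message comes out first, regardless of queue depth.
    let (id, payload) = reserve(&mut pending).unwrap();
    assert_eq!(id, 1);
    assert_eq!(payload, b"first");
}
```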

Atomicity

The reserve() operation is atomic - it selects and updates the message state in a single SQL statement using UPDATE ... RETURNING, preventing race conditions.
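The practical consequence is that two workers can never reserve the same message. That property can be modeled in plain Rust with a lock around the pending list; this is only an analogy for the single-statement SQL update, not the crate's actual mechanism:

```rust
use std::collections::{HashSet, VecDeque};
use std::sync::{Arc, Mutex};
use std::thread;

// Each worker "reserves" by popping under a lock: the select and the
// state change happen as one indivisible step, analogous to a single
// UPDATE ... RETURNING statement. Returns every ID that was reserved.
fn reserve_all(workers: usize, messages: u64) -> Vec<u64> {
    let pending = Arc::new(Mutex::new((1..=messages).collect::<VecDeque<u64>>()));
    let taken = Arc::new(Mutex::new(Vec::new()));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let pending = Arc::clone(&pending);
            let taken = Arc::clone(&taken);
            thread::spawn(move || loop {
                // Pop (reserve) while holding the lock, then release it.
                let next = pending.lock().unwrap().pop_front();
                match next {
                    Some(id) => taken.lock().unwrap().push(id),
                    None => break,
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }

    Arc::try_unwrap(taken).unwrap().into_inner().unwrap()
}

fn main() {
    let taken = reserve_all(4, 100);
    let unique: HashSet<_> = taken.iter().collect();
    // Every message was reserved exactly once - no double delivery.
    assert_eq!(taken.len(), 100);
    assert_eq!(unique.len(), 100);
}
```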

Persistence

  • In-memory (:memory:): Data is lost when the queue is dropped
  • File-backed: Uses SQLite WAL mode for better concurrent read performance

Attempts

  • No limit (default): fail() always returns the message to pending
  • With max attempts: fail() moves the message to the DLQ after n failed attempts

Limitations

  • Write contention: SQLite allows only one writer at a time. Multi-process access works but may block under heavy write load
  • No visibility timeout: Reserved messages remain reserved until explicitly completed or failed. If a worker crashes, its messages must be recovered manually
  • No message priorities: Strictly FIFO ordering
  • No delayed/scheduled messages: Messages are immediately available
  • No TTL/expiration: Messages never expire automatically
  • Completed messages are not cleaned up: Use remove() to clean up

Scaling

What works well

  • High throughput for single-writer scenarios
  • Large payloads (up to 1MB+ tested in benchmarks)
  • Queue sizes of 100k+ messages

What doesn't scale

  • Multiple concurrent writers (SQLite write lock contention)
  • Distributed workers (single SQLite file)
  • Very high QPS requirements (>10k/sec may hit SQLite limits)

Recommendations

  • For multi-process: Use one queue per process or implement connection pooling
  • For distributed: Consider Redis, RabbitMQ, or other distributed queues
  • For high throughput: Batch operations where possible

Benchmarks

Run benchmarks with:

cargo bench

Benchmarks include:

  • queue_add: Single message enqueue
  • queue_add_large_payload: 1MB payload enqueue
  • queue_reserve: Reserve from queues of 1k, 10k, 100k messages
  • queue_interactions: Full add→reserve→fail→reserve→complete cycle

Development

# Run tests
cargo test

# Run benchmarks
cargo bench

Roadmap

  • Retry count / max attempts
  • Dead letter queue (DLQ)
  • Visibility timeout (auto-return reserved messages after timeout)
  • Delayed/scheduled messages
  • Priority queues
  • Message TTL / expiration
  • Batch operations
  • Message deduplication
  • Named queues

License

MIT License - see LICENSE for details.
