| Crates.io | qml-rs |
| lib.rs | qml-rs |
| version | 1.0.1 |
| created_at | 2025-07-18 10:39:42.808298+00 |
| updated_at | 2025-11-26 08:43:07.327888+00 |
| description | A Rust implementation of QML background job processing |
| homepage | |
| repository | https://github.com/queue-me-later/qml |
| max_upload_size | |
| id | 1758851 |
| size | 442,346 |
A production-ready Rust implementation of QML background job processing, designed for high-performance, reliability, and scalability.
qml is a complete, enterprise-grade background job processing system with:
Add to your Cargo.toml:
[dependencies]
qml-rs = "0.1.0"
# Enable PostgreSQL support
qml-rs = { version = "1.0.0", features = ["postgres"] }
Enqueued β Processing β Succeeded | FailedScheduled β Enqueued (time-based activation)AwaitingRetry β Enqueued (retry logic)Deleted (soft deletion with audit trail)SELECT FOR UPDATE SKIP LOCKED with dedicated lock tableuse qml_rs::{
BackgroundJobServer, Job, MemoryStorage, ServerConfig,
Worker, WorkerContext, WorkerResult, WorkerRegistry
};
use async_trait::async_trait;
use std::sync::Arc;
// Define a worker
struct EmailWorker;
#[async_trait]
impl Worker for EmailWorker {
async fn execute(&self, job: &Job, _context: &WorkerContext) -> Result<WorkerResult, qml::QmlError> {
let email = &job.arguments[0];
println!("Sending email to: {}", email);
// Email sending logic here
Ok(WorkerResult::success())
}
fn method_name(&self) -> &str {
"send_email"
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Setup storage and worker registry
let storage = Arc::new(MemoryStorage::new());
let mut registry = WorkerRegistry::new();
registry.register(Box::new(EmailWorker));
// Create job and enqueue
let job = Job::new("send_email", vec!["user@example.com".to_string()]);
storage.enqueue(&job).await?;
// Start job server
let config = ServerConfig::new("server-1").worker_count(4);
let server = BackgroundJobServer::new(storage, Arc::new(registry), config).await?;
server.start().await?;
println!("Job server running! Check the dashboard at http://localhost:8080");
// Server runs until stopped
tokio::signal::ctrl_c().await?;
server.stop().await?;
Ok(())
}
use qml_rs::{PostgresConfig, PostgresStorage, StorageInstance};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Configure PostgreSQL storage
let config = PostgresConfig::new()
.with_database_url("postgresql://postgres:password@localhost:5432/qml")
.with_auto_migrate(true)
.with_max_connections(10);
// Create storage instance
let storage = StorageInstance::postgres(config).await?;
// Storage is ready for production use
println!("PostgreSQL storage initialized with migrations!");
Ok(())
}
use qml_rs::{RedisConfig, RedisStorage, StorageInstance};
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Configure Redis storage
let config = RedisConfig::new()
.with_url("redis://localhost:6379")
.with_pool_size(20)
.with_command_timeout(Duration::from_secs(5))
.with_key_prefix("myapp:jobs");
// Create storage instance
let storage = StorageInstance::redis(config).await?;
println!("Redis storage ready for distributed processing!");
Ok(())
}
use qml_rs::{
BackgroundJobServer, DashboardServer, Job, PostgresConfig,
ServerConfig, StorageInstance, WorkerRegistry
};
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Production PostgreSQL setup
let storage_config = PostgresConfig::new()
.with_database_url(std::env::var("DATABASE_URL")?)
.with_auto_migrate(true)
.with_max_connections(50)
.with_min_connections(5);
let storage = Arc::new(StorageInstance::postgres(storage_config).await?);
// Setup workers and server
let registry = Arc::new(setup_worker_registry());
let server_config = ServerConfig::new("production-server")
.worker_count(20)
.queues(vec!["critical".to_string(), "normal".to_string(), "bulk".to_string()]);
// Start job processing server
let job_server = BackgroundJobServer::new(storage.clone(), registry, server_config).await?;
// Start web dashboard
let dashboard = DashboardServer::new(storage.clone()).await?;
// Start both servers
tokio::try_join!(
job_server.start(),
dashboard.start("0.0.0.0:8080")
)?;
Ok(())
}
fn setup_worker_registry() -> WorkerRegistry {
let mut registry = WorkerRegistry::new();
// Register your workers here
registry
}
QML provides comprehensive automated migration support for PostgreSQL with zero-configuration setup and production-ready patterns.
use qml_rs::{PostgresConfig, PostgresStorage};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Just provide a database URL - migrations run automatically!
let storage = PostgresStorage::new(
PostgresConfig::new()
.with_database_url("postgresql://user:pass@localhost/db")
.with_auto_migrate(true) // Default: enabled
).await?;
println!("Database ready with schema!");
Ok(())
}
// Auto-migrate everything on startup
let config = PostgresConfig::new()
.with_database_url(database_url)
.with_auto_migrate(true); // Enabled by default
let storage = PostgresStorage::new(config).await?; // Migrations run automatically
// Manual migration control for production safety
let config = PostgresConfig::new()
.with_database_url(database_url)
.with_auto_migrate(false); // Disable auto-migration
let storage = PostgresStorage::new(config).await?;
// Run migrations explicitly when ready
storage.migrate().await?;
// Fast setup for tests with automatic cleanup
let config = PostgresConfig::new()
.with_database_url(test_database_url)
.with_auto_migrate(true)
.with_max_connections(2) // Minimal resources
.with_min_connections(1);
let storage = PostgresStorage::new(config).await?;
The library automatically detects when migrations are needed:
// Check if schema exists before operations
if !storage.schema_exists().await? {
println!("Schema not found, migrations needed");
storage.migrate().await?;
}
// Only run migrations if actually needed
let migration_needed = storage.migrate_if_needed().await?;
if migration_needed {
println!("Migrations were applied");
} else {
println!("Schema already up to date");
}
use qml_rs::{PostgresStorage, StorageError, PostgresConfig};
async fn robust_initialization(database_url: String) -> Result<PostgresStorage, Box<dyn std::error::Error>> {
let config = PostgresConfig::new()
.with_database_url(database_url)
.with_auto_migrate(true);
match PostgresStorage::new(config).await {
Ok(storage) => {
// Verify schema after initialization
if storage.schema_exists().await? {
Ok(storage)
} else {
// Force migration if schema still missing
storage.migrate().await?;
Ok(storage)
}
}
Err(StorageError::MigrationError { message }) => {
eprintln!("Migration failed: {}", message);
Err("Database initialization failed".into())
}
Err(e) => Err(Box::new(e)),
}
}
QML now uses an embedded schema approach - no external migration files needed!
The complete PostgreSQL schema is embedded directly in the binary as install.sql and only requires the postgres feature to be enabled:
// Schema installation happens automatically or manually
let storage = PostgresStorage::new(
PostgresConfig::new()
.with_database_url(database_url)
.with_auto_migrate(true) // Installs embedded schema automatically
).await?;
The embedded install.sql includes everything needed for production:
# Database configuration
export DATABASE_URL="postgresql://user:pass@localhost:5432/qml"
export QML_MAX_CONNECTIONS="20"
export QML_MIN_CONNECTIONS="2"
export QML_AUTO_MIGRATE="true" # Enable embedded schema auto-installation
let config = PostgresConfig::new()
.with_database_url(database_url)
.with_auto_migrate(true) // Enable embedded schema installation
.with_max_connections(20)
.with_min_connections(2)
.with_connect_timeout(Duration::from_secs(10))
.with_command_timeout(Duration::from_secs(30))
.with_schema_name("qml")
.with_table_name("qml_jobs");
features = ["postgres"]// Deploy with auto_migrate=false for production safety
let config = PostgresConfig::new()
.with_auto_migrate(false);
// Install embedded schema manually during deployment
let storage = PostgresStorage::new(config).await?;
storage.migrate().await?; // Installs complete embedded schema
async fn health_check(storage: &PostgresStorage) -> Result<(), Box<dyn std::error::Error>> {
// Check schema exists
if !storage.schema_exists().await? {
return Err("Schema missing".into());
}
// Test basic operation
storage.get_job_count("default").await?;
Ok(())
}
// Only migrate if specific conditions are met
let should_migrate = !storage.schema_exists().await? ||
std::env::var("FORCE_MIGRATION").is_ok();
if should_migrate {
storage.migrate().await?;
}
use tracing::{info, warn, error};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Enable detailed migration logging
tracing_subscriber::fmt::init();
let storage = PostgresStorage::new(config).await?;
// Migration logs will be automatically emitted
Ok(())
}
| Feature | Memory | Redis | PostgreSQL |
|---|---|---|---|
| Performance | Ultra Fast | Fast | Good |
| Persistence | None | Durable | ACID |
| Scalability | Single Node | Distributed | Horizontal |
| Locking | Mutex | Distributed | Row-level |
| Production Ready | Development | β | β |
| Use Case | Testing | High Traffic | Enterprise |
βββββββββββββββββββ βββββββββββββββββββ βββββββββββββββββββ
β Web Dashboard β β Job Client β β Worker Nodes β
β (WebSocket) β β β β β
βββββββββββ¬ββββββββ βββββββββββ¬ββββββββ βββββββββββ¬ββββββββ
β β β
ββββββββββββββββββββββββΌβββββββββββββββββββββββ
β
βββββββββββββββ΄ββββββββββββββ
β Storage Layer β
β β
β βββββββ βββββββ βββββββ β
β βMem β βRedisβ βPgSQLβ β
β βββββββ βββββββ βββββββ β
βββββββββββββββββββββββββββββ
# All tests
cargo test
# Race condition tests only
cargo test test_locking
# With Redis/PostgreSQL (requires running services)
cargo test --features postgres
# Stress test
cargo test test_high_concurrency_stress
# Basic job creation and serialization
cargo run --example basic_job
# Multi-backend storage operations
cargo run --example storage_demo
# Real-time dashboard with WebSocket
cargo run --example dashboard_demo
# Complete job processing with workers
cargo run --example processing_demo
# PostgreSQL setup and operations
cargo run --example postgres_simple
# Comprehensive automated migration demo with embedded schema
cargo run --example automated_migration --features postgres
# Embedded schema installation patterns
cargo run --example custom_migrations --features postgres
The automated_migration.rs example demonstrates the new embedded schema approach:
// Multiple migration strategies using embedded schema
pub enum MigrationStrategy {
Development, // Auto-install embedded schema
Production, // Manual embedded schema control
Testing, // Minimal resources with embedded schema
}
// DatabaseManager with embedded schema installation
let database_manager = DatabaseManager::new(
database_url,
MigrationStrategy::Development
).await?;
// Schema installation and health checks
database_manager.ensure_schema().await?;
database_manager.health_check().await?;
The example includes:
After running the dashboard example:
The QML library now includes comprehensive automated migration functionality:
migrations/20250719000001_initial_schema.sql - Complete QML schema with indexes and triggersmigrations/20250719000002_add_job_locking.sql - Advanced job locking for distributed processingsrc/storage/postgres.rs - Enhanced with schema_exists(), migrate_if_needed(), error handlingexamples/automated_migration.rs - Comprehensive migration patterns demosrc/error.rs - Added MigrationError variant for consistency// Automatic schema detection
storage.schema_exists().await? // Check if schema exists
storage.migrate_if_needed().await? // Smart migration logic
storage.migrate().await? // Force migration
// Error handling
PostgresStorage::new(config).await? // Auto-migrate on init (if enabled)
CREATE DATABASE qml;
CREATE USER qml_user WITH PASSWORD 'secure_password';
GRANT ALL PRIVILEGES ON DATABASE qml TO qml_user;
export DATABASE_URL="postgresql://qml_user:secure_password@localhost:5432/qml"
export RUST_LOG=info
export QML_WORKERS=20
version: "3.8"
services:
postgres:
image: postgres:15
environment:
POSTGRES_DB: qml
POSTGRES_USER: qml_user
POSTGRES_PASSWORD: secure_password
ports:
- "5432:5432"
volumes:
- postgres_data:/var/lib/postgresql/data
qml-app:
build: .
environment:
DATABASE_URL: postgresql://qml_user:secure_password@postgres:5432/qml
QML_WORKERS: 20
depends_on:
- postgres
ports:
- "8080:8080"
volumes:
postgres_data:
# Redis with persistence
docker run -d --name redis \
-p 6379:6379 \
redis:7-alpine redis-server --appendonly yes
apiVersion: apps/v1
kind: Deployment
metadata:
name: qml-workers
spec:
replicas: 3
selector:
matchLabels:
app: qml-workers
template:
metadata:
labels:
app: qml-workers
spec:
containers:
- name: qml
image: your-registry/qml-app:latest
env:
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: qml-secrets
key: database-url
- name: QML_WORKERS
value: "10"
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
let config = ServerConfig::new("production-server")
.worker_count(20) // Number of worker threads
.polling_interval(Duration::from_secs(1)) // Job fetch frequency
.job_timeout(Duration::from_secs(300)) // Per-job timeout
.queues(vec!["critical", "normal"]) // Queue priorities
.fetch_batch_size(10) // Jobs per fetch
.enable_scheduler(true); // Time-based scheduling
// PostgreSQL Production Config
let pg_config = PostgresConfig::new()
.with_database_url("postgresql://...")
.with_max_connections(50)
.with_min_connections(5)
.with_connect_timeout(Duration::from_secs(10))
.with_auto_migrate(true);
// Redis Production Config
let redis_config = RedisConfig::new()
.with_url("redis://cluster:6379")
.with_pool_size(20)
.with_command_timeout(Duration::from_secs(5))
.with_key_prefix("myapp:jobs")
.with_completed_job_ttl(Duration::from_secs(86400)); // 24h
qml is production-ready! The next phase focuses on:
We welcome contributions of all kinds! Whether you're fixing bugs, adding features, improving documentation, or enhancing tests, your help makes qml better for everyone.
Please see our Contributing Guide for detailed information on:
# Fork and clone the repository
git clone https://github.com/yourusername/qml.git
cd qml
# Install dependencies and run tests
cargo build
cargo test
# Start development with watch mode
cargo install cargo-watch
cargo watch -x test
For questions or help getting started, please open an issue with the "question" label.
This README now contains all comprehensive documentation previously spread across multiple files:
β οΈ IMPORTANT: This library includes placeholder development credentials in src/storage/settings.rs for testing and examples. These are clearly marked as development-only and should NEVER be used in production:
dev_password_change_me - Development PostgreSQL password placeholder.env.example)The library follows security best practices and is safe for public repositories when proper production configuration is used.
Licensed under either of:
at your option.
Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.
qml: Production-ready background job processing for Rust applications.