Crates.io | bonsaimq |
lib.rs | bonsaimq |
version | 0.3.0 |
source | src |
created_at | 2022-05-29 12:56:59.753791 |
updated_at | 2023-12-21 21:16:03.942268 |
description | Message/job queue based on bonsaidb, similar to sqlxmq. |
homepage | https://github.com/FlixCoder/bonsaimq |
repository | https://github.com/FlixCoder/bonsaimq |
max_upload_size | |
id | 596503 |
size | 132,246 |
Simple database message queue based on bonsaidb.
The project is highly influenced by sqlxmq.
Warning: This project is in early alpha and should not be used in production!
Import the project using:
cargo add bonsaimq
or
# adjust the version to the latest version:
bonsaimq = "0.3.0"
# or
bonsaimq = { git = "https://github.com/FlixCoder/bonsaimq" }
Then you can use the message/job queue as follows:
CurrentJob
and return nothing. CurrentJob
allows interfacing the job to retrieve job input or complete the job etc.job_regristy!
needs to be use to create a job registry, which maps message names/types to the job handlers and allows spawning new jobs.Besides the following simple example, see the examples in the examples folder and take a look at the tests.
use bonsaidb::local::{
config::{Builder, StorageConfiguration},
AsyncDatabase,
};
use bonsaimq::{job_registry, CurrentJob, JobRegister, JobRunner, MessageQueueSchema};
use color_eyre::Result;
/// Example job function. It receives a handle to the current job, which gives
/// the ability to get the input payload, complete the job and more.
async fn greet(mut job: CurrentJob) -> color_eyre::Result<()> {
// Load the JSON payload and make sure it is there.
let name: String = job.payload_json().expect("input should be given")?;
println!("Hello {name}!");
job.complete().await?;
Ok(())
}
// The JobRegistry provides a way to spawn new jobs and provides the interface
// for the JobRunner to find the functions to execute for the jobs.
job_registry!(JobRegistry, {
Greetings: "greet" => greet,
});
#[tokio::main]
async fn main() -> Result<()> {
// Open a local database for this example.
let db_path = "simple-doc-example.bonsaidb";
let db = AsyncDatabase::open::<MessageQueueSchema>(StorageConfiguration::new(db_path)).await?;
// Start the job runner to execute jobs from the messages in the queue in the
// database.
let job_runner = JobRunner::new(db.clone()).run::<JobRegistry>();
// Spawn new jobs via a message on the database queue.
let job_id = JobRegistry::Greetings.builder().payload_json("cats")?.spawn(&db).await?;
// Wait for job to finish execution, polling every 100 ms.
bonsaimq::await_job(job_id, 100, &db).await?;
// Clean up.
job_runner.abort(); // Is done automatically on drop.
tokio::fs::remove_dir_all(db_path).await?;
Ok(())
}
This projects uses a bunch of clippy lints for higher code quality and style.
Install cargo-lints
using cargo install --git https://github.com/FlixCoder/cargo-lints
. The lints are defined in lints.toml
and can be checked by running cargo lints clippy --all-targets --workspace
.