//! Testing that job order works as expected. #![allow(clippy::expect_used)] mod common; use std::sync::atomic::{AtomicUsize, Ordering}; use bonsaidb::local::{ config::{Builder, StorageConfiguration}, AsyncDatabase, }; use bonsaimq::{job_registry, CurrentJob, JobRegister, JobRunner, MessageQueueSchema}; use color_eyre::Result; /// Counter static COUNT: AtomicUsize = AtomicUsize::new(0); /// Ordered counting handler. async fn counter(mut job: CurrentJob) -> Result<()> { let expected = job.payload_bytes().expect("byte payload")[0]; let nr = COUNT.fetch_add(1, Ordering::SeqCst); assert_eq!(nr, expected as usize); job.complete().await?; Ok(()) } job_registry!(JobRegistry, { Counter: "counter" => counter, }); #[tokio::test] #[ntest::timeout(60000)] async fn job_order() -> Result<()> { common::init(); let db_path = "ordering-test.bonsaidb"; tokio::fs::remove_dir_all(db_path).await.ok(); let db = AsyncDatabase::open::(StorageConfiguration::new(db_path)).await?; let job_runner = JobRunner::new(db.clone()).run::(); let n = 100_u8; let mut jobs = Vec::with_capacity(n as usize); for i in 0..n { let job_id = JobRegistry::Counter.builder().payload_bytes(vec![i]).spawn(&db).await?; jobs.push(job_id); } // Wait for jobs to finish for job_id in jobs { bonsaimq::await_job(job_id, 100, &db).await?; } let value = COUNT.load(Ordering::SeqCst); assert_eq!(value, n as usize); job_runner.abort(); tokio::fs::remove_dir_all(db_path).await?; Ok(()) }