use super::*; use std::sync::{ atomic::{AtomicUsize, Ordering}, Arc, }; use std::thread; use std::time::Duration; #[derive(Clone)] struct UnitHandler {} impl dbq::Handler for UnitHandler { type Error = std::io::Error; fn handle(&self, _ctx: dbq::JobContext) -> std::result::Result<(), Self::Error> { Ok(()) } } #[derive(Clone)] struct ErrHandler {} impl dbq::Handler for ErrHandler { type Error = std::io::Error; fn handle(&self, _ctx: dbq::JobContext) -> std::result::Result<(), Self::Error> { Err(std::io::Error::new(std::io::ErrorKind::Other, "job error")) } } #[derive(Clone)] struct PanicHandler {} impl dbq::Handler for PanicHandler { type Error = std::io::Error; fn handle(&self, _ctx: dbq::JobContext) -> std::result::Result<(), Self::Error> { panic!("panic!"); } } #[derive(Clone)] struct CountHandler { total: Arc, } impl CountHandler { fn new() -> CountHandler { CountHandler { total: Arc::new(AtomicUsize::new(0)), } } fn total(&self) -> usize { self.total.load(Ordering::SeqCst) } } impl dbq::Handler for CountHandler { type Error = std::io::Error; fn handle(&self, _ctx: dbq::JobContext) -> std::result::Result<(), Self::Error> { self.total.fetch_add(1, Ordering::SeqCst); Ok(()) } } #[test] fn test_single_worker() { let schema_config = init(); let queue_name = "test_single_worker"; let queue = dbq::Queue::new(schema_config, queue_name.to_string()); let conn = db_conn(); queue.clear(&conn).unwrap(); let _job_id = enqueue_test_job(&queue, 1, &conn); let connect_params = db_connect_params(); let handler = CountHandler::new(); let mut config = dbq::WorkerPoolConfig::new(queue, connect_params, handler.clone()).unwrap(); config.set_num_workers(1); let pool = dbq::WorkerPool::start(config); thread::sleep(Duration::new(0, 20_000_000)); pool.join(); assert_eq!(1, handler.total()); } #[test] fn test_multi_workers() { let schema_config = init(); let queue_name = "test_multi_workers"; let queue = dbq::Queue::new(schema_config, queue_name.to_string()); let conn = db_conn(); queue.clear(&conn).unwrap(); let _job_id = enqueue_test_job(&queue, 1, &conn); drop(conn); let connect_params = db_connect_params(); let config = dbq::WorkerPoolConfig::new(queue, connect_params, UnitHandler {}).unwrap(); let pool = dbq::WorkerPool::start(config); thread::sleep(Duration::new(0, 20_000_000)); pool.join(); } #[test] fn test_job_failure() { let schema_config = init(); let queue_name = "test_job_failure"; let queue = dbq::Queue::new(schema_config, queue_name.to_string()); let max_attempts = 3; let conn = db_conn(); queue.clear(&conn).unwrap(); let job_id = enqueue_test_job(&queue, max_attempts, &conn); drop(conn); let connect_params = db_connect_params(); let mut config = dbq::WorkerPoolConfig::new(queue.clone(), connect_params, ErrHandler {}).unwrap(); config.set_num_workers(1); let pool = dbq::WorkerPool::start(config); // sleep longer because of backoff thread::sleep(Duration::new(2, 0)); pool.join(); let conn = db_conn(); let job = queue .lookup_in_dead_letters(job_id, &conn) .unwrap() .unwrap(); assert_eq!(max_attempts, job.error_count) } #[test] fn test_panic() { let schema_config = init(); let queue_name = "test_panic"; let queue = dbq::Queue::new(schema_config, queue_name.to_string()); let conn = db_conn(); queue.clear(&conn).unwrap(); let job_id = enqueue_test_job(&queue, 1, &conn); drop(conn); let connect_params = db_connect_params(); let mut config = dbq::WorkerPoolConfig::new(queue.clone(), connect_params, PanicHandler {}) .unwrap(); config.set_num_workers(1); let pool = dbq::WorkerPool::start(config); thread::sleep(Duration::new(0, 20_000_000)); pool.join(); let conn = db_conn(); let job = queue.lookup_in_queue(job_id, &conn).unwrap().unwrap(); // the error was not recorded due to the panic assert_eq!(0, job.error_count) }