#![cfg_attr(feature = "nightly", feature(duration_span))] extern crate disqrust; extern crate disque; extern crate redis; use std::time::Duration; use std::sync::mpsc::{channel, Sender}; use disque::Disque; use disqrust::{EventLoop, Handler, JobStatus}; use redis::Value; enum HandlerCall { Job(Vec, String, Vec), Error(Vec, String, u32, u32), } impl HandlerCall { fn body(&self) -> Vec { match *self { HandlerCall::Job(_, _, ref body) => body.clone(), HandlerCall::Error(_, _, _, _) => panic!("getting body for error"), } } fn nack_additional_deliveries(&self) -> (u32, u32) { match *self { HandlerCall::Job(_, _, _) => panic!("getting nack for job"), HandlerCall::Error(_, _, a, b) => (a, b), } } } #[derive(Clone)] struct MyHandler { sender: Sender, process_job_ret: JobStatus, process_error_ret: bool, sleep: Option, panic: bool, } impl MyHandler { fn new(sender: Sender, process_job_ret: JobStatus, process_error_ret: bool) -> MyHandler { MyHandler { sender: sender, process_job_ret: process_job_ret, process_error_ret: process_error_ret, sleep: None, panic: false, } } #[allow(dead_code)] fn set_sleep(&mut self, sleep: Option) { self.sleep = sleep; } #[allow(dead_code)] fn should_panic(&mut self, panic: bool) { self.panic = panic; } } impl Handler for MyHandler { fn process_job(&self, queue_name: &[u8], jobid: &String, body: Vec ) -> JobStatus { if let Some(sleep) = self.sleep { std::thread::sleep_ms(sleep); } if self.panic { panic!("panic on demand"); } self.sender.send(HandlerCall::Job(queue_name.to_vec(), jobid.clone(), body)).unwrap(); self.process_job_ret.clone() } fn process_error(&self, queue_name: &[u8], jobid: &String, nack: u32, additional_deliveries: u32) -> bool { self.sender.send(HandlerCall::Error(queue_name.to_vec(), jobid.clone(), nack, additional_deliveries)).unwrap(); self.process_error_ret } } #[cfg(test)] fn create_job(queue: &[u8], job: &[u8], nack: bool ) -> (Disque, Vec, Vec, String) { let disque = Disque::open("redis://127.0.0.1:7711/").unwrap(); let jobid = disque.addjob(queue, job, Duration::from_secs(10), None, None, Some(Duration::from_secs(1)), None, None, false).unwrap(); if nack { disque.getjob(true, None, &[queue]).unwrap(); disque.nackjob(jobid.as_bytes()).unwrap(); } (disque, queue.to_vec(), job.to_vec(), jobid) } #[test] fn basic() { let (disque, queue, job, _) = create_job(b"basic", b"job67", false); let (tx, rx) = channel(); let handler = MyHandler::new(tx, JobStatus::AckJob, true); let mut el = EventLoop::new(disque, 1, handler); el.watch_queue(queue.to_vec()); el.run_times(1); el.stop(); assert_eq!(rx.try_recv().unwrap().body(), job); assert!(rx.try_recv().is_err()); } #[test] fn error() { let (disque, queue, _, jobid) = create_job(b"error", b"job456", true); let (tx, rx) = channel(); let handler = MyHandler::new(tx, JobStatus::AckJob, false); let mut el = EventLoop::new(disque, 1, handler); el.watch_queue(queue.to_vec()); el.run_times(1); el.stop(); assert_eq!(rx.try_recv().unwrap().nack_additional_deliveries(), (1, 0)); assert!(rx.try_recv().is_err()); let disque = Disque::open("redis://127.0.0.1:7711/").unwrap(); disque.getjob(true, None, &[&*queue]).unwrap(); disque.ackjob(jobid.as_bytes()).unwrap(); } #[test] fn error_and_job() { let (disque, queue, job, _) = create_job(b"errorandjob", b"job123", true); let (tx, rx) = channel(); let handler = MyHandler::new(tx, JobStatus::AckJob, true); let mut el = EventLoop::new(disque, 1, handler); el.watch_queue(queue.to_vec()); el.run_times(1); el.stop(); assert_eq!(rx.try_recv().unwrap().nack_additional_deliveries(), (1, 0)); assert_eq!(rx.try_recv().unwrap().body(), job); assert!(rx.try_recv().is_err()); } #[test] fn fastack() { let (disque, queue, _, jobid) = create_job(b"fastack", b"job123", false); assert!(disque.show(jobid.as_bytes()).unwrap().is_some()); let (tx, rx) = channel(); let handler = MyHandler::new(tx, JobStatus::FastAck, true); let mut el = EventLoop::new(disque, 1, handler); el.watch_queue(queue.to_vec()); el.run_times(1); el.stop(); rx.try_recv().unwrap(); let disque = Disque::open("redis://127.0.0.1:7711/").unwrap(); assert!(disque.show(jobid.as_bytes()).unwrap().is_none()); } #[test] fn ackjob() { let (disque, queue, _, jobid) = create_job(b"ackjob", b"job123", false); assert!(disque.show(jobid.as_bytes()).unwrap().is_some()); let (tx, rx) = channel(); let handler = MyHandler::new(tx, JobStatus::AckJob, true); let mut el = EventLoop::new(disque, 1, handler); el.watch_queue(queue.to_vec()); el.run_times(1); el.stop(); rx.try_recv().unwrap(); let disque = Disque::open("redis://127.0.0.1:7711/").unwrap(); assert!(disque.show(jobid.as_bytes()).unwrap().is_none()); } #[test] fn nack() { let (disque, queue, _, jobid) = create_job(b"nack", b"job000", false); let (tx, rx) = channel(); let handler = MyHandler::new(tx, JobStatus::NAck, true); let mut el = EventLoop::new(disque, 1, handler); el.watch_queue(queue.to_vec()); el.run_times(3); el.stop(); rx.try_recv().unwrap(); rx.try_recv().unwrap(); rx.try_recv().unwrap(); let disque = Disque::open("redis://127.0.0.1:7711/").unwrap(); assert_eq!(*disque.show(jobid.as_bytes()).unwrap().unwrap().get( "nacks").unwrap(), Value::Int(3)); disque.ackjob(jobid.as_bytes()).unwrap(); } #[test] fn jobcount_current_node() { let (disque, queue, _, _) = create_job(b"ackjob", b"job123", false); let (tx, rx) = channel(); let handler = MyHandler::new(tx, JobStatus::AckJob, true); let mut el = EventLoop::new(disque, 1, handler); el.watch_queue(queue.to_vec()); el.run_times(1); assert!(el.jobcount_current_node() >= 1); el.stop(); rx.try_recv().unwrap(); } #[test] fn change_servers() { let disque = Disque::open("redis://127.0.0.1:7711/").unwrap(); // This test does not apply when there is only one server let hello = disque.hello().unwrap(); if hello.2.len() == 1 { return; } let disque2 = Disque::open(&*format!("redis://{}:{}/", hello.2[1].1, hello.2[1].2)).unwrap(); let (tx, rx) = channel(); let handler = MyHandler::new(tx, JobStatus::AckJob, true); let mut el = EventLoop::new(disque2, 1, handler); let oldid = el.current_node_id(); let queue = b"change_servers"; let job = b"job"; el.watch_queue(queue.to_vec()); let att = 20; for _ in 0..att { disque.addjob(queue, job, Duration::from_secs(10), None, None, None, None, None, false).unwrap(); el.run_times(1); el.do_cycle(); let newid = el.current_node_id(); if newid != oldid { rx.try_recv().unwrap(); return; } } panic!("After {} attempts it did not change node", att); } #[cfg(feature = "nightly")] #[test] fn queueing() { let disque = Disque::open("redis://127.0.0.1:7711/").unwrap(); // This test does not apply when there is only one server let hello = disque.hello().unwrap(); if hello.2.len() == 1 { return; } let disque2 = Disque::open(&*format!("redis://{}:{}/", hello.2[1].1, hello.2[1].2)).unwrap(); let (tx, rx) = channel(); let mut handler = MyHandler::new(tx, JobStatus::AckJob, true); handler.set_sleep(Some(200)); let mut el = EventLoop::new(disque2, 1, handler); let queue = b"queueing"; el.watch_queue(queue.to_vec()); let d = Duration::span(|| { disque.addjob(queue, b"job1", Duration::from_secs(10), None, None, None, None, None, false).unwrap(); disque.addjob(queue, b"job2", Duration::from_secs(10), None, None, None, None, None, false).unwrap(); disque.addjob(queue, b"job3", Duration::from_secs(10), None, None, None, None, None, false).unwrap(); el.run_times(3); el.stop(); }); assert_eq!(rx.try_recv().unwrap().body(), b"job1"); assert_eq!(rx.try_recv().unwrap().body(), b"job2"); assert_eq!(rx.try_recv().unwrap().body(), b"job3"); assert_eq!(d.as_secs(), 0); assert!(d.subsec_nanos() >= 600_000_000); assert!(d.subsec_nanos() <= 999_999_999); } #[cfg(feature = "nightly")] #[test] fn panic_recover() { let (disque, queue, _, jobid) = create_job(b"panic_recover", b"job451", false); let (tx, rx) = channel(); let mut handler = MyHandler::new(tx, JobStatus::AckJob, true); handler.should_panic(true); let mut el = EventLoop::new(disque, 1, handler); el.watch_queue(queue.to_vec()); el.run_times(3); el.stop(); assert_eq!(rx.try_recv().unwrap().nack_additional_deliveries(), (0, 1)); assert_eq!(rx.try_recv().unwrap().nack_additional_deliveries(), (0, 2)); assert!(rx.try_recv().is_err()); let disque = Disque::open("redis://127.0.0.1:7711/").unwrap(); disque.ackjob(jobid.as_bytes()).unwrap(); }