extern crate chrono; extern crate closer; extern crate env_logger; extern crate r2d2; extern crate r2d2_redis; extern crate resque; use std::thread; use std::sync::Arc; use std::time::Duration; use std::sync::atomic::{AtomicUsize, Ordering}; use r2d2::{Config, Pool, PooledConnection}; use r2d2_redis::RedisConnectionManager; use resque::{JobService, Result, ResultExt, WithRedis}; type RedisPool = Pool; type RedisConn = PooledConnection; // RUST_LOG=debug cargo run --example hello fn main() { env_logger::init().unwrap(); let closer = closer::Closer::init(); let with_redis = HelloWithRedis::new("redis://127.0.0.1:6379/0"); start_enqueue_thread(with_redis.0.clone()); let mut config = resque::Config::new("local", 2, with_redis, closer.closed()); config.set_watch_dead_thread(10); let job_service = Box::new(HelloJobService::new()); config.reg_job_service(job_service).unwrap(); resque::start(config).unwrap(); } fn start_enqueue_thread(redis_pool: RedisPool) { thread::spawn(move || { let redis_conn = redis_pool.get().unwrap(); loop { let now_i = chrono::Utc::now().timestamp(); let args = vec![]; resque::enqueue(&redis_conn, "hello", "HelloJob", &args).unwrap(); let args = vec!["Arg1".to_string()]; resque::enqueue(&redis_conn, "hello", "HelloJob", &args).unwrap(); let args = vec!["Arg1".to_string(), "Arg2".to_string()]; resque::enqueue(&redis_conn, "hello", "HelloJob", &args).unwrap(); let args = vec!["ScheduleArg1".to_string()]; resque::enqueue_at(&redis_conn, now_i + 1, "hello", "HelloJob", &args).unwrap(); let args = vec!["ScheduleArg1".to_string(), "ScheduleArg2".to_string()]; resque::enqueue_at(&redis_conn, now_i + 2, "hello", "HelloJob", &args).unwrap(); thread::sleep(Duration::from_secs(5)); } }); } #[derive(Clone)] struct HelloWithRedis(RedisPool); impl HelloWithRedis { fn new(redis_url: &str) -> HelloWithRedis { let manager = RedisConnectionManager::new(redis_url).unwrap(); let pool = Pool::new(Config::default(), manager).unwrap(); HelloWithRedis(pool) } } impl WithRedis for HelloWithRedis { type Conn = RedisConn; fn with_redis(&self, f: F) -> Result where F: FnOnce(&Self::Conn) -> Result, { let redis_conn = self.0.get().chain_err(|| "could not get redis conn")?; f(&redis_conn) } } #[derive(Clone)] struct HelloJobService(Arc); impl HelloJobService { fn new() -> HelloJobService { HelloJobService(Arc::new(AtomicUsize::new(1))) } } impl JobService for HelloJobService { fn run(&self, args: &[String]) -> Result<()> { self.0.fetch_add(1, Ordering::Relaxed); thread::sleep(Duration::from_secs(2)); let i = self.0.load(Ordering::Relaxed); println!("hello: {}, args: {:?}", i, args); Ok(()) } fn queue(&self) -> &str { "hello" } fn job_type(&self) -> &str { "HelloJob" } fn box_clone(&self) -> Box { Box::new((*self).clone()) } }