use simple_redis; use simple_redis::{Interrupts, Message}; use std::{thread, time}; #[test] fn pub_sub() { match simple_redis::create("redis://127.0.0.1:6379/") { Ok(mut subscriber) => { assert!(!subscriber.is_connection_open()); let mut result = subscriber.subscribe("int_pub_sub"); assert!(result.is_ok()); thread::spawn(|| { thread::sleep(time::Duration::from_secs(2)); match simple_redis::create("redis://127.0.0.1:6379/") { Ok(mut publisher) => { assert!(!publisher.is_connection_open()); let result = publisher.publish("int_pub_sub", "test pub_sub message"); assert!(result.is_ok()); assert!(publisher.is_connection_open()); } _ => panic!("test error"), }; }); subscriber .fetch_messages( &mut |message: Message| -> bool { let payload: String = message.get_payload().unwrap(); assert_eq!(payload, "test pub_sub message"); true }, &mut || -> Interrupts { Interrupts::new() }, ) .unwrap(); result = subscriber.subscribe("int_pub_sub2"); assert!(result.is_ok()); result = subscriber.unsubscribe("int_pub_sub"); assert!(result.is_ok()); thread::spawn(|| { thread::sleep(time::Duration::from_secs(2)); match simple_redis::create("redis://127.0.0.1:6379/") { Ok(mut publisher) => { assert!(!publisher.is_connection_open()); let mut result = publisher.publish("int_pub_sub", "bad"); assert!(result.is_ok()); assert!(publisher.is_connection_open()); thread::sleep(time::Duration::from_secs(1)); result = publisher.publish("int_pub_sub2", "good"); assert!(result.is_ok()); } _ => panic!("test error"), }; }); subscriber .fetch_messages( &mut |message: Message| -> bool { let payload: String = message.get_payload().unwrap(); assert_eq!(payload, "good"); true }, &mut || -> Interrupts { Interrupts::new() }, ) .unwrap(); thread::spawn(|| { thread::sleep(time::Duration::from_secs(2)); match simple_redis::create("redis://127.0.0.1:6379/") { Ok(mut publisher) => { assert!(!publisher.is_connection_open()); let mut result = publisher.publish("int_pub_sub", "bad"); assert!(result.is_ok()); assert!(publisher.is_connection_open()); thread::sleep(time::Duration::from_secs(1)); result = publisher.publish("int_pub_sub2", "good"); assert!(result.is_ok()); } _ => panic!("test error"), }; }); let mut counter = 0; subscriber .fetch_messages( &mut |_message: Message| -> bool { panic!("test error"); }, &mut || -> Interrupts { counter = counter + 1; let mut interrupts = Interrupts::new(); interrupts.stop = counter == 5; interrupts.next_polling_time = Some(10); interrupts }, ) .unwrap(); } _ => panic!("test error"), }; } #[test] fn pub_psub() { match simple_redis::create("redis://127.0.0.1:6379/") { Ok(mut subscriber) => { assert!(!subscriber.is_connection_open()); let mut result = subscriber.psubscribe("int_pub_psub::*"); assert!(result.is_ok()); thread::spawn(|| { thread::sleep(time::Duration::from_secs(2)); match simple_redis::create("redis://127.0.0.1:6379/") { Ok(mut publisher) => { assert!(!publisher.is_connection_open()); let result = publisher.publish("int_pub_psub::123", "test pub_sub message"); assert!(result.is_ok()); assert!(publisher.is_connection_open()); } _ => panic!("test error"), }; }); subscriber .fetch_messages( &mut |message: Message| -> bool { let payload: String = message.get_payload().unwrap(); assert_eq!(payload, "test pub_sub message"); true }, &mut || -> Interrupts { Interrupts::new() }, ) .unwrap(); result = subscriber.psubscribe("int_pub_psub2::*"); assert!(result.is_ok()); result = subscriber.punsubscribe("int_pub_psub::*"); assert!(result.is_ok()); thread::spawn(|| { thread::sleep(time::Duration::from_secs(2)); match simple_redis::create("redis://127.0.0.1:6379/") { Ok(mut publisher) => { assert!(!publisher.is_connection_open()); let mut result = publisher.publish("int_pub_psub::123", "bad"); assert!(result.is_ok()); assert!(publisher.is_connection_open()); thread::sleep(time::Duration::from_secs(1)); result = publisher.publish("int_pub_psub2::123", "good"); assert!(result.is_ok()); } _ => panic!("test error"), }; }); subscriber .fetch_messages( &mut |message: Message| -> bool { let payload: String = message.get_payload().unwrap(); assert_eq!(payload, "good"); true }, &mut || -> Interrupts { Interrupts::new() }, ) .unwrap(); } _ => panic!("test error"), }; }