extern crate asynchronous; use std::sync::{Arc, Mutex}; use std::thread; use asynchronous::*; #[test] fn promises() { for x in 0..10 { let promise = Promise::::new(move || { match x { 0 => Err("Division by zero"), _ => Ok(x * 2) } }).success(move |res| { assert_eq!(res, x * 2); Ok(res * 2) }).fail(|error| { assert_eq!(error, "Division by zero"); Err(error) }) ; let result = promise.sync(); match x { 0 => assert!(result.is_err()), _ => { assert!(result.is_ok()); assert_eq!(result.unwrap(), x * 4); } } } } #[test] fn promises_parallel() { let promise1 = Promise::::new(|| { Ok(1u32) }); let promise2 = Promise::::new(|| { Ok(2u32) }); let promise3 = Promise::::new(|| { Ok(3u32) }); let promise4 = Promise::::new(|| { Err("Error") }); let promise5 = Promise::::new(|| { Ok(5u32) }); Promise::all(vec![promise1, promise2, promise3]).finally_sync(|res| { assert_eq!(res.unwrap(), vec![1u32,2u32,3u32]); }); Promise::all(vec![promise4, promise5]).finally_sync(|res| { match res { Ok(_) => unreachable!(), Err(err) => { assert!(err[0].is_err()); assert!(err[1].is_ok()); } } }); } #[test] fn deferred() { for x in 0..10 { let deferred = Deferred::::new(move || { match x { 0 => Err("Division by zero"), _ => Ok(x * 2) } }).success(move |res| { assert_eq!(res, x * 2); Ok(res * 2) }).fail(|error| { assert_eq!(error, "Division by zero"); Err(error) }) ; let result = deferred.sync(); match x { 0 => assert!(result.is_err()), _ => { assert!(result.is_ok()); assert_eq!(result.unwrap(), x * 4); } } } } #[test] fn deferred_to_promise() { Deferred::::new(|| { Ok(88u32) }).to_promise().finally_sync(|r| { assert_eq!(r.unwrap(), 88u32); }); } #[test] fn deferred_in_series() { let st = Arc::new(Mutex::new(String::new())); let lock1 = st.clone(); let d1 = Deferred::::new(move ||{ thread::sleep_ms(200); lock1.lock().unwrap().push_str("Def1"); Ok(1u32) }); let lock2 = st.clone(); let d2 = Deferred::::new(move || { thread::sleep_ms(100); lock2.lock().unwrap().push_str("Def2"); Ok(2u32) }); let lock3 = st.clone(); let d3 = Deferred::::new(move ||{ thread::sleep_ms(200); lock3.lock().unwrap().push_str("Def3"); Ok(3u32) }); let d4 = Deferred::::new(|| { Ok(4u32) }).chain(|res| { Ok(res.unwrap() * 3) }); let d5 = Deferred::::new(|| { Err("Error") }); let d6 = Deferred::::new(|| { Ok(6u32) }); let r = Deferred::vec_to_promise(vec![d1, d2, d3], ControlFlow::Series) .success(|res| { assert_eq!(vec![1u32,2u32, 3u32], res); Ok(0u32) }).sync(); assert_eq!(r, Ok(0u32)); assert_eq!(*st.lock().unwrap(),"Def1Def2Def3"); Deferred::vec_to_promise(vec![d4,d5,d6], ControlFlow::Series) .finally_sync(|res| { match res { Ok(r) => unreachable!("Res: {:?}", r), Err(errors) => { assert_eq!(errors.len(), 2); assert_eq!(errors[0], Ok(12u32)); assert_eq!(errors[1], Err("Error")); } } }); } #[test] fn deferred_in_parallel() { let st = Arc::new(Mutex::new(String::new())); let lock1 = st.clone(); let d1 = Deferred::::new(move ||{ thread::sleep_ms(200); lock1.lock().unwrap().push_str("Def1"); Ok(1u32) }); let lock2 = st.clone(); let d2 = Deferred::::new(move || { thread::sleep_ms(300); lock2.lock().unwrap().push_str("Def2"); Ok(2u32) }); let lock3 = st.clone(); let d3 = Deferred::::new(move ||{ thread::sleep_ms(50); lock3.lock().unwrap().push_str("Def3"); Ok(3u32) }); let d4 = Deferred::::new(|| { Ok(4u32) }); let d5 = Deferred::::new(|| { Err("Error") }); let d6 = Deferred::::new(|| { Ok(6u32) }); let r = Deferred::vec_to_promise(vec![d1, d2, d3], ControlFlow::Parallel) .success(|res| { assert_eq!(vec![1u32,2u32, 3u32], res); Ok(0u32) }).sync(); assert_eq!(r, Ok(0u32)); assert_eq!(*st.lock().unwrap(),"Def3Def1Def2"); Deferred::vec_to_promise(vec![d4,d5,d6], ControlFlow::Parallel) .finally_sync(|res| { match res { Ok(r) => unreachable!("Res: {:?}", r), Err(errors) => { assert_eq!(errors.len(), 3); assert_eq!(errors[0], Ok(4u32)); assert_eq!(errors[1], Err("Error")); assert_eq!(errors[2], Ok(6u32)); } } }); } #[test] fn deferred_in_parallel_limit() { let st = Arc::new(Mutex::new(String::new())); let lock1 = st.clone(); let d1 = Deferred::::new(move ||{ thread::sleep_ms(150); lock1.lock().unwrap().push_str("Def1"); Ok(1u32) }); let lock2 = st.clone(); let d2 = Deferred::::new(move || { thread::sleep_ms(300); lock2.lock().unwrap().push_str("Def2"); Ok(2u32) }); let lock3 = st.clone(); let d3 = Deferred::::new(move ||{ thread::sleep_ms(50); lock3.lock().unwrap().push_str("Def3"); Ok(3u32) }); let lock4 = st.clone(); let d4 = Deferred::::new(move || { thread::sleep_ms(200); lock4.lock().unwrap().push_str("Def4"); Ok(4u32) }); let d5 = Deferred::::new(|| { Ok(5u32) }); let d6 = Deferred::::new(|| { Err("Error d") }); let r = Deferred::vec_to_promise(vec![d1, d2, d3, d4], ControlFlow::ParallelLimit(2)) .success(|res| { assert_eq!(vec![1u32,2u32, 3u32,4u32], res); Ok(0u32) }).sync(); assert_eq!(r, Ok(0u32)); assert_eq!(*st.lock().unwrap(),"Def1Def3Def2Def4"); Deferred::vec_to_promise(vec![d5,d6], ControlFlow::ParallelLimit(1)) .finally_sync(|res| { match res { Ok(r) => unreachable!("Res: {:?}", r), Err(errors) => { assert_eq!(errors.len(), 2); assert_eq!(errors[0], Ok(5u32)); assert_eq!(errors[1], Err("Error d")); } } }); } #[test] fn deferred_in_parallel_limit_cpus() { let mut vec = Vec::new(); for i in 1..5 { vec.push(Deferred::::new(move ||{ Ok(i) })); } Deferred::vec_to_promise(vec, ControlFlow::ParallelCPUS) .finally_sync(|res| { assert_eq!(res.unwrap(), vec![1u32, 2u32, 3u32, 4u32]); }); } #[test] fn deferred_first_wait() { let mut v = vec![]; for i in 0..20 { v.push(Deferred::::new(move ||{ Ok(i) })); } let _ = Deferred::first_to_promise(2, true, v, ControlFlow::Series) .finally_sync(|res| { assert_eq!(res.unwrap().len(), 2); }); let mut v = vec![]; for i in 0..20 { v.push(Deferred::::new(move ||{ Ok(i) })); } let _ = Deferred::first_to_promise(5, true, v, ControlFlow::ParallelLimit(3)) .finally_sync(|res| { let r = res.unwrap(); assert!(r.len()>=5 && r.len()<=7); }); let mut v = vec![]; for i in 0..20 { v.push(Deferred::::new(move ||{ Ok(i) })); } let _ = Deferred::first_to_promise(5, true, v, ControlFlow::Parallel) .finally_sync(|res| { assert_eq!(res.unwrap().len(), 20); }); let mut v = vec![]; for i in 0..5 { v.push(Deferred::::new(move ||{ Ok(i) })); } v.push(Deferred::::new(move ||{ Err("Error in the middle") })); for i in 6..20 { v.push(Deferred::::new(move ||{ Ok(i) })); } let _ = Deferred::first_to_promise(10, true, v, ControlFlow::Parallel) .finally_sync(|res| { assert_eq!(res.unwrap().len(), 19); }); let mut v = vec![]; for i in 0..5 { v.push(Deferred::::new(move ||{ Ok(i) })); } for _ in 5..10 { v.push(Deferred::::new(move ||{ Err("Error") })); } let _ = Deferred::first_to_promise(7, true, v, ControlFlow::ParallelLimit(3)) .finally_sync(|res| { assert_eq!(res.unwrap_err().len(), 10); }); let mut v = vec![]; for i in 0..5 { v.push(Deferred::::new(move ||{ Ok(i) })); } let _ = Deferred::first_to_promise(7, true, v, ControlFlow::ParallelLimit(3)) .finally_sync(|res| { assert_eq!(res.unwrap_err().len(), 5); }); } #[test] fn deferred_first_no_wait() { let mut v = vec![]; for i in 0..20 { v.push(Deferred::::new(move ||{ Ok(i) })); } let _ = Deferred::first_to_promise(2, false, v, ControlFlow::Series) .finally_sync(|res| { assert_eq!(res.unwrap().len(), 2); }); let mut v = vec![]; for i in 0..20 { v.push(Deferred::::new(move ||{ Ok(i) })); } let _ = Deferred::first_to_promise(5, false, v, ControlFlow::ParallelLimit(3)) .finally_sync(|res| { assert_eq!(res.unwrap().len(), 5); }); let mut v = vec![]; for i in 0..20 { v.push(Deferred::::new(move ||{ Ok(i) })); } let _ = Deferred::first_to_promise(5, false, v, ControlFlow::Parallel) .finally_sync(|res| { assert_eq!(res.unwrap().len(), 5); }); let mut v = vec![]; for i in 0..5 { v.push(Deferred::::new(move ||{ Ok(i) })); } v.push(Deferred::::new(move ||{ Err("Error in the middle") })); for i in 6..20 { v.push(Deferred::::new(move ||{ Ok(i) })); } let _ = Deferred::first_to_promise(10, false, v, ControlFlow::ParallelLimit(3)) .finally_sync(|res| { assert_eq!(res.unwrap().len(), 10); }); let mut v = vec![]; for i in 0..5 { v.push(Deferred::::new(move ||{ Ok(i) })); } for _ in 5..10 { v.push(Deferred::::new(move ||{ Err("Error") })); } let _ = Deferred::first_to_promise(7, false, v, ControlFlow::ParallelLimit(3)) .finally_sync(|res| { assert_eq!(res.unwrap_err().len(), 10); }); let mut v = vec![]; for i in 0..5 { v.push(Deferred::::new(move ||{ Ok(i) })); } let _ = Deferred::first_to_promise(7, false, v, ControlFlow::ParallelLimit(3)) .finally_sync(|res| { assert_eq!(res.unwrap_err().len(), 5); }); } #[test] fn deferred_chained() { Deferred::::new(||{ thread::sleep_ms(50); if true { Ok("first".to_string()) } else { Err("Nothing") } }).chain(|res| { let mut v = res.unwrap(); assert_eq!(v, "first"); thread::sleep_ms(50); if true { v.push_str("second"); Ok(v) } else { Err("Nothing") } }).finally_sync(|res| { assert_eq!(res.unwrap(), "firstsecond"); }); } #[test] fn nested_promises() { let res = Promise::<_,&str>::new(|| { // Do nothing Promise::new(|| { Promise::new(|| { Ok(4) }).success(|res| { Ok(res + 2) }).sync() }).success(|res| { Ok(res * 7) }).sync() }).success(|res| { Ok(res + 5) }).sync().unwrap(); assert_eq!(res, 47); } #[test] fn event_loop_1() { let event_loop = EventLoop::new(); assert_eq!(event_loop.emit("EventA"), Ok(())); assert_eq!(event_loop.emit("EventB"), Ok(())); assert_eq!(event_loop.emit("EventC"), Ok(())); let res = event_loop.finish().to_promise().sync().unwrap(); assert_eq!(res, vec!["EventA", "EventB", "EventC"]); } #[test] fn event_loop_2() { let event_loop = EventLoop::new().finish_in_ms(50); assert_eq!(event_loop.emit("EventA"), Ok(())); assert_eq!(event_loop.emit("EventB"), Ok(())); thread::sleep_ms(100); assert_eq!(event_loop.emit("EventC"), Err("EventC")); let res = event_loop.finish().to_promise().sync().unwrap(); assert_eq!(res, vec!["EventA", "EventB"]); } #[test] fn event_loop_3() { let event_loop = EventLoop::new(); assert_eq!(event_loop.emit("EventA"), Ok(())); assert_eq!(event_loop.emit("EventB"), Ok(())); let event_loop = event_loop.finish(); assert_eq!(event_loop.emit("EventC"), Err("EventC")); assert_eq!(event_loop.is_active(), false); let res = event_loop.to_promise().sync().unwrap(); assert_eq!(res, vec!["EventA", "EventB"]); } #[test] fn event_loop_4() { let event_loop = EventLoop::new().finish_in_ms(450); assert_eq!(event_loop.emit("EventA"), Ok(())); assert_eq!(event_loop.emit("EventB"), Ok(())); let x = Arc::new(Mutex::new(0)); event_loop.emit_until(move || { thread::sleep_ms(100); let mut lock_x = x.lock().unwrap(); *lock_x += 1; if *lock_x == 2 { return Emit::Continue } if *lock_x <= 5 { Emit::Event("EventC") } else { Emit::Stop } }); let res = event_loop.to_promise().sync().unwrap(); assert_eq!(res, vec!["EventA", "EventB", "EventC", "EventC", "EventC"]); } #[test] fn event_loop_on_1() { let v = Arc::new(Mutex::new(Vec::<&str>::new())); let v_cloned = v.clone(); let event_loop = EventLoop::on(move |event| { match event { "EventA" => { let mut v_lock = v_cloned.lock().unwrap(); v_lock.push("EventATreated"); }, "EventB" => (), _ => { let mut v_lock = v_cloned.lock().unwrap(); v_lock.push("EventOtherTreated"); }, } }); assert_eq!(event_loop.emit("EventA"), Ok(())); assert_eq!(event_loop.emit("EventB"), Ok(())); assert_eq!(event_loop.emit("EventC"), Ok(())); let res = event_loop.finish().to_promise().sync().unwrap(); assert_eq!(res.len(), 0); } #[test] fn event_loop_on_2() { enum Event { Hello(String), Goodbye(String) } let v = Arc::new(Mutex::new(Vec::::new())); let v_cloned = v.clone(); let event_loop = EventLoop::on(move |event| { match event { Event::Hello(_) => (), Event::Goodbye(v) => { let mut v_lock = v_cloned.lock().unwrap(); v_lock.push(v); }, } }).finish_in_ms(100); assert!(event_loop.emit(Event::Hello("World".to_string())).is_ok()); assert!(event_loop.emit(Event::Goodbye("BCN".to_string())).is_ok()); assert!(event_loop.emit(Event::Goodbye("MAD".to_string())).is_ok()); let res = event_loop.to_promise().sync().unwrap(); assert_eq!(res.len(), 0); } #[test] fn event_loop_on_managed_1() { let event_loop = EventLoop::on_managed(|event| { match event { "EventA" => Emit::Event("EventATreated"), "EventB" => Emit::Continue, "EventStop" => Emit::Stop, _ => Emit::Event("EventOtherTreated") } }); assert_eq!(event_loop.emit("EventA"), Ok(())); assert_eq!(event_loop.emit("EventB"), Ok(())); assert_eq!(event_loop.emit("EventC"), Ok(())); assert_eq!(event_loop.emit("EventStop"), Ok(())); thread::sleep_ms(75); assert_eq!(event_loop.emit("EventE"), Err("EventE")); let res = event_loop.finish().to_promise().sync().unwrap(); assert_eq!(res, vec!["EventATreated", "EventOtherTreated"]); } #[test] fn event_loop_on_managed_2() { enum Event { Hello(String), Goodbye(String) } let event_loop = EventLoop::on_managed(|event| { match event { Event::Hello(_) => Emit::Continue, Event::Goodbye(v) => Emit::Event(Event::Goodbye(v)), } }).finish_in_ms(100); assert!(event_loop.emit(Event::Hello("World".to_string())).is_ok()); assert!(event_loop.emit(Event::Goodbye("BCN".to_string())).is_ok()); assert!(event_loop.emit(Event::Goodbye("MAD".to_string())).is_ok()); let res = event_loop.to_promise().sync().unwrap(); assert_eq!(res.len(), 2); match res[0] { Event::Goodbye(ref v) => assert_eq!(v, "BCN") , _ => panic!() }; match res[1] { Event::Goodbye(ref v) => assert_eq!(v, "MAD") , _ => panic!() }; }