use std::iter; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Barrier}; use std::thread::{sleep, spawn, JoinHandle}; use std::time::Duration; use win_events::{wait_all, wait_first, AutoResetEvent, Event}; #[test] fn ae_test_init_false() { let event = AutoResetEvent::new(false); assert!(!event.is_set()); } #[test] fn ae_test_init_true() { let event = AutoResetEvent::new(true); assert!(event.is_set()); } #[test] fn ae_test_set_event() { let event = AutoResetEvent::new(false); event.set(); assert!(event.is_set()); } #[test] fn ae_test_clear_event() { let event = AutoResetEvent::new(true); event.clear(); assert!(!event.is_set()); } #[test] fn ae_test_thread_wait() { let event = AutoResetEvent::new(false); let event2 = event.clone(); let worker = spawn(move || { sleep(Duration::from_millis(3)); event2.set(); }); let wait_result = event.wait(Duration::from_millis(100)).unwrap(); assert!(wait_result && !event.is_set()); worker.join().unwrap(); } #[test] fn ae_test_wait_already_set() { let event = AutoResetEvent::new(true); assert!(event.wait(Duration::from_millis(100)).unwrap()) } #[test] fn ae_test_wait_timeout() { let event = AutoResetEvent::new(false); assert!(!event.wait(Duration::from_millis(0)).unwrap()) } #[test] fn ae_test_wait_first() { let set_event0 = AutoResetEvent::new(false); let set_event1 = AutoResetEvent::new(false); let set_event2 = AutoResetEvent::new(false); let inner_set = set_event2.clone(); let worker = spawn(move || { sleep(Duration::from_millis(10)); inner_set.set() }); let wait_result = wait_first( vec![&set_event0, &set_event1, &set_event2], Duration::from_millis(100), ) .unwrap(); assert!(2 == wait_result && !set_event2.is_set()); worker.join().unwrap(); } #[test] fn ae_test_wait_first_already_set() { let set_event0 = AutoResetEvent::new(false); let set_event1 = AutoResetEvent::new(false); let set_event2 = AutoResetEvent::new(true); let wait_result = wait_first( vec![&set_event0, &set_event1, &set_event2], Duration::from_millis(10), ) .unwrap(); assert!(2 == wait_result && !set_event2.is_set()); } #[test] fn ae_test_wait_first_timeout() { let set_event0 = AutoResetEvent::new(false); let set_event1 = AutoResetEvent::new(false); let set_event2 = AutoResetEvent::new(false); let wait_result = wait_first( vec![&set_event0, &set_event1, &set_event2], Duration::from_millis(3), ) .unwrap(); assert_eq!(-1, wait_result); } #[test] fn ae_test_wait_first_toggled_timeout() { let set_event0 = AutoResetEvent::new(false); let set_event1 = AutoResetEvent::new(false); let set_event2 = AutoResetEvent::new(true); let inner_set0 = set_event0.clone(); let inner_set2 = set_event2.clone(); let worker = spawn(move || { inner_set0.clear(); inner_set2.set(); }); let wait_result = wait_all( vec![&set_event0, &set_event1, &set_event2], Duration::from_millis(3), ) .unwrap(); assert_eq!(false, wait_result); worker.join().unwrap(); } #[test] fn ae_test_multiple_waiters_one() { let set_event0 = AutoResetEvent::new(false); let set_event1 = AutoResetEvent::new(false); let set_event2 = AutoResetEvent::new(true); let values = [1, 2, 3]; let workers: Vec> = values .iter() .map(|e| { let inner0 = set_event0.clone(); let inner1 = set_event1.clone(); let inner2 = set_event2.clone(); let timeout = *e; spawn(move || { let result = wait_first( vec![&inner0, &inner1, &inner2], Duration::from_millis(timeout), ); match result { Ok(wait_result) => wait_result, Err(_) => -10, } }) }) .collect(); let results = workers.into_iter().map(|e| { let value = e.join().unwrap(); print!("{}", value); value }); assert_eq!(0, results.sum()) } #[test] fn ae_test_wait_all() { let set_event0 = AutoResetEvent::new(false); let set_event1 = AutoResetEvent::new(false); let set_event2 = AutoResetEvent::new(false); let inner_set0 = set_event0.clone(); let inner_set1 = set_event1.clone(); let inner_set2 = set_event2.clone(); let worker = spawn(move || { sleep(Duration::from_millis(10)); inner_set0.set(); inner_set1.set(); inner_set2.set(); }); let wait_result = wait_all( vec![&set_event0, &set_event1, &set_event2], Duration::from_millis(100), ) .unwrap(); let events = vec![set_event0, set_event1, set_event2]; assert!(wait_result && events.iter().all(|e| !e.is_set())); worker.join().unwrap(); } #[test] fn ae_test_wait_all_already_set() { let set_event0 = AutoResetEvent::new(true); let set_event1 = AutoResetEvent::new(true); let set_event2 = AutoResetEvent::new(true); let wait_result = wait_all( vec![&set_event0, &set_event1, &set_event2], Duration::from_millis(10), ) .unwrap(); let events = vec![set_event0, set_event1, set_event2]; assert!(wait_result && events.iter().all(|e| !e.is_set())); } #[test] fn ae_test_wait_all_partial_set() { let set_event0 = AutoResetEvent::new(true); let set_event1 = AutoResetEvent::new(true); let set_event2 = AutoResetEvent::new(false); let inner_set = set_event2.clone(); let worker = spawn(move || { sleep(Duration::from_millis(10)); inner_set.set(); }); let wait_result = wait_first( vec![&set_event0, &set_event1, &set_event2], Duration::from_millis(100), ) .unwrap(); assert!( 0 == wait_result && !set_event0.is_set() && set_event1.is_set() && !set_event2.is_set() ); worker.join().unwrap(); } #[test] fn ae_test_wait_all_timeout() { let set_event0 = AutoResetEvent::new(false); let set_event1 = AutoResetEvent::new(false); let set_event2 = AutoResetEvent::new(false); let wait_result = wait_all( vec![&set_event0, &set_event1, &set_event2], Duration::from_millis(1), ) .unwrap(); assert_eq!(false, wait_result); } #[test] fn ae_test_wait_all_toggled_initialized_timeout() { let set_event0 = AutoResetEvent::new(true); let set_event1 = AutoResetEvent::new(true); let set_event2 = AutoResetEvent::new(false); set_event0.clear(); set_event2.set(); let wait_result = wait_all( vec![&set_event0, &set_event1, &set_event2], Duration::from_millis(3), ) .unwrap(); assert_eq!(false, wait_result); } #[test] fn ae_test_wait_all_toggled_timeout() { let set_event0 = AutoResetEvent::new(true); let set_event1 = AutoResetEvent::new(true); let set_event2 = AutoResetEvent::new(false); let inner_set0 = set_event0.clone(); let inner_set2 = set_event2.clone(); let worker = spawn(move || { inner_set0.clear(); inner_set2.set(); }); let wait_result = wait_all( vec![&set_event0, &set_event1, &set_event2], Duration::from_millis(3), ) .unwrap(); assert_eq!(false, wait_result); worker.join().unwrap(); } #[test] fn ae_test_multiple_waiters_all() { let set_event0 = AutoResetEvent::new(true); let set_event1 = AutoResetEvent::new(true); let set_event2 = AutoResetEvent::new(true); let values = [1, 2, 3]; let workers: Vec> = values .iter() .map(|e| { let inner0 = set_event0.clone(); let inner1 = set_event1.clone(); let inner2 = set_event2.clone(); let timeout = *e; spawn(move || { let result = wait_all( vec![&inner0, &inner1, &inner2], Duration::from_millis(timeout), ); match result { Ok(wait_result) => wait_result.into(), Err(_) => -10, } }) }) .collect(); let results = workers.into_iter().map(|e| { let value = e.join().unwrap(); print!("{}", value); value }); assert_eq!(1, results.sum()) } #[test] fn ae_test_multiple_ordered_lock_all() { let len = 4; let mut events = { let mut e: Vec<_> = iter::repeat_with(|| AutoResetEvent::new(false)) .take(len) .collect(); e.sort_by_key(|e| Arc::as_ptr(e.handle())); e }; let event0 = events.pop().unwrap(); let event1_no_lock = events.pop().unwrap(); let event2 = events.pop().unwrap(); let event3 = events.pop().unwrap(); let cases = vec![ vec![ event1_no_lock.clone(), event3.clone(), event2.clone(), event0.clone(), ], vec![event3.clone(), event2.clone(), event0.clone()], vec![event2.clone(), event3.clone(), event0.clone()], vec![event0.clone(), event2.clone(), event3.clone()], vec![event0.clone(), event3.clone(), event2.clone()], ]; let workers = cases.len(); let total_threads = workers + 1; let mut handles = Vec::with_capacity(cases.len()); let sync = Arc::new((Barrier::new(total_threads), AtomicBool::new(false))); let attempts = 50; for case in cases { let inner_sync = Arc::clone(&sync); let mut results = Vec::with_capacity(attempts); let handle = spawn(move || loop { let (barrier, stop) = &*inner_sync; // Sync the main thread with the wait threads barrier.wait(); if stop.load(Ordering::Relaxed) { return results; } // wait on the events, let events = case.iter().map(|e| e as &dyn Event).collect(); // wait on the events, with a sleep in the main thread of 1ns, I need at least let wait_result = match wait_all(events, Duration::from_millis(20)) { Ok(wait_result) => Some(wait_result), Err(_) => None, }; results.push(wait_result); }); handles.push(handle); } let (barrier, stop) = &*sync; let wait = Duration::from_nanos(1); // The +1 thread will set the events, all waiters should continue or timeout. for _ in 0..attempts { barrier.wait(); assert!(vec![&event0, &event2, &event3].iter().all(|e| !e.is_set())); sleep(wait); event0.set(); event2.set(); event3.set(); } stop.store(true, Ordering::Relaxed); barrier.wait(); let mut results = Vec::with_capacity(attempts); for handle in handles { let wait_result = handle.join().unwrap(); results.push(wait_result); } // Make sure all have the same amount of results let len = results[0].len(); assert!(results .iter() .all(|worker_results| worker_results.len() == len)); // the first wait thread response should all have timed out assert!(results[0] .iter() .all(|wait_result| *wait_result == Some(false))); for attempt in 0..attempts { let mut attempt_results = Vec::with_capacity(workers); for worker in 0..workers { let result = results[worker][attempt].unwrap().into(); attempt_results.push(result) } // for each attempt, only one should have consumed the event assert_eq!(1, attempt_results.iter().sum()) } }