//! event use futures::stream::{StreamExt, TryStreamExt}; use futures::{executor, TryFutureExt}; use lol_io::const_uuid_wide; use lol_io::kernel::co; use lol_io::kernel::threading::{ event::{EventListener, InitialState, OwnedEventHandle, Reset}, pool::{ CompletionResult, Threadpool, ThreadpoolCallbackEnvironment, ThreadpoolTimer, ThreadpoolWork, }, timer::Timer, }; use lol_io::prelude::*; use lol_io::ArrayBuffer; use std::io; use std::os::windows::io::AsHandle; use std::path::PathBuf; use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::Duration; #[test] fn lol_test_threading_event() { const ID: *const u16 = lol_io::const_uuid_wide!(); std::thread::spawn(|| { let event = OwnedEventHandle::new_raw(ID, Reset::Automatic, InitialState::Unset).unwrap(); std::thread::sleep(Duration::from_millis(50)); event.set().unwrap(); }); let event = OwnedEventHandle::new_raw(ID, Reset::Automatic, InitialState::Unset).unwrap(); event.wait(Some(Duration::from_secs(8))).unwrap(); } #[test] fn lol_test_threading_async_event() { const EVENT_ID: *const u16 = const_uuid_wide!(); let waiter = OwnedEventHandle::new_raw(EVENT_ID, Reset::Manual, InitialState::Unset).unwrap(); let setter = OwnedEventHandle::new_raw(EVENT_ID, Reset::Manual, InitialState::Unset).unwrap(); let (_pool, env) = make_pool().unwrap(); let fut = EventListener::new(Some(&env)) .unwrap() .listen(waiter.borrow(), Some(Duration::from_millis(123))); std::thread::spawn(move || { std::thread::sleep(Duration::from_millis(5)); setter.set()?; io::Result::Ok(()) }); let result = executor::block_on(fut); assert!(result.is_ok()); } #[test] fn lol_test_threading_async_event_cancel() { const EVENT_ID: *const u16 = const_uuid_wide!(); let waiter = OwnedEventHandle::new_raw(EVENT_ID, Reset::Manual, InitialState::Unset).unwrap(); let (_pool, env) = make_pool().unwrap(); let fut = EventListener::new(Some(&env)) .unwrap() .listen(waiter, Some(Duration::from_millis(1000))); let abort = fut.abort_handle(); abort.abort("Testing cancel"); let result = executor::block_on(fut); assert!(result.is_err()); } #[test] fn lol_test_threading_async_event_cancel_ignored() { const EVENT_ID: *const u16 = const_uuid_wide!(); let waiter = OwnedEventHandle::new_raw(EVENT_ID, Reset::Manual, InitialState::Unset).unwrap(); let setter = OwnedEventHandle::new_raw(EVENT_ID, Reset::Manual, InitialState::Unset).unwrap(); let (_pool, env) = make_pool().unwrap(); let fut = EventListener::new(Some(&env)) .unwrap() .listen(waiter, Some(Duration::from_millis(1000))); let abort = fut.abort_handle(); // Trigger kernel event setter.set().unwrap(); // Let wait_callback have time from the kernel which will modify the state to Complete std::thread::sleep(Duration::from_millis(50)); // now that the state is complete, our cancel should be ignored abort.abort("Ignore me"); let result = executor::block_on(fut); assert!(result.is_ok()); } #[test] fn lol_test_threading_async_event_timeout() { const EVENT_ID: *const u16 = const_uuid_wide!(); let ev = OwnedEventHandle::new_raw(EVENT_ID, Reset::Manual, InitialState::Unset).unwrap(); let (_pool, env) = make_pool().unwrap(); let fut = EventListener::new(Some(&env)) .unwrap() .listen(ev, Some(Duration::from_millis(100))); let result = executor::block_on(fut); assert!(result.is_err()); } #[test] fn lol_test_threading_async_event_anon() { let owned = OwnedEventHandle::anonymous(Reset::Manual, InitialState::Unset).unwrap(); let listener = EventListener::new(None).unwrap(); let fut = listener.listen(owned.borrow(), Some(Duration::from_millis(123))); owned.set().unwrap(); let result = executor::block_on(fut); assert!(result.is_ok()); } #[test] fn lol_test_threading_timer() { let (_pool, env) = make_pool().unwrap(); let timers = ThreadpoolTimer::new(Some(&env), |_instance| { static COUNT: AtomicUsize = AtomicUsize::new(0); COUNT.fetch_add(1, Ordering::SeqCst) }) .unwrap(); let timeouts = executor::block_on(async { timers.set( Duration::from_millis(2), Duration::from_millis(2), Duration::from_millis(4000), ); timers .listen() .take(3) .map_err(|_| usize::max_value()) .collect::>() .await }); assert_eq!(vec![Ok(0), Ok(1), Ok(2)], timeouts); } #[test] fn lol_test_threading_worker_many() { let (_pool, env) = make_pool().unwrap(); let pool = ThreadpoolWork::new(Some(&env), |_instance| { static COUNT: AtomicUsize = AtomicUsize::new(0); std::thread::sleep(Duration::from_millis(50)); COUNT.fetch_add(1, Ordering::SeqCst) }) .unwrap(); let (a, b, c, finished) = executor::block_on(async { pool.submit(3); let a = pool .listen() .take(3) .map_err(|_e| usize::max_value()) .collect::>() .await; pool.submit(3); let b = pool .listen() .take(3) .map_err(|_e| usize::max_value()) .collect::>() .await; pool.submit(3); let c = pool .listen() .take(3) .map_err(|_e| usize::max_value()) .collect::>() .await; pool.done(); pool.submit(1); let finished = pool.listen().next().await; (a, b, c, finished) }); assert_eq!(vec![Ok(0), Ok(1), Ok(2)], a); assert_eq!(vec![Ok(3), Ok(4), Ok(5)], b); assert_eq!(vec![Ok(6), Ok(7), Ok(8)], c); assert!(finished.is_none()); } #[test] fn lol_test_threading_executor() { // create an interval timer to spawn on the executor let timeouts = Timer::new(None) .unwrap() .interval( Duration::from_millis(20), Duration::from_millis(20), Duration::from_millis(10), ) .take(3) .map_err(|_| usize::max_value()) .collect::>(); // Create a oneshot timer to spawn on the executor let timeout = Timer::new(None) .unwrap() .oneshot(Duration::from_millis(10), Duration::from_millis(10)) .map_err(|_| usize::max_value()); // Spawn a top level task let task = lol_io::executor::spawn(None, async move { // Spawn another task for oneshot timer let fut0 = lol_io::executor::spawn(None, async move { timeout.await })?; // Spawn a third task for the interval timer let fut1 = lol_io::executor::spawn(None, async move { timeouts.await })?; // Wait for the oneshot timer to complete let t0 = fut0.wait_join(None).await?; // Wait for the interval timer to complete let t1 = fut1.wait_join(None).await?; // measure some results in the main thread CompletionResult::Ok((t0, t1)) }) .unwrap(); // wait for the top level future to complete let (fut0, fut1) = task.block_join(None).unwrap().unwrap(); // Test the oneshot timer result assert_eq!(Ok(0), fut0); // Test the interval timer result assert_eq!(vec![Ok(0), Ok(1), Ok(2)], fut1); } #[test] pub fn lol_test_threading_fs() { let read_to_end = lol_io::executor::spawn(None, async { let stat = lol_io::fs::stat(mock_file("read_to_end.txt"))?.await?; let h = lol_io::fs::open(mock_file("read_to_end.txt"))?.await?; let buf = ArrayBuffer::new(stat.size + 1); let result = lol_io::fs::read_to_end(h.as_handle(), buf)?.await?; std::io::Result::Ok(result) }); let read_exact = lol_io::executor::spawn(None, async { let stat = lol_io::fs::stat(mock_file("read_exact.txt"))?.await?; let h = lol_io::fs::open(mock_file("read_exact.txt"))?.await?; let result = lol_io::fs::read_exact(h.as_handle(), ArrayBuffer::new(stat.size))?.await?; std::io::Result::Ok(result) }); let read_file = lol_io::executor::spawn(None, async { let result = lol_io::fs::read_file(mock_file("read_file.txt")).await?; std::io::Result::Ok(result) }); let result = read_to_end.unwrap().block_join(None).unwrap().unwrap(); assert_eq!(b"read_to_end\r\n", &result[..]); let result = read_exact.unwrap().block_join(None).unwrap().unwrap(); assert_eq!(b"read_exact\r\n", &result[..]); let result = read_file.unwrap().block_join(None).unwrap().unwrap(); assert_eq!(b"read_file\r\n", &result[..]); } fn mock_file(file: &'static str) -> PathBuf { let mut path = std::env::var("CARGO_MANIFEST_DIR") .map(PathBuf::from) .unwrap(); path.push("tests"); path.push("__mock__"); path.push(file); path } pub fn make_pool() -> io::Result<(Threadpool, ThreadpoolCallbackEnvironment)> { let pool = Threadpool::new()?; pool.min_threads(1)? .max_threads(1) .set_stack_size(4096, 4096)?; let env = pool .new_environment() .with_priority(co::ThreadpoolPriority::HIGH); Ok((pool, env)) }