//! stream_ext use futures::StreamExt as FuturesStreamExt; use lol_io::{ kernel::threading::event::{Event, EventListener, InitialState, OwnedEventHandle, Reset}, util::stream_ext::StreamExt, }; use std::{ matches, sync::atomic::{AtomicUsize, Ordering}, task::Poll, }; // TODO this is not an integration test since we are not polling this realistically #[test] fn lol_test_stream_ext_throttle_until() { // Our throttler let event = OwnedEventHandle::anonymous(Reset::Manual, InitialState::Unset).unwrap(); let listener = EventListener::new(None).unwrap(); let throttle = listener.listen(event.borrow(), None); // Create a stream to emit a throttled sequence of numbers let mut stream = futures::stream::poll_fn(|_| -> Poll> { static COUNT: AtomicUsize = AtomicUsize::new(0); let next = COUNT.fetch_add(1, Ordering::SeqCst); Poll::Ready(Some(next)) }) .throttle_until(throttle) .take(3); // Setup mock waker let waker = futures::task::noop_waker_ref(); let mut cx = std::task::Context::from_waker(waker); // First poll is not throttled let next = stream.poll_next_unpin(&mut cx); assert!(matches!(next, Poll::Ready(Some(Ok(0))))); // We're now throttling so next poll will be pending let next = stream.poll_next_unpin(&mut cx); assert!(matches!(next, Poll::Pending)); // We set the event and now the next poll wont throttle anymore // NOTE time delay because kernel might not signal the event before we poll event.set().unwrap(); std::thread::sleep(std::time::Duration::from_millis(1)); let next = stream.poll_next_unpin(&mut cx); assert!(matches!(next, Poll::Ready(Some(Ok(1))))); // We are now pending again let next = stream.poll_next_unpin(&mut cx); assert!(matches!(next, Poll::Pending)); // We set the event and now the next poll wont throttle anymore. // NOTE time delay because kernel might not signal the event before we poll event.set().unwrap(); std::thread::sleep(std::time::Duration::from_millis(1)); let next = stream.poll_next_unpin(&mut cx); assert!(matches!(next, Poll::Ready(Some(Ok(2))))); // Our stream is over let next = stream.poll_next_unpin(&mut cx); assert!(matches!(next, Poll::Ready(None))); }