#![allow(clippy::disallowed_names)] #![warn(rust_2018_idioms)] #![cfg(feature = "full")] use tokio::time::{self, sleep, sleep_until, Duration, Instant}; use tokio_test::{assert_pending, assert_ready, task}; use tokio_util::time::DelayQueue; macro_rules! poll { ($queue:ident) => { $queue.enter(|cx, mut queue| queue.poll_expired(cx)) }; } macro_rules! assert_ready_some { ($e:expr) => {{ match assert_ready!($e) { Some(v) => v, None => panic!("None"), } }}; } #[tokio::test] async fn single_immediate_delay() { time::pause(); let mut queue = task::spawn(DelayQueue::new()); let _key = queue.insert_at("foo", Instant::now()); // Advance time by 1ms to handle thee rounding sleep(ms(1)).await; assert_ready_some!(poll!(queue)); let entry = assert_ready!(poll!(queue)); assert!(entry.is_none()) } #[tokio::test] async fn multi_immediate_delays() { time::pause(); let mut queue = task::spawn(DelayQueue::new()); let _k = queue.insert_at("1", Instant::now()); let _k = queue.insert_at("2", Instant::now()); let _k = queue.insert_at("3", Instant::now()); sleep(ms(1)).await; let mut res = vec![]; while res.len() < 3 { let entry = assert_ready_some!(poll!(queue)); res.push(entry.into_inner()); } let entry = assert_ready!(poll!(queue)); assert!(entry.is_none()); res.sort_unstable(); assert_eq!("1", res[0]); assert_eq!("2", res[1]); assert_eq!("3", res[2]); } #[tokio::test] async fn single_short_delay() { time::pause(); let mut queue = task::spawn(DelayQueue::new()); let _key = queue.insert_at("foo", Instant::now() + ms(5)); assert_pending!(poll!(queue)); sleep(ms(1)).await; assert!(!queue.is_woken()); sleep(ms(5)).await; assert!(queue.is_woken()); let entry = assert_ready_some!(poll!(queue)); assert_eq!(*entry.get_ref(), "foo"); let entry = assert_ready!(poll!(queue)); assert!(entry.is_none()); } #[tokio::test] async fn multi_delay_at_start() { time::pause(); let long = 262_144 + 9 * 4096; let delays = &[1000, 2, 234, long, 60, 10]; let mut queue = task::spawn(DelayQueue::new()); // Setup the delays for &i in delays { let _key = queue.insert_at(i, Instant::now() + ms(i)); } assert_pending!(poll!(queue)); assert!(!queue.is_woken()); let start = Instant::now(); for elapsed in 0..1200 { println!("elapsed: {:?}", elapsed); let elapsed = elapsed + 1; tokio::time::sleep_until(start + ms(elapsed)).await; if delays.contains(&elapsed) { assert!(queue.is_woken()); assert_ready!(poll!(queue)); assert_pending!(poll!(queue)); } else if queue.is_woken() { let cascade = &[192, 960]; assert!( cascade.contains(&elapsed), "elapsed={} dt={:?}", elapsed, Instant::now() - start ); assert_pending!(poll!(queue)); } } println!("finished multi_delay_start"); } #[tokio::test] async fn insert_in_past_fires_immediately() { println!("running insert_in_past_fires_immediately"); time::pause(); let mut queue = task::spawn(DelayQueue::new()); let now = Instant::now(); sleep(ms(10)).await; queue.insert_at("foo", now); assert_ready!(poll!(queue)); println!("finished insert_in_past_fires_immediately"); } #[tokio::test] async fn remove_entry() { time::pause(); let mut queue = task::spawn(DelayQueue::new()); let key = queue.insert_at("foo", Instant::now() + ms(5)); assert_pending!(poll!(queue)); let entry = queue.remove(&key); assert_eq!(entry.into_inner(), "foo"); sleep(ms(10)).await; let entry = assert_ready!(poll!(queue)); assert!(entry.is_none()); } #[tokio::test] async fn reset_entry() { time::pause(); let mut queue = task::spawn(DelayQueue::new()); let now = Instant::now(); let key = queue.insert_at("foo", now + ms(5)); assert_pending!(poll!(queue)); sleep(ms(1)).await; queue.reset_at(&key, now + ms(10)); assert_pending!(poll!(queue)); sleep(ms(7)).await; assert!(!queue.is_woken()); assert_pending!(poll!(queue)); sleep(ms(3)).await; assert!(queue.is_woken()); let entry = assert_ready_some!(poll!(queue)); assert_eq!(*entry.get_ref(), "foo"); let entry = assert_ready!(poll!(queue)); assert!(entry.is_none()) } // Reproduces tokio-rs/tokio#849. #[tokio::test] async fn reset_much_later() { time::pause(); let mut queue = task::spawn(DelayQueue::new()); let now = Instant::now(); sleep(ms(1)).await; let key = queue.insert_at("foo", now + ms(200)); assert_pending!(poll!(queue)); sleep(ms(3)).await; queue.reset_at(&key, now + ms(10)); sleep(ms(20)).await; assert!(queue.is_woken()); } // Reproduces tokio-rs/tokio#849. #[tokio::test] async fn reset_twice() { time::pause(); let mut queue = task::spawn(DelayQueue::new()); let now = Instant::now(); sleep(ms(1)).await; let key = queue.insert_at("foo", now + ms(200)); assert_pending!(poll!(queue)); sleep(ms(3)).await; queue.reset_at(&key, now + ms(50)); sleep(ms(20)).await; queue.reset_at(&key, now + ms(40)); sleep(ms(20)).await; assert!(queue.is_woken()); } /// Regression test: Given an entry inserted with a deadline in the past, so /// that it is placed directly on the expired queue, reset the entry to a /// deadline in the future. Validate that this leaves the entry and queue in an /// internally consistent state by running an additional reset on the entry /// before polling it to completion. #[tokio::test] async fn repeatedly_reset_entry_inserted_as_expired() { time::pause(); let mut queue = task::spawn(DelayQueue::new()); let now = Instant::now(); let key = queue.insert_at("foo", now - ms(100)); queue.reset_at(&key, now + ms(100)); queue.reset_at(&key, now + ms(50)); assert_pending!(poll!(queue)); time::sleep_until(now + ms(60)).await; assert!(queue.is_woken()); let entry = assert_ready_some!(poll!(queue)).into_inner(); assert_eq!(entry, "foo"); let entry = assert_ready!(poll!(queue)); assert!(entry.is_none()); } #[tokio::test] async fn remove_expired_item() { time::pause(); let mut queue = DelayQueue::new(); let now = Instant::now(); sleep(ms(10)).await; let key = queue.insert_at("foo", now); let entry = queue.remove(&key); assert_eq!(entry.into_inner(), "foo"); } /// Regression test: it should be possible to remove entries which fall in the /// 0th slot of the internal timer wheel — that is, entries whose expiration /// (a) falls at the beginning of one of the wheel's hierarchical levels and (b) /// is equal to the wheel's current elapsed time. #[tokio::test] async fn remove_at_timer_wheel_threshold() { time::pause(); let mut queue = task::spawn(DelayQueue::new()); let now = Instant::now(); let key1 = queue.insert_at("foo", now + ms(64)); let key2 = queue.insert_at("bar", now + ms(64)); sleep(ms(80)).await; let entry = assert_ready_some!(poll!(queue)).into_inner(); match entry { "foo" => { let entry = queue.remove(&key2).into_inner(); assert_eq!(entry, "bar"); } "bar" => { let entry = queue.remove(&key1).into_inner(); assert_eq!(entry, "foo"); } other => panic!("other: {:?}", other), } } #[tokio::test] async fn expires_before_last_insert() { time::pause(); let mut queue = task::spawn(DelayQueue::new()); let now = Instant::now(); queue.insert_at("foo", now + ms(10_000)); // Delay should be set to 8.192s here. assert_pending!(poll!(queue)); // Delay should be set to the delay of the new item here queue.insert_at("bar", now + ms(600)); assert_pending!(poll!(queue)); sleep(ms(600)).await; assert!(queue.is_woken()); let entry = assert_ready_some!(poll!(queue)).into_inner(); assert_eq!(entry, "bar"); } #[tokio::test] async fn multi_reset() { time::pause(); let mut queue = task::spawn(DelayQueue::new()); let now = Instant::now(); let one = queue.insert_at("one", now + ms(200)); let two = queue.insert_at("two", now + ms(250)); assert_pending!(poll!(queue)); queue.reset_at(&one, now + ms(300)); queue.reset_at(&two, now + ms(350)); queue.reset_at(&one, now + ms(400)); sleep(ms(310)).await; assert_pending!(poll!(queue)); sleep(ms(50)).await; let entry = assert_ready_some!(poll!(queue)); assert_eq!(*entry.get_ref(), "two"); assert_pending!(poll!(queue)); sleep(ms(50)).await; let entry = assert_ready_some!(poll!(queue)); assert_eq!(*entry.get_ref(), "one"); let entry = assert_ready!(poll!(queue)); assert!(entry.is_none()) } #[tokio::test] async fn expire_first_key_when_reset_to_expire_earlier() { time::pause(); let mut queue = task::spawn(DelayQueue::new()); let now = Instant::now(); let one = queue.insert_at("one", now + ms(200)); queue.insert_at("two", now + ms(250)); assert_pending!(poll!(queue)); queue.reset_at(&one, now + ms(100)); sleep(ms(100)).await; assert!(queue.is_woken()); let entry = assert_ready_some!(poll!(queue)).into_inner(); assert_eq!(entry, "one"); } #[tokio::test] async fn expire_second_key_when_reset_to_expire_earlier() { time::pause(); let mut queue = task::spawn(DelayQueue::new()); let now = Instant::now(); queue.insert_at("one", now + ms(200)); let two = queue.insert_at("two", now + ms(250)); assert_pending!(poll!(queue)); queue.reset_at(&two, now + ms(100)); sleep(ms(100)).await; assert!(queue.is_woken()); let entry = assert_ready_some!(poll!(queue)).into_inner(); assert_eq!(entry, "two"); } #[tokio::test] async fn reset_first_expiring_item_to_expire_later() { time::pause(); let mut queue = task::spawn(DelayQueue::new()); let now = Instant::now(); let one = queue.insert_at("one", now + ms(200)); let _two = queue.insert_at("two", now + ms(250)); assert_pending!(poll!(queue)); queue.reset_at(&one, now + ms(300)); sleep(ms(250)).await; assert!(queue.is_woken()); let entry = assert_ready_some!(poll!(queue)).into_inner(); assert_eq!(entry, "two"); } #[tokio::test] async fn insert_before_first_after_poll() { time::pause(); let mut queue = task::spawn(DelayQueue::new()); let now = Instant::now(); let _one = queue.insert_at("one", now + ms(200)); assert_pending!(poll!(queue)); let _two = queue.insert_at("two", now + ms(100)); sleep(ms(99)).await; assert_pending!(poll!(queue)); sleep(ms(1)).await; assert!(queue.is_woken()); let entry = assert_ready_some!(poll!(queue)).into_inner(); assert_eq!(entry, "two"); } #[tokio::test] async fn insert_after_ready_poll() { time::pause(); let mut queue = task::spawn(DelayQueue::new()); let now = Instant::now(); queue.insert_at("1", now + ms(100)); queue.insert_at("2", now + ms(100)); queue.insert_at("3", now + ms(100)); assert_pending!(poll!(queue)); sleep(ms(100)).await; assert!(queue.is_woken()); let mut res = vec![]; while res.len() < 3 { let entry = assert_ready_some!(poll!(queue)); res.push(entry.into_inner()); queue.insert_at("foo", now + ms(500)); } res.sort_unstable(); assert_eq!("1", res[0]); assert_eq!("2", res[1]); assert_eq!("3", res[2]); } #[tokio::test] async fn reset_later_after_slot_starts() { time::pause(); let mut queue = task::spawn(DelayQueue::new()); let now = Instant::now(); let foo = queue.insert_at("foo", now + ms(100)); assert_pending!(poll!(queue)); sleep_until(now + Duration::from_millis(80)).await; assert!(!queue.is_woken()); // At this point the queue hasn't been polled, so `elapsed` on the wheel // for the queue is still at 0 and hence the 1ms resolution slots cover // [0-64). Resetting the time on the entry to 120 causes it to get put in // the [64-128) slot. As the queue knows that the first entry is within // that slot, but doesn't know when, it must wake immediately to advance // the wheel. queue.reset_at(&foo, now + ms(120)); assert!(queue.is_woken()); assert_pending!(poll!(queue)); sleep_until(now + Duration::from_millis(119)).await; assert!(!queue.is_woken()); sleep(ms(1)).await; assert!(queue.is_woken()); let entry = assert_ready_some!(poll!(queue)).into_inner(); assert_eq!(entry, "foo"); } #[tokio::test] async fn reset_inserted_expired() { time::pause(); let mut queue = task::spawn(DelayQueue::new()); let now = Instant::now(); let key = queue.insert_at("foo", now - ms(100)); // this causes the panic described in #2473 queue.reset_at(&key, now + ms(100)); assert_eq!(1, queue.len()); sleep(ms(200)).await; let entry = assert_ready_some!(poll!(queue)).into_inner(); assert_eq!(entry, "foo"); assert_eq!(queue.len(), 0); } #[tokio::test] async fn reset_earlier_after_slot_starts() { time::pause(); let mut queue = task::spawn(DelayQueue::new()); let now = Instant::now(); let foo = queue.insert_at("foo", now + ms(200)); assert_pending!(poll!(queue)); sleep_until(now + Duration::from_millis(80)).await; assert!(!queue.is_woken()); // At this point the queue hasn't been polled, so `elapsed` on the wheel // for the queue is still at 0 and hence the 1ms resolution slots cover // [0-64). Resetting the time on the entry to 120 causes it to get put in // the [64-128) slot. As the queue knows that the first entry is within // that slot, but doesn't know when, it must wake immediately to advance // the wheel. queue.reset_at(&foo, now + ms(120)); assert!(queue.is_woken()); assert_pending!(poll!(queue)); sleep_until(now + Duration::from_millis(119)).await; assert!(!queue.is_woken()); sleep(ms(1)).await; assert!(queue.is_woken()); let entry = assert_ready_some!(poll!(queue)).into_inner(); assert_eq!(entry, "foo"); } #[tokio::test] async fn insert_in_past_after_poll_fires_immediately() { time::pause(); let mut queue = task::spawn(DelayQueue::new()); let now = Instant::now(); queue.insert_at("foo", now + ms(200)); assert_pending!(poll!(queue)); sleep(ms(80)).await; assert!(!queue.is_woken()); queue.insert_at("bar", now + ms(40)); assert!(queue.is_woken()); let entry = assert_ready_some!(poll!(queue)).into_inner(); assert_eq!(entry, "bar"); } #[tokio::test] async fn delay_queue_poll_expired_when_empty() { let mut delay_queue = task::spawn(DelayQueue::new()); let key = delay_queue.insert(0, std::time::Duration::from_secs(10)); assert_pending!(poll!(delay_queue)); delay_queue.remove(&key); assert!(assert_ready!(poll!(delay_queue)).is_none()); } #[tokio::test(start_paused = true)] async fn compact_expire_empty() { let mut queue = task::spawn(DelayQueue::new()); let now = Instant::now(); queue.insert_at("foo1", now + ms(10)); queue.insert_at("foo2", now + ms(10)); sleep(ms(10)).await; let mut res = vec![]; while res.len() < 2 { let entry = assert_ready_some!(poll!(queue)); res.push(entry.into_inner()); } queue.compact(); assert_eq!(queue.len(), 0); assert_eq!(queue.capacity(), 0); } #[tokio::test(start_paused = true)] async fn compact_remove_empty() { let mut queue = task::spawn(DelayQueue::new()); let now = Instant::now(); let key1 = queue.insert_at("foo1", now + ms(10)); let key2 = queue.insert_at("foo2", now + ms(10)); queue.remove(&key1); queue.remove(&key2); queue.compact(); assert_eq!(queue.len(), 0); assert_eq!(queue.capacity(), 0); } #[tokio::test(start_paused = true)] // Trigger a re-mapping of keys in the slab due to a `compact` call and // test removal of re-mapped keys async fn compact_remove_remapped_keys() { let mut queue = task::spawn(DelayQueue::new()); let now = Instant::now(); queue.insert_at("foo1", now + ms(10)); queue.insert_at("foo2", now + ms(10)); // should be assigned indices 3 and 4 let key3 = queue.insert_at("foo3", now + ms(20)); let key4 = queue.insert_at("foo4", now + ms(20)); sleep(ms(10)).await; let mut res = vec![]; while res.len() < 2 { let entry = assert_ready_some!(poll!(queue)); res.push(entry.into_inner()); } // items corresponding to `foo3` and `foo4` will be assigned // new indices here queue.compact(); queue.insert_at("foo5", now + ms(10)); // test removal of re-mapped keys let expired3 = queue.remove(&key3); let expired4 = queue.remove(&key4); assert_eq!(expired3.into_inner(), "foo3"); assert_eq!(expired4.into_inner(), "foo4"); queue.compact(); assert_eq!(queue.len(), 1); assert_eq!(queue.capacity(), 1); } #[tokio::test(start_paused = true)] async fn compact_change_deadline() { let mut queue = task::spawn(DelayQueue::new()); let mut now = Instant::now(); queue.insert_at("foo1", now + ms(10)); queue.insert_at("foo2", now + ms(10)); // should be assigned indices 3 and 4 queue.insert_at("foo3", now + ms(20)); let key4 = queue.insert_at("foo4", now + ms(20)); sleep(ms(10)).await; let mut res = vec![]; while res.len() < 2 { let entry = assert_ready_some!(poll!(queue)); res.push(entry.into_inner()); } // items corresponding to `foo3` and `foo4` should be assigned // new indices queue.compact(); now = Instant::now(); queue.insert_at("foo5", now + ms(10)); let key6 = queue.insert_at("foo6", now + ms(10)); queue.reset_at(&key4, now + ms(20)); queue.reset_at(&key6, now + ms(20)); // foo3 and foo5 will expire sleep(ms(10)).await; while res.len() < 4 { let entry = assert_ready_some!(poll!(queue)); res.push(entry.into_inner()); } sleep(ms(10)).await; while res.len() < 6 { let entry = assert_ready_some!(poll!(queue)); res.push(entry.into_inner()); } let entry = assert_ready!(poll!(queue)); assert!(entry.is_none()); } #[cfg_attr(target_os = "wasi", ignore = "FIXME: Does not seem to work with WASI")] #[tokio::test(start_paused = true)] async fn remove_after_compact() { let now = Instant::now(); let mut queue = DelayQueue::new(); let foo_key = queue.insert_at("foo", now + ms(10)); queue.insert_at("bar", now + ms(20)); queue.remove(&foo_key); queue.compact(); let panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { queue.remove(&foo_key); })); assert!(panic.is_err()); } #[cfg_attr(target_os = "wasi", ignore = "FIXME: Does not seem to work with WASI")] #[tokio::test(start_paused = true)] async fn remove_after_compact_poll() { let now = Instant::now(); let mut queue = task::spawn(DelayQueue::new()); let foo_key = queue.insert_at("foo", now + ms(10)); queue.insert_at("bar", now + ms(20)); sleep(ms(10)).await; assert_eq!(assert_ready_some!(poll!(queue)).key(), foo_key); queue.compact(); let panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { queue.remove(&foo_key); })); assert!(panic.is_err()); } fn ms(n: u64) -> Duration { Duration::from_millis(n) }