#![allow(clippy::mutable_key_type)] use std::collections::{BTreeMap, VecDeque}; use std::{cell::Cell, cell::RefCell, ops, rc::Rc, time::Duration, time::Instant}; use ntex_util::time::{now, sleep, Seconds}; use ntex_util::{spawn, HashSet}; use crate::{io::IoState, IoRef}; const CAP: usize = 64; const SEC: Duration = Duration::from_secs(1); thread_local! { static TIMER: Inner = Inner { running: Cell::new(false), base: Cell::new(Instant::now()), current: Cell::new(0), storage: RefCell::new(InnerMut { cache: VecDeque::with_capacity(CAP), notifications: BTreeMap::default(), }) } } #[derive(Copy, Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] pub struct TimerHandle(u32); impl TimerHandle { pub const ZERO: TimerHandle = TimerHandle(0); pub fn is_set(&self) -> bool { self.0 != 0 } pub fn remains(&self) -> Seconds { TIMER.with(|timer| { let cur = timer.current.get(); if self.0 <= cur { Seconds::ZERO } else { Seconds((self.0 - cur) as u16) } }) } pub fn instant(&self) -> Instant { TIMER.with(|timer| timer.base.get() + Duration::from_secs(self.0 as u64)) } } impl ops::Add for TimerHandle { type Output = TimerHandle; #[inline] fn add(self, other: Seconds) -> TimerHandle { TimerHandle(self.0 + other.0 as u32) } } struct Inner { running: Cell, base: Cell, current: Cell, storage: RefCell, } struct InnerMut { cache: VecDeque>>, notifications: BTreeMap>>, } impl InnerMut { fn unregister(&mut self, hnd: TimerHandle, io: &IoRef) { if let Some(states) = self.notifications.get_mut(&hnd.0) { states.remove(&io.0); } } } pub(crate) fn unregister(hnd: TimerHandle, io: &IoRef) { TIMER.with(|timer| { timer.storage.borrow_mut().unregister(hnd, io); }) } pub(crate) fn update(hnd: TimerHandle, timeout: Seconds, io: &IoRef) -> TimerHandle { TIMER.with(|timer| { let new_hnd = timer.current.get() + timeout.0 as u32; if hnd.0 == new_hnd || hnd.0 == new_hnd + 1 { hnd } else { timer.storage.borrow_mut().unregister(hnd, io); register(timeout, io) } }) } pub(crate) fn register(timeout: Seconds, io: &IoRef) -> TimerHandle { TIMER.with(|timer| { // setup current delta if !timer.running.get() { let current = (now() - timer.base.get()).as_secs() as u32; timer.current.set(current); log::debug!( "{}: Timer driver does not run, current: {}", io.tag(), current ); } let hnd = { let hnd = timer.current.get() + timeout.0 as u32; let mut inner = timer.storage.borrow_mut(); // insert key if let Some(item) = inner.notifications.range_mut(hnd..hnd + 1).next() { item.1.insert(io.0.clone()); *item.0 } else { let mut items = inner.cache.pop_front().unwrap_or_default(); items.insert(io.0.clone()); inner.notifications.insert(hnd, items); hnd } }; if !timer.running.get() { timer.running.set(true); #[allow(clippy::let_underscore_future)] let _ = spawn(async move { let guard = TimerGuard; loop { sleep(SEC).await; let stop = TIMER.with(|timer| { let current = timer.current.get(); timer.current.set(current + 1); // notify io dispatcher let mut inner = timer.storage.borrow_mut(); while let Some(key) = inner.notifications.keys().next() { let key = *key; if key <= current { let mut items = inner.notifications.remove(&key).unwrap(); items.drain().for_each(|st| st.notify_timeout()); if inner.cache.len() <= CAP { inner.cache.push_back(items); } } else { break; } } // new tick if inner.notifications.is_empty() { timer.running.set(false); true } else { false } }); if stop { break; } } drop(guard); }); } TimerHandle(hnd) }) } struct TimerGuard; impl Drop for TimerGuard { fn drop(&mut self) { TIMER.with(|timer| { timer.running.set(false); timer.storage.borrow_mut().notifications.clear(); }) } }