// Copyright 2020 Sigma Prime Pty Ltd. // // Permission is hereby granted, free of charge, to any person obtaining a // copy of this software and associated documentation files (the "Software"), // to deal in the Software without restriction, including without limitation // the rights to use, copy, modify, merge, publish, distribute, sublicense, // and/or sell copies of the Software, and to permit persons to whom the // Software is furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. //! This implements a time-based LRU cache for checking gossipsub message duplicates. use fnv::FnvHashMap; use std::collections::hash_map::{ self, Entry::{Occupied, Vacant}, }; use std::collections::VecDeque; use std::time::{Duration, Instant}; struct ExpiringElement { /// The element that expires element: Element, /// The expire time. expires: Instant, } pub struct TimeCache { /// Mapping a key to its value together with its latest expire time (can be updated through /// reinserts). map: FnvHashMap>, /// An ordered list of keys by expires time. list: VecDeque>, /// The time elements remain in the cache. ttl: Duration, } pub struct OccupiedEntry<'a, K, V> { expiration: Instant, entry: hash_map::OccupiedEntry<'a, K, ExpiringElement>, list: &'a mut VecDeque>, } impl<'a, K, V> OccupiedEntry<'a, K, V> where K: Eq + std::hash::Hash + Clone, { pub fn into_mut(self) -> &'a mut V { &mut self.entry.into_mut().element } pub fn insert_without_updating_expiration(&mut self, value: V) -> V { //keep old expiration, only replace value of element ::std::mem::replace(&mut self.entry.get_mut().element, value) } pub fn insert_and_update_expiration(&mut self, value: V) -> V { //We push back an additional element, the first reference in the list will be ignored // since we also updated the expires in the map, see below. self.list.push_back(ExpiringElement { element: self.entry.key().clone(), expires: self.expiration, }); self.entry .insert(ExpiringElement { element: value, expires: self.expiration, }) .element } } pub struct VacantEntry<'a, K, V> { expiration: Instant, entry: hash_map::VacantEntry<'a, K, ExpiringElement>, list: &'a mut VecDeque>, } impl<'a, K, V> VacantEntry<'a, K, V> where K: Eq + std::hash::Hash + Clone, { pub fn insert(self, value: V) -> &'a mut V { self.list.push_back(ExpiringElement { element: self.entry.key().clone(), expires: self.expiration, }); &mut self .entry .insert(ExpiringElement { element: value, expires: self.expiration, }) .element } } pub enum Entry<'a, K: 'a, V: 'a> { Occupied(OccupiedEntry<'a, K, V>), Vacant(VacantEntry<'a, K, V>), } impl<'a, K: 'a, V: 'a> Entry<'a, K, V> where K: Eq + std::hash::Hash + Clone, { pub fn or_insert_with V>(self, default: F) -> &'a mut V { match self { Entry::Occupied(entry) => entry.into_mut(), Entry::Vacant(entry) => entry.insert(default()), } } } impl TimeCache where Key: Eq + std::hash::Hash + Clone, { pub fn new(ttl: Duration) -> Self { TimeCache { map: FnvHashMap::default(), list: VecDeque::new(), ttl, } } fn remove_expired_keys(&mut self, now: Instant) { while let Some(element) = self.list.pop_front() { if element.expires > now { self.list.push_front(element); break; } if let Occupied(entry) = self.map.entry(element.element.clone()) { if entry.get().expires <= now { entry.remove(); } } } } pub fn entry(&mut self, key: Key) -> Entry { let now = Instant::now(); self.remove_expired_keys(now); match self.map.entry(key) { Occupied(entry) => Entry::Occupied(OccupiedEntry { expiration: now + self.ttl, entry, list: &mut self.list, }), Vacant(entry) => Entry::Vacant(VacantEntry { expiration: now + self.ttl, entry, list: &mut self.list, }), } } /// Empties the entire cache. pub fn clear(&mut self) { self.map.clear(); self.list.clear(); } pub fn contains_key(&mut self, key: &Key) -> bool { self.map.contains_key(key) } pub fn get(&self, key: &Key) -> Option<&Value> { self.map.get(key).map(|e| &e.element) } } pub struct DuplicateCache(TimeCache); impl DuplicateCache where Key: Eq + std::hash::Hash + Clone, { pub fn new(ttl: Duration) -> Self { Self(TimeCache::new(ttl)) } // Inserts new elements and removes any expired elements. // // If the key was not present this returns `true`. If the value was already present this // returns `false`. pub fn insert(&mut self, key: Key) -> bool { if let Entry::Vacant(entry) = self.0.entry(key) { entry.insert(()); true } else { false } } pub fn contains(&mut self, key: &Key) -> bool { self.0.contains_key(key) } } #[cfg(test)] mod test { use super::*; #[test] fn cache_added_entries_exist() { let mut cache = DuplicateCache::new(Duration::from_secs(10)); cache.insert("t"); cache.insert("e"); // Should report that 't' and 't' already exists assert!(!cache.insert("t")); assert!(!cache.insert("e")); } #[test] fn cache_entries_expire() { let mut cache = DuplicateCache::new(Duration::from_millis(100)); cache.insert("t"); assert!(!cache.insert("t")); cache.insert("e"); //assert!(!cache.insert("t")); assert!(!cache.insert("e")); // sleep until cache expiry std::thread::sleep(Duration::from_millis(101)); // add another element to clear previous cache cache.insert("s"); // should be removed from the cache assert!(cache.insert("t")); } }