use std::fs::{create_dir, remove_dir_all};
use std::io;
use std::ops::Add;
use std::path::PathBuf;

use amnesimmap::Aggregate;

/// Per-bin aggregate: the maximum value observed and the number of samples.
#[derive(Clone, Debug, Eq, PartialEq)]
#[repr(C)]
struct Row {
    max: u16,
    num: usize,
}

impl Aggregate for Row {
    type Marginal = u16;

    fn combine(&mut self, other: &Self) {
        self.max = self.max.max(other.max);
        self.num = self.num.add(other.num);
    }

    fn update(&mut self, other: &Self::Marginal) {
        self.max = self.max.max(*other);
        self.num = self.num.add(1);
    }

    fn empty(last: &Self::Marginal) -> Self {
        Self { max: *last, num: 0 }
    }

    fn initial(curr: &Self::Marginal) -> Self {
        Self { max: *curr, num: 1 }
    }
}

/// Creates a fresh temporary directory under `CARGO_TARGET_TMPDIR`, removing any
/// leftover from a previous run.
fn tmp(name: &str) -> io::Result<PathBuf> {
    let path = PathBuf::from(std::env!("CARGO_TARGET_TMPDIR")).join(name);
    if path.exists() {
        remove_dir_all(&path).unwrap();
    }
    create_dir(&path)?;
    Ok(path)
}

#[cfg(test)]
mod tests {
    use std::path::Path;

    use amnesimmap::BinnedTimeSeries;

    use super::*;

    fn create<P: AsRef<Path>>(
        path: P,
        num_bin: usize,
        bin_size: i64,
    ) -> anyhow::Result<BinnedTimeSeries<Row>> {
        // Safety: Row is repr(C) and any permutation of bits is a valid Row
        unsafe { BinnedTimeSeries::<Row>::create_unsafe(path, num_bin, bin_size) }
    }

    fn resume<P: AsRef<Path>>(
        path: P,
        num_bin: usize,
        bin_size: i64,
    ) -> anyhow::Result<BinnedTimeSeries<Row>> {
        // Safety: Row is repr(C) and any permutation of bits is a valid Row
        unsafe { BinnedTimeSeries::<Row>::resume_unsafe(path, num_bin, bin_size) }
    }

    fn row(max: u16, num: usize) -> Row {
        Row { max, num }
    }

    #[test]
    fn simple_use_case() {
        let path = tmp("simple_use_case").unwrap().join("table");
        let num_bin = 5;
        let bin_size = 60;

        let mut db = create(&path, num_bin, bin_size).unwrap();
        assert_eq!(db.sparse(), vec![]);

        db.update(0, 1);
        assert_eq!(db.sparse(), vec![(0, &row(1, 1))]);

        db.update(0, 2);
        assert_eq!(db.sparse(), vec![(0, &row(2, 2))]);

        db.update(1, 3);
        assert_eq!(db.sparse(), vec![(0, &row(3, 3))]);

        db.update(59, 4);
        assert_eq!(db.sparse(), vec![(0, &row(4, 4))]);

        db.update(60, 5);
        assert_eq!(db.sparse(), vec![(0, &row(4, 4)), (60, &row(5, 1))]);

        db.update(180, 6);
        assert_eq!(
            db.sparse(),
            vec![
                (0, &row(4, 4)),
                (60, &row(5, 1)),
                (120, &row(5, 0)),
                (180, &row(6, 1)),
            ]
        );

        db.update(360, 7);
        assert_eq!(
            db.sparse(),
            vec![
                (120, &row(5, 0)),
                (180, &row(6, 1)),
                (240, &row(6, 0)),
                (300, &row(6, 0)),
                (360, &row(7, 1)),
            ]
        );

        db.flush().unwrap();
        drop(db);

        // Cannot accidentally overwrite existing
        assert!(create(&path, num_bin, bin_size).is_err());

        // Cannot accidentally open wrong database
        assert!(resume(&path, num_bin + 1, bin_size).is_err());

        // Data is persisted
        let mut db = resume(&path, num_bin, bin_size).unwrap();
        assert_eq!(
            db.sparse(),
            vec![
                (120, &row(5, 0)),
                (180, &row(6, 1)),
                (240, &row(6, 0)),
                (300, &row(6, 0)),
                (360, &row(7, 1)),
            ]
        );

        // First update does not assume continuity
        db.update(480, 8);
        assert_eq!(
            db.sparse(),
            vec![
                (120, &row(5, 0)),
                (240, &row(6, 0)),
                (300, &row(6, 0)),
                (360, &row(7, 1)),
                (480, &row(8, 1)),
            ]
        );

        // Subsequent updates assume continuity
        db.update(600, 9);
        assert_eq!(
            db.sparse(),
            vec![
                (120, &row(5, 0)),
                (360, &row(7, 1)),
                (480, &row(8, 1)),
                (540, &row(8, 0)),
                (600, &row(9, 1)),
            ]
        );

        // Fragmentation is unfortunate but expected at the moment.
        // Eventually it is rotated out.
        // In the future the resume method may do defragmentation.
        db.update(719, 8);
        db.update(720, 8);
        assert_eq!(
            db.sparse(),
            vec![
                (480, &row(8, 1)),
                (540, &row(8, 0)),
                (600, &row(9, 1)),
                (660, &row(9, 1)),
                (720, &row(8, 1)),
            ]
        );

        // Larger bin sizes can be emulated
        assert_eq!(db.aggregated(480, 599), Some(row(8, 1)));
        assert_eq!(db.aggregated(480, 659), Some(row(9, 2)));
    }
}
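
// A minimal supplementary sketch exercising only the `Aggregate` methods implemented
// above (`initial`, `update`, `empty`, `combine`), without touching any file-backed
// storage. The module and test names here are illustrative and not taken from the
// amnesimmap API.
#[cfg(test)]
mod row_aggregate {
    use super::*;

    #[test]
    fn row_aggregate_behaviour() {
        // `initial` seeds a bin from the first observed marginal.
        let mut a = Row::initial(&3);
        assert_eq!(a, Row { max: 3, num: 1 });

        // `update` folds in further marginals: max of values, count of samples.
        a.update(&7);
        a.update(&2);
        assert_eq!(a, Row { max: 7, num: 3 });

        // `empty` carries the last marginal forward without counting a sample.
        assert_eq!(Row::empty(&7), Row { max: 7, num: 0 });

        // `combine` merges two bins: element-wise max and summed counts.
        let mut b = Row::initial(&5);
        b.update(&9);
        a.combine(&b);
        assert_eq!(a, Row { max: 9, num: 5 });
    }
}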