use eyros::{DB,Setup,Tree2,QTrace,TreeRef,Row,Coord,Error}; use random::{Source,default as rand}; use tempfile::Builder as Tmpfile; use async_std::{prelude::*,channel}; use std::collections::HashMap; use desert::CountBytes; type P = (Coord,Coord); type V = Vec; type T = Tree2; #[async_std::test] async fn max_tree_bytes_1() -> Result<(),Error> { let dir = Tmpfile::new().prefix("eyros").tempdir()?; let max_tree_bytes = 5_000; let mut db: DB<_,T,P,V> = Setup::from_path(dir.path()) .max_tree_bytes(max_tree_bytes) .build().await? ; db.batch(&vec![ Row::Insert((Coord::Scalar(1.0),Coord::Scalar(3.0)),vec![1;1_000]), Row::Insert((Coord::Scalar(1.0),Coord::Scalar(2.0)),vec![2;2_000]), Row::Insert((Coord::Interval(6.0,9.0),Coord::Interval(4.0,5.0)),vec![3;1_000]), Row::Insert((Coord::Interval(-2.5,0.5),Coord::Scalar(-1.2)),vec![4;2_000]), Row::Insert((Coord::Scalar(-4.5),Coord::Interval(-5.5,-1.2)),vec![5;1_000]), Row::Insert((Coord::Interval(-9.0,-8.0),Coord::Interval(-4.0,4.0)),vec![6;1_000]), ]).await?; db.sync().await?; let trace = Box::new(Trace::default()); let bbox = ((-10.0,-10.0),(10.0,10.0)); let mut stream = db.query_trace(&bbox, trace.clone()).await?; let mut count = 0; while let Some(result) = stream.next().await { result?; count += 1; } trace.close(); assert_eq![count, 6]; let mut sizes = HashMap::new(); while let Ok(r) = trace.next().await { let t = db.trees.get(&r.id).await?; let bytes = t.lock().await.count_bytes(); assert![bytes <= max_tree_bytes, "{} <= {}", bytes, max_tree_bytes]; sizes.insert(r.id, bytes); } Ok(()) } #[async_std::test] async fn max_tree_bytes_2() -> Result<(),Error> { let dir = Tmpfile::new().prefix("eyros").tempdir()?; let max_tree_bytes = 250_000; let mut db: DB<_,T,P,V> = Setup::from_path(dir.path()) .max_tree_bytes(max_tree_bytes) .build().await? ; let mut r = rand().seed([13,12]); let (batch_size,nbatches) = (1_000,25); for _ in 0..nbatches { let batch: Vec> = (0..batch_size).map(|_| { let (point,value) = { let n = (r.read::()*500.0+1.0) as usize; let buf = r.iter().take(n).collect::>(); if r.read::() > 0.5 { let xmin: f32 = r.read::()*2.0-1.0; let xmax: f32 = xmin + r.read::().powf(2.0)*(1.0-xmin); let ymin: f32 = r.read::()*2.0-1.0; let ymax: f32 = ymin + r.read::().powf(2.0)*(1.0-ymin); ( (Coord::Interval(xmin,xmax),Coord::Interval(ymin,ymax)), buf ) } else { let x: f32 = r.read::()*2.0-1.0; let y: f32 = r.read::()*2.0-1.0; ( (Coord::Scalar(x),Coord::Scalar(y)), buf ) } }; Row::Insert(point,value) }).collect(); db.batch(&batch).await?; } db.sync().await?; let trace = Box::new(Trace::default()); let bbox = ((-10.0,-10.0),(10.0,10.0)); let mut stream = db.query_trace(&bbox, trace.clone()).await?; let mut count = 0; while let Some(result) = stream.next().await { result?; count += 1; } trace.close(); assert_eq![count, batch_size * nbatches]; let mut sizes = HashMap::new(); while let Ok(r) = trace.next().await { let t = db.trees.get(&r.id).await?; let bytes = t.lock().await.count_bytes(); assert![bytes <= max_tree_bytes, "{} <= {}", bytes, max_tree_bytes]; sizes.insert(r.id, bytes); } Ok(()) } struct Trace { receiver: channel::Receiver>, sender: channel::Sender>, } impl Clone for Trace { fn clone(&self) -> Self { Self { receiver: self.receiver.clone(), sender: self.sender.clone(), } } } impl Default for Trace { fn default() -> Self { let (sender,receiver) = channel::unbounded(); Self { sender, receiver } } } impl Trace { async fn next(&self) -> Result,Error> { self.receiver.recv().await.map_err(|e| e.into()) } fn close(&self) { self.sender.close(); } } impl QTrace

for Trace { fn trace(&mut self, tr: TreeRef

) { let ch = self.sender.clone(); ch.try_send(tr).unwrap(); } }