| Field | Value |
|---|---|
| Crates.io | time-key-stream-set |
| lib.rs | time-key-stream-set |
| version | 0.1.6 |
| source | src |
| created_at | 2023-05-27 15:30:57.134987 |
| updated_at | 2023-05-28 13:36:32.449141 |
| description | A time-keyed stream set |
| homepage | |
| repository | |
| max_upload_size | |
| id | 875976 |
| size | 53,855 |
This crate provides a hybrid in-/out-of-memory data structure for deduplication of mostly ordered streams of time keys.
Several priority features motivate the design and development of this crate.
There are veteran structures in this space, such as B+ Trees and Log-Structured Merge Trees. As a specialized use case, we need not suffer some of the trade-offs involved in generalizing those structures; several features of the incoming data stream are exploited to optimize the average case.
A B+ Tree is reasonably well suited to this scenario, except for a few key issues: first, the lack of a viable crate implementation with out-of-memory (on-disk) storage, and second, the lack of optimization for contiguous block insertion of u128 time keys. A similar lack of support among LSM tree crates motivates the implementation of the following algorithm.
Insertion is exposed as `fn insert(batch: Vec<TimeKey>) -> Vec<bool>`. Callers convert their own row type into a time key by implementing `From<&MyId> for TimeKey`, with `ts_us: i64` followed by `user_id: i32` and `device_id: i32`. Storing the two ids as positive integers in the second half of the time key leaves leading zeros that are highly compressible for deployments with relatively low-cardinality `user_id` and `device_id`. Segments are compressed via tsz then zstd when flushed to disk; for sparse segments with consistent data rates this reaches compression ratios of 1,000x to 200,000x on some data sets. Segments are accessed serially, with insertion rates dependent on the backing index, a `BTreeMap`, which currently yields on the order of ~300,000 time keys per second (single-core, CPU bound).
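To make the key layout concrete, here is a minimal sketch of packing those fields into a `u128` time key, assuming the first half holds the timestamp and the second half holds the two ids as described above; `MyId` and `pack` are illustrative names, not the crate's actual internals.

```rust
// Illustrative row identifier matching the field layout described above.
struct MyId {
    ts_us: i64,
    user_id: i32,
    device_id: i32,
}

// Hypothetical packing: timestamp in the upper 64 bits, ids in the lower 64 bits.
// Low-cardinality user_id/device_id leave leading zeros in the lower half,
// which compress very well once a segment is flushed.
fn pack(id: &MyId) -> u128 {
    ((id.ts_us as u64 as u128) << 64)
        | ((id.user_id as u32 as u128) << 32)
        | (id.device_id as u32 as u128)
}
```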
On each insert, the following steps are performed (a condensed Rust sketch follows the list):
1. Compute the segment start bucket timestamp for each row
2. Lock the segment index
3. Find the existing segments in range for the bucket timestamps under insertion
4. Create any new segments not yet in the index
5. Lock the hot segment cache
6. Insert new segments into the index and the cache
7. Loop on LRU eviction of segments not under insertion and hydration of segments under insertion, respecting the requested memory limit
8. Release the hot segment cache lock
9. Lock each segment under insertion
10. Release the segment index lock
11. Insert time keys into their respective segments
12. Release each segment's lock
13. Update memory usage for the newly inserted time keys
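Below is a condensed sketch of that flow, assuming illustrative internal types (a `BTreeMap` index keyed by bucket timestamp, one mutex per segment); hot segment cache handling and LRU eviction/hydration are elided, and none of these names are the crate's real internals.

```rust
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};

// Illustrative stand-ins for the crate's internal types.
struct Segment { keys: Vec<u128> }

struct StreamSetInner {
    // Bucket start timestamp (us) -> segment of time keys in that interval.
    index: Mutex<BTreeMap<i64, Arc<Mutex<Segment>>>>,
}

fn insert(set: &StreamSetInner, batch: Vec<(i64, u128)>, bucket_us: i64) -> Vec<bool> {
    // Steps 1-2: compute each row's segment bucket and lock the segment index.
    let mut index = set.index.lock().unwrap();

    // Steps 3-6: find existing segments or create new ones for the buckets under
    // insertion (hot-cache locking and LRU eviction/hydration elided).
    let mut pending: Vec<(Arc<Mutex<Segment>>, u128)> = Vec::with_capacity(batch.len());
    for (ts_us, key) in batch {
        let bucket = ts_us - ts_us.rem_euclid(bucket_us);
        let segment = index
            .entry(bucket)
            .or_insert_with(|| Arc::new(Mutex::new(Segment { keys: Vec::new() })))
            .clone();
        pending.push((segment, key));
    }

    // Steps 9-10: the pseudocode locks each target segment before releasing the
    // index lock; this sketch pins them via Arc and locks them below instead.
    drop(index);

    // Steps 11-13: insert time keys into their segments, reporting which were new.
    let mut fresh = Vec::with_capacity(pending.len());
    for (segment, key) in pending {
        let mut segment = segment.lock().unwrap();
        let is_new = !segment.keys.contains(&key);
        if is_new {
            segment.keys.push(key);
        }
        fresh.push(is_new);
    }
    fresh
}
```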
In Rust, using the Time Key Stream Set, `TkStreamSet`, looks like the following:
```rust
use time_key_stream_set::prelude::*;
use std::error::Error;
use std::time::Duration;

async fn frontend_receive() -> Result<Vec<AdcRow>, Box<dyn Error>> {
    // Who knows, maybe you get this data on an HTTP server
    unimplemented!()
}

async fn yolo() -> Result<(), Box<dyn Error>> {
    // Hope the database doesn't roll back this transaction
    unimplemented!()
}

#[derive(IntoTimeKey)]
struct AdcRow {
    ts_us: i64,
    user_id: i32,
    device_id: i32,
    millivolts: i16,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Rehydrate a set of time keys from a known directory or a temporary directory.
    // Configure a stream set via a builder with reasonable defaults for the data source.
    let stream_set = TkStreamSetBuilder::new()
        .with_segment_time_interval(Duration::from_secs(60 * 60 * 2))
        .with_memory_limit(MemoryLimit::Low)
        .build()
        .await
        .unwrap();

    // Multiple threads could be running the same loop on this node
    loop {
        // I got some new data that may have duplicates
        let batch: Vec<AdcRow> = frontend_receive().await?;

        // We are going to keep this data, glad RAM doesn't blow up
        let dups = stream_set
            .insert(batch.iter().map(|row| row.into()).collect::<Vec<_>>())
            .await?;
        let dedups = batch
            .iter()
            .zip(dups)
            .filter_map(|(row, keep)| if keep { Some(row) } else { None });

        // Send to the data warehouse somewhere
        loop {
            match yolo().await {
                Ok(_) => break,
                Err(_e) => todo!(),
            }
        }
    }
}
```