Time Key Stream Set

This crate provides a hybrid in-/out-of-memory data structure for deduplicating a mostly ordered stream of time keys.

Goals

There are several priority features that motivate the design and development of this crate.

  • Deduplicate a mostly continuous stream of time keys for rows emitted from IoT/Wearable devices with frontends sampling discretely at rates on the order of 0.1 Hz to 10 kHz.
    • A time key is expected to be composed of a microsecond timestamp, an integral user_id, an integral device_id, and an integral modality enumeration.
  • Limit RAM usage of the deduplication data structure so that a single node with 32 GiB to 256 GiB of RAM can act as the deduplication node.
  • Provide deduplication for ~100B rows per day
  • Allow configurable retention periods without rebuilding an index or filter
  • Specialize for u128 width time keys

Motivational Heuristics

There are several veteran structures in this space, such as B+ Trees and Log-Structured Merge Trees. For this specialized use case, we need not suffer some of the trade-offs involved in generalizing these structures. Several features of the incoming data stream are exploited to optimize the average case.

  1. Inserts are uploaded in batches from Wearable/Mobile devices
    1. Batch sizes of ~5000 consecutive time keys are the standard case
  2. For each user_id/device_id/source tuple
    1. Consecutive timestamps will be separated by hundreds to millions of microseconds.
    2. Incoming data will rarely go backwards in time; this happens almost exclusively when a device replays saved data from the beginning before progressing forward again.
  3. Row data associated with the time keys are not stored with the time key stream set.
  4. Truncation and reclamation of the stream set resources may happen at relatively infrequent intervals, for example, once hourly.

A B+ Tree is reasonably well suited to this scenario, with two key caveats: first, the lack of a viable crate implementation for out-of-memory storage, and second, the lack of optimization for contiguous block insertion of u128 time keys. A similar lack of support for LSM trees motivates the implementation of the following algorithm.

Approach

  • Time-interval-partitioned files, Segments, back the data on disk and are flushed in LRU fashion when memory pressure risks thrashing
    • Time keys belong to one and only one segment
    • The user may choose a time interval (e.g. hourly)
      • Aligning with the reclamation interval yields trivial truncation
  • Segments are a specialized B+ Tree with page-size-aligned nodes for time keys
    • Supporting an insertion interface, fn insert(batch: Vec<TimeKey>) -> Vec<bool>
    • Supporting efficient page cache usage for memmap file-backed durability
  • Internal nodes are persisted separately from segment "leaf" nodes and are held continuously in RAM.
  • Time Keys are 128 bits wide, with 64 bits of epoch microseconds followed by 64 bits for the second half of the id.
    • Implementing From<&MyId> for TimeKey, with ts_us: i64 in the first half and user_id: i32 then device_id: i32 as positive integers occupying the second half, leaves leading zeros that are highly compressible for deployments with relatively low-cardinality user_id and device_id values (see the sketch below).
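
To make the bit layout concrete, the following is a minimal sketch of packing such an id into a 128-bit key. The MyId struct and pack function are hypothetical and the key is modeled directly as a u128; the crate's actual TimeKey type and From implementation may differ.

// Hypothetical sketch of the 128-bit layout described above; the crate's
// actual TimeKey type and From<&MyId> implementation may differ.
struct MyId {
    ts_us: i64,     // epoch microseconds, occupies the high 64 bits
    user_id: i32,   // positive, low cardinality in practice
    device_id: i32, // positive, low cardinality in practice
}

fn pack(id: &MyId) -> u128 {
    // High 64 bits: timestamp, so keys sort by time.
    // Low 64 bits: user_id then device_id; small ids leave leading zero
    // bytes in each key, which compress extremely well.
    let hi = id.ts_us as u64 as u128;
    let lo = ((id.user_id as u32 as u64) << 32) | (id.device_id as u32 as u64);
    (hi << 64) | (lo as u128)
}

fn main() {
    let key = pack(&MyId { ts_us: 1_700_000_000_000_000, user_id: 7, device_id: 3 });
    assert_eq!(key >> 64, 1_700_000_000_000_000u128);
}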

Segments are compressed via tsz and then zstd when flushed to disk, reaching compression ratios of 1_000x to 200_000x for some sparse segments with consistent data rates. Segments are accessed serially, with insertion rates dependent on the backing index; the current BTreeMap yields insertion rates on the order of ~300_000 time keys per second (single-core CPU bound).
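
As a rough illustration of why such ratios are plausible, the sketch below delta-encodes a steady 1 kHz batch of timestamps and compresses the result. This is not the crate's flush path: plain delta encoding stands in for tsz, and the zstd crate dependency and compression level are assumptions.

use std::io;

// Illustration only: delta-encode timestamps, then compress. With a steady
// sampling interval the deltas are all the same small value, so zstd
// collapses the buffer dramatically.
fn compress_timestamps(ts_us: &[i64]) -> io::Result<Vec<u8>> {
    let mut bytes = Vec::with_capacity(ts_us.len() * 8);
    let mut prev = 0i64;
    for &t in ts_us {
        bytes.extend_from_slice(&(t - prev).to_le_bytes()); // store deltas
        prev = t;
    }
    zstd::encode_all(&bytes[..], 3) // level 3 is an arbitrary choice
}

fn main() -> io::Result<()> {
    // ~5000 samples at 1 kHz (1000 µs apart), like one upload batch.
    let ts: Vec<i64> = (0..5000).map(|i| 1_700_000_000_000_000 + i * 1_000).collect();
    let packed = compress_timestamps(&ts)?;
    println!("{} bytes raw -> {} bytes compressed", ts.len() * 8, packed.len());
    Ok(())
}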

On each insert, the pseudocode is as follows:

  1. Compute the segment start bucket timestamp for each row
  2. Lock the segment index
  3. Find the existing segments in range for the bucket timestamps under insertion
  4. Create any new segments not yet in the index
  5. Lock the hot segment cache
  6. Insert new segments into the index and the cache
  7. Loop on LRU eviction of segments not under insertion and hydration of segments under insertion, respecting the requested memory limit
  8. Release the hot segment cache lock
  9. Lock each segment under insertion
  10. Release the segment index lock
  11. Insert time keys into their respective segments
  12. Release each segment's lock
  13. Update memory usage for the newly inserted time keys
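
The sketch below renders this locking order as simplified, purely in-memory Rust. The type names (StreamSet, Segment), the use of plain std containers, and the omission of the hot segment cache, LRU eviction/hydration, memory accounting, and file-backed durability are all illustrative simplifications, not the crate's internals.

use std::collections::{BTreeMap, BTreeSet};
use std::sync::{Arc, Mutex};

type TimeKey = u128;

#[derive(Default)]
struct Segment {
    keys: BTreeSet<TimeKey>, // stand-in for the page-aligned leaf nodes
}

struct StreamSet {
    bucket_us: i64,                                   // segment time interval
    index: Mutex<BTreeMap<i64, Arc<Mutex<Segment>>>>, // bucket start -> segment
}

impl StreamSet {
    /// Returns one bool per key: true if the key was newly inserted.
    fn insert(&self, batch: &[TimeKey]) -> Vec<bool> {
        // Step 1: compute the segment bucket for each key (high 64 bits are µs).
        let buckets: Vec<i64> = batch
            .iter()
            .map(|&k| ((k >> 64) as i64 / self.bucket_us) * self.bucket_us)
            .collect();

        // Steps 2-6: lock the index, create missing segments, grab handles.
        // (Hot cache locking and LRU eviction/hydration are omitted here.)
        let handles: Vec<Arc<Mutex<Segment>>> = {
            let mut index = self.index.lock().unwrap();
            buckets
                .iter()
                .map(|b| Arc::clone(index.entry(*b).or_default()))
                .collect()
        }; // Step 10: the index lock is released before inserting keys.

        // Steps 9, 11-12: lock each segment only while inserting into it.
        batch
            .iter()
            .zip(&handles)
            .map(|(k, seg)| seg.lock().unwrap().keys.insert(*k))
            .collect()
    }
}

fn main() {
    let set = StreamSet { bucket_us: 3_600_000_000, index: Mutex::new(BTreeMap::new()) };
    let batch = vec![1u128 << 64, 1u128 << 64, 2u128 << 64];
    assert_eq!(set.insert(&batch), vec![true, false, true]);
}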

Interface

In Rust, using the Time Key Stream Set, TkStreamSet, looks like the following:

use time_key_stream_set::prelude::*;
use std::error::Error;
use std::time::Duration;

async fn frontend_receive() -> Result<Vec<AdcRow>, Box<dyn Error>> {
    // Who knows, maybe you get this data on an HTTP server
    unimplemented!()
}

async fn yolo() -> Result<(), Box<dyn Error>> {
    // Hope the database doesn't roll back this transaction
    unimplemented!()
}

#[derive(IntoTimeKey)]
struct AdcRow {
    ts_us: i64,
    user_id: i32,
    device_id: i32,
    millivolts: i16,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Rehydrate a set of time keys from a known directory or a temporary directory
    // Configure a stream set via a builder with reasonable defaults for the data source
    let stream_set = TkStreamSetBuilder::new()
        .with_segment_time_interval(Duration::from_secs(60 * 60 * 2))
        .with_memory_limit(MemoryLimit::Low)
        .build()
        .await
        .unwrap();


    // Multiple threads could be running the same loop on this node
    loop {
        // I got some new data that may have duplicates
        let batch: Vec<AdcRow> = frontend_receive().await?;

        // We are going to keep this data, glad RAM doesn't blow up
        let keep_flags = stream_set.insert(batch.iter().map(|row| row.into()).collect::<Vec<TimeKey>>()).await?;
        // Keep only the rows whose time keys were not already present
        let dedups = batch.iter()
            .zip(keep_flags)
            .filter_map(|(row, keep)| if keep { Some(row) } else { None });

        // Send the deduplicated rows (dedups) to the data warehouse somewhere
        loop {
            match yolo().await {
                Ok(_) => break,
                Err(_e) => todo!(),
            }
        }
    }
}
