| Crates.io | manifold-timeseries |
| lib.rs | manifold-timeseries |
| version | 0.1.0 |
| created_at | 2025-11-03 03:29:27.384272+00 |
| updated_at | 2025-11-03 03:29:27.384272+00 |
| description | Time-series storage optimizations for Manifold embedded database |
| homepage | https://github.com/tomWhiting/manifold |
| repository | https://github.com/tomWhiting/manifold |
| max_upload_size | |
| id | 1913842 |
| size | 117,871 |
Time-series storage optimizations for the Manifold embedded database.
manifold-timeseries provides ergonomic, type-safe wrappers around Manifold's core primitives for storing and querying time-series data with multi-granularity downsampling and retention policies. It does not implement time-series analytics (forecasting, anomaly detection); instead, it focuses on efficient persistent storage and provides integration traits for external analytics libraries.
The TimeSeriesSource trait (covered below) is the hook for those external analytics libraries.
A quick example of writing and reading a series:
use manifold::column_family::ColumnFamilyDatabase;
use manifold_timeseries::{TimeSeriesTable, TimeSeriesTableRead, AbsoluteEncoding};
// Open database and column family
let db = ColumnFamilyDatabase::open("my.db")?;
let cf = db.column_family_or_create("metrics")?;
// Write time series data
{
let write_txn = cf.begin_write()?;
let mut ts = TimeSeriesTable::<AbsoluteEncoding>::open(&write_txn, "cpu")?;
let timestamp = 1609459200000; // 2021-01-01 00:00:00 UTC (milliseconds)
ts.write("server1.cpu.usage", timestamp, 42.5)?;
ts.write("server1.cpu.usage", timestamp + 1000, 43.2)?;
ts.write("server1.cpu.usage", timestamp + 2000, 41.8)?;
drop(ts);
write_txn.commit()?;
}
// Read time series data
let read_txn = cf.begin_read()?;
let ts = TimeSeriesTableRead::<AbsoluteEncoding>::open(&read_txn, "cpu")?;
// Query a specific point
if let Some(value) = ts.get("server1.cpu.usage", timestamp)? {
println!("CPU usage at {}: {}%", timestamp, value);
}
// Range query
let start = 1609459200000;
let end = 1609459260000;
for point in ts.range("server1.cpu.usage", start, end)? {
let (timestamp, value) = point?;
println!("{}: {}", timestamp, value);
}
For high-throughput metric ingestion:
let points = vec![
("server1.cpu.usage", 1609459200000, 42.5),
("server1.cpu.usage", 1609459201000, 43.2),
("server1.mem.usage", 1609459200000, 78.3),
("server2.cpu.usage", 1609459200000, 55.1),
];
let write_txn = cf.begin_write()?;
let mut ts = TimeSeriesTable::<AbsoluteEncoding>::open(&write_txn, "metrics")?;
// Batch write for better performance; the second argument appears to be the
// sorted flag mentioned in the notes below (pass true when points are pre-sorted)
ts.write_batch(&points, false)?;
drop(ts);
write_txn.commit()?;
AbsoluteEncoding stores timestamps as 8-byte big-endian u64 values:
use manifold_timeseries::AbsoluteEncoding;
let mut ts = TimeSeriesTable::<AbsoluteEncoding>::open(&write_txn, "metrics")?;
Best for irregular sampling intervals and direct point lookups, since each timestamp is stored self-contained.
Storage: 8 bytes per timestamp (fixed)
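As a plain-Rust illustration of that layout (not the crate's internal key-building code), to_be_bytes shows why fixed-width big-endian timestamps keep lexicographic key order aligned with chronological order:
let earlier: u64 = 1609459200000; // 2021-01-01 00:00:00 UTC
let later: u64 = 1609459201000; // one second later
let a = earlier.to_be_bytes(); // [u8; 8]
let b = later.to_be_bytes();
// Byte-wise comparison agrees with numeric comparison, so range scans stay in time order.
assert!(a < b);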
DeltaEncoding stores timestamps as varint-compressed deltas with periodic checkpoints:
use manifold_timeseries::DeltaEncoding;
let mut ts = TimeSeriesTable::<DeltaEncoding>::open(&write_txn, "metrics")?;
Best for regularly sampled data, where consecutive deltas are small and compress well.
Storage: 1-9 bytes per timestamp (variable, typically 1-2 bytes for regular intervals)
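To see why regular intervals compress so well, here is a minimal LEB128-style varint length calculation in plain Rust; it is only a sketch of the general technique, not manifold-timeseries' actual on-disk format (which also interleaves periodic checkpoints):
// Number of bytes a LEB128-style varint needs for a value (illustrative only).
fn varint_len(mut v: u64) -> usize {
    let mut n = 1;
    while v >= 0x80 {
        v >>= 7;
        n += 1;
    }
    n
}
// A fixed 1-second cadence produces deltas of 1000 ms, which fit in 2 varint bytes
// instead of the 8 bytes an absolute timestamp would occupy.
assert_eq!(varint_len(1000), 2);
assert_eq!(varint_len(1), 1); // sub-128 ms deltas need just 1 byte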
Each TimeSeriesTable maintains four internal tables for efficient queries at different time scales: raw points plus minute, hour, and day aggregates.
Each aggregate contains:
min: f32 - Minimum value in the window
max: f32 - Maximum value in the window
sum: f32 - Sum of all values
count: u64 - Number of data points
last: f32 - Most recent value
Convert raw data to aggregates:
use manifold_timeseries::Granularity;
let write_txn = cf.begin_write()?;
let mut ts = TimeSeriesTable::<AbsoluteEncoding>::open(&write_txn, "cpu")?;
// Downsample the last hour of raw data to minute aggregates
let now_ms = std::time::SystemTime::now()
    .duration_since(std::time::UNIX_EPOCH)?
    .as_millis() as u64;
let start_ms = now_ms - (60 * 60 * 1000); // 1 hour ago
let count = ts.downsample_to_minute("server1.cpu.usage", start_ms, now_ms)?;
println!("Created {} minute aggregates", count);
// Downsample minute data to hour aggregates
let count = ts.downsample_minute_to_hour("server1.cpu.usage", start_ms, now_ms)?;
println!("Created {} hour aggregates", count);
drop(ts);
write_txn.commit()?;
Query aggregates:
let read_txn = cf.begin_read()?;
let ts = TimeSeriesTableRead::<AbsoluteEncoding>::open(&read_txn, "cpu")?;
// Get minute-level aggregate
let agg = ts.get_aggregate(Granularity::Minute, "server1.cpu.usage", timestamp)?;
if let Some(agg) = agg {
println!("Minute stats: min={}, max={}, avg={}", agg.min, agg.max, agg.average());
}
// Range query over hour aggregates
for result in ts.range_aggregates(Granularity::Hour, "server1.cpu.usage", start, end)? {
let (timestamp, agg) = result?;
println!("{}: avg={}", timestamp, agg.average());
}
Delete old data to manage storage:
use std::time::Duration;
let write_txn = cf.begin_write()?;
let mut ts = TimeSeriesTable::<AbsoluteEncoding>::open(&write_txn, "metrics")?;
// Keep only last 7 days of raw data
ts.apply_retention(
Granularity::Raw,
Duration::from_secs(7 * 24 * 60 * 60)
)?;
// Apply multiple retention policies at once
ts.apply_all_retentions(&[
(Granularity::Raw, Duration::from_secs(7 * 24 * 60 * 60)), // 7 days
(Granularity::Minute, Duration::from_secs(30 * 24 * 60 * 60)), // 30 days
(Granularity::Hour, Duration::from_secs(90 * 24 * 60 * 60)), // 90 days
(Granularity::Day, Duration::from_secs(365 * 24 * 60 * 60)), // 1 year
])?;
drop(ts);
write_txn.commit()?;
Each time series table creates four internal Manifold tables:
{name}_raw → (timestamp: u64, series_id: &str) → value: f32
{name}_minute → (timestamp: u64, series_id: &str) → aggregate: Aggregate
{name}_hour → (timestamp: u64, series_id: &str) → aggregate: Aggregate
{name}_day → (timestamp: u64, series_id: &str) → aggregate: Aggregate
All tables share the same composite key structure for efficient range queries.
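As a plain-Rust sketch of that ordering (not Manifold's key API), tuples compare field by field, so the fixed-width timestamp dominates the sort and entries for the same instant group together across series:
let k1 = (1609459200000u64, "server1.cpu.usage");
let k2 = (1609459200000u64, "server2.cpu.usage");
let k3 = (1609459201000u64, "server1.cpu.usage");
assert!(k1 < k2); // same timestamp: ordered by series id
assert!(k2 < k3); // later timestamp sorts after, regardless of series id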
Aggregates are stored as fixed-width 24-byte values:
[min: f32, 4 bytes][max: f32, 4 bytes][sum: f32, 4 bytes][count: u64, 8 bytes][last: f32, 4 bytes]
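A hedged sketch of how that 24-byte layout could be packed, with average() derived as sum / count; the field names mirror the list above, but the byte order (little-endian here) and the serialization code are assumptions, not the crate's implementation:
// Illustrative stand-in for the stored aggregate; not the crate's real type.
struct AggregateSketch {
    min: f32,
    max: f32,
    sum: f32,
    count: u64,
    last: f32,
}

impl AggregateSketch {
    // Average is not stored; it is derived from sum and count.
    fn average(&self) -> f32 {
        if self.count == 0 { 0.0 } else { self.sum / self.count as f32 }
    }

    // Pack into the fixed 24-byte layout: 4 + 4 + 4 + 8 + 4 bytes.
    fn to_bytes(&self) -> [u8; 24] {
        let mut buf = [0u8; 24];
        buf[0..4].copy_from_slice(&self.min.to_le_bytes());
        buf[4..8].copy_from_slice(&self.max.to_le_bytes());
        buf[8..12].copy_from_slice(&self.sum.to_le_bytes());
        buf[12..20].copy_from_slice(&self.count.to_le_bytes());
        buf[20..24].copy_from_slice(&self.last.to_le_bytes());
        buf
    }
}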
The crate includes comprehensive examples demonstrating real-world usage:
Real system metrics collection (examples/metrics_collection.rs):
cargo run --example metrics_collection -p manifold-timeseries
IoT sensor data simulation (examples/iot_sensors.rs):
cargo run --example iot_sensors -p manifold-timeseries
Complete downsampling workflow (examples/downsampling_lifecycle.rs):
cargo run --example downsampling_lifecycle -p manifold-timeseries
manifold-timeseries works seamlessly with other manifold domain layers:
// Store entity embeddings in vectors
let vectors_cf = db.column_family_or_create("embeddings")?;
let vectors_txn = vectors_cf.begin_write()?;
let mut vectors = VectorTable::<768>::open(&vectors_txn, "entities")?;
vectors.insert("server_1", &embedding)?;
// Store entity relationships in graph
let graph_cf = db.column_family_or_create("infrastructure")?;
let graph_txn = graph_cf.begin_write()?;
let mut graph = GraphTable::open(&graph_txn, "network")?;
graph.add_edge(&server_1, "connects_to", &server_2, true, 1.0)?;
// Store entity metrics in time series
let metrics_cf = db.column_family_or_create("monitoring")?;
let metrics_txn = metrics_cf.begin_write()?;
let mut ts = TimeSeriesTable::open(&metrics_txn, "server_metrics")?;
ts.write("server_1.cpu", timestamp, 45.2)?;
The TimeSeriesSource trait enables integration with external time-series analytics libraries:
use manifold_timeseries::TimeSeriesSource;
let read_txn = cf.begin_read()?;
let ts = TimeSeriesTableRead::<AbsoluteEncoding>::open(&read_txn, "metrics")?;
// Use the trait to integrate with external libraries
let points: Vec<_> = ts.range_raw("cpu.usage", start, end)?
.collect::<Result<Vec<_>, _>>()?;
// Pass to analytics library for forecasting, anomaly detection, etc.
// (example with hypothetical library)
let forecast = analytics::forecast(&points, 24)?;
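If no analytics crate is wired in yet, the collected points can also be analyzed in place. Assuming each collected point is a (u64 timestamp, f32 value) pair, as the range example earlier suggests, a simple mean looks like:
// Basic in-process statistic over the collected points (assumes (u64, f32) pairs).
let mean = if points.is_empty() {
    0.0
} else {
    points.iter().map(|(_, v)| *v as f64).sum::<f64>() / points.len() as f64
};
println!("mean over [{}, {}): {:.2}", start, end, mean);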
Requirements: manifold version 3.1+
Performance: pass sorted: true for better performance
Licensed under either of:
at your option.
Contributions are welcome! This crate follows the patterns established in the manifold domain layer architecture.