Crates.io | parquet_aramid |
lib.rs | parquet_aramid |
version | 0.1.0 |
source | src |
created_at | 2024-10-22 16:21:12.72428 |
updated_at | 2024-10-22 16:21:12.72428 |
description | Query engine using Parquet tables as a Key-Value store |
repository | https://gitlab.softwareheritage.org/swh/devel/parquet_aramid-rs |
id | 1418899 |
size | 102,373 |
Query engine using Parquet tables as a Key-Value store, for Rust applications.
Parquet is a column-oriented file format, meaning that it is designed to efficiently read large chunks of a column at once (a.k.a. OLAP workflows).
This also means that it is generally badly suited to queries that expect a few rows at once (a.k.a. OLTP workflows), or relational joins.
This crate is for the few queries of this kind where Parquet can shine. This means:
If your queries do not match point 1, you need a more traditional Parquet query engine, like DataFusion. If your queries do not match points 2 or 3, you should probably use a SQL database (like SQLite), or RocksDB.
# tokio_test::block_on(async {
use std::path::PathBuf;
use std::sync::Arc;
use futures::stream::StreamExt;
use parquet_aramid::Table;
use parquet_aramid::types::FixedSizeBinary;
use parquet_aramid::reader_builder_config::FilterFixedSizeBinaryConfigurator;
// Location of the Parquet table
let url = url::Url::parse("file:///srv/data/my_table/").unwrap();
let (store, path) = object_store::parse_url(&url).unwrap();
let store = store.into();
// Location of external indexes to speed-up lookup
let ef_indexes_path = PathBuf::from("/tmp/my_table_catalog/");
// Initialize table, read metadata to RAM
let table = Table::new(store, path, ef_indexes_path).await.unwrap();
// Load external indexes from disk (see below for how to generate them)
table.mmap_ef_index("key_column").unwrap();
// Filter out rows whose key is not b"abcd"
let keys = Arc::new(vec![FixedSizeBinary(*b"abcd")]);
let configurator = Arc::new(
FilterFixedSizeBinaryConfigurator::with_sorted_keys("key_column", Arc::clone(&keys))
);
// Read the table
table
.stream_for_keys("key_column", Arc::clone(&keys), configurator)
.await
.unwrap()
.for_each(|batch| async move {
// Iterates on every row with b"abcd" as its value in "key_column"
println!("{:?}", batch);
})
// Streams are lazy: without this .await, the closure above never runs
.await;
# })
# tokio_test::block_on(async {
use std::path::PathBuf;
use std::sync::Arc;
use epserde::ser::Serialize;
use futures::stream::StreamExt;
use parquet_aramid::Table;
use parquet_aramid::types::FixedSizeBinary;
use parquet_aramid::reader_builder_config::FilterFixedSizeBinaryConfigurator;
use tokio::task::JoinSet;
// Location of the Parquet table
let url = url::Url::parse("file:///srv/data/my_table/").unwrap();
let (store, path) = object_store::parse_url(&url).unwrap();
let store = store.into();
// Location where to write external indexes
let ef_indexes_path = PathBuf::from("/tmp/my_table_catalog/");
// Initialize table
let table = Table::new(store, path, ef_indexes_path.clone()).await.unwrap();
// Create directory to store indexes
std::fs::create_dir_all(&ef_indexes_path).unwrap();
let mut tasks = Vec::new();
for file in table.files {
tasks.push(tokio::task::spawn(async move {
// Build index
let ef_values = file
.build_ef_index("key_column")
.await
.expect("Could not build Elias-Fano index");
// Write index to disk
let index_path = file.ef_index_path("key_column");
let mut ef_file = std::fs::File::create_new(&index_path).unwrap();
ef_values
.serialize(&mut ef_file)
.expect("Could not serialize Elias-Fano index");
}));
}
tasks
.into_iter()
.collect::<JoinSet<_>>()
.join_all()
.await
.into_iter()
.collect::<Result<Vec<_>, tokio::task::JoinError>>()
.expect("Could not join task");
# })
Table::new reads all metadata from Parquet files in the table into RAM,
and keeps it until it is dropped, in order to significantly reduce time
spent on each query.
The resulting memory usage can be significant for some tables, but is no more than
DataFusion's peak memory usage while a query is running.
Table::stream_for_keys is designed very similarly to any Parquet query
engine's SELECT xxx FROM table WHERE key = yyy:
If Table::mmap_ef_index was called for this column, use said index to
filter out files which do not contain any of the given keys.
For kept files, remember which of the input keys are present in the file
(to reduce false positives in the next three filters).
This filter does not have false positives.
This is a parquet_aramid-specific design/format, built on
Sebastiano Vigna's Elias-Fano implementation.
So far, none of the keys or data in the table was read.
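The index-based file filter described above can be pictured with a minimal, standard-library-only sketch. It is not the crate's implementation: it assumes each file's index can be treated as an exact set of integer keys, and a sorted Vec stands in for the Elias-Fano-compressed structure. The names FileIndex and prune_files are hypothetical.

```rust
/// Stand-in for one file's external index: the sorted, deduplicated keys
/// stored in that file. (Assumption: the real index is Elias-Fano-compressed;
/// a sorted Vec is used here only because it answers the same question.)
struct FileIndex {
    file_name: String,
    sorted_keys: Vec<u64>,
}

impl FileIndex {
    fn new(file_name: &str, mut keys: Vec<u64>) -> Self {
        keys.sort_unstable();
        keys.dedup();
        FileIndex {
            file_name: file_name.to_string(),
            sorted_keys: keys,
        }
    }

    /// Exact membership test, so this filter has no false positives.
    fn contains(&self, key: u64) -> bool {
        self.sorted_keys.binary_search(&key).is_ok()
    }
}

/// For each file, remember which of the queried keys it contains.
/// Files containing none of the keys are dropped entirely.
fn prune_files<'a>(indexes: &'a [FileIndex], keys: &[u64]) -> Vec<(&'a str, Vec<u64>)> {
    indexes
        .iter()
        .filter_map(|index| {
            let present: Vec<u64> = keys
                .iter()
                .copied()
                .filter(|&key| index.contains(key))
                .collect();
            if present.is_empty() {
                None
            } else {
                Some((index.file_name.as_str(), present))
            }
        })
        .collect()
}
```

Calling prune_files with one FileIndex per file and the queried keys returns only the files worth opening, each paired with the subset of keys that later filters still need to look for.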
At this point, every matching page (of the key column, and potentially others
depending on the configurator) is decompressed, and the configurator
checks individual values, to tell parquet_aramid which rows × columns should
be returned.
See the ParquetRecordBatchStreamBuilder documentation for an exhaustive
overview of all the filtering the configurator can do.
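To illustrate what "which rows × columns should be returned" means, here is a small in-memory sketch using only the standard library. It is an assumption-laden model, not the crate's API: Batch and KeyFilter are hypothetical stand-ins for a decompressed record batch and for a configurator, and the real crate expresses the same two decisions (a row filter and a column projection) through ParquetRecordBatchStreamBuilder.

```rust
/// Hypothetical stand-in for a decompressed batch: one key column plus
/// named value columns, all of the same length.
struct Batch {
    key_column: Vec<[u8; 4]>,
    value_columns: Vec<(String, Vec<i64>)>,
}

/// Hypothetical stand-in for a configurator: it decides which rows match
/// and which columns are returned.
struct KeyFilter {
    /// Requested keys; must be sorted, since they are binary-searched.
    sorted_keys: Vec<[u8; 4]>,
    /// Names of the columns to return.
    projected_columns: Vec<String>,
}

impl KeyFilter {
    /// One boolean per row: true if the row's key is one of the requested keys.
    fn row_mask(&self, batch: &Batch) -> Vec<bool> {
        batch
            .key_column
            .iter()
            .map(|key| self.sorted_keys.binary_search(key).is_ok())
            .collect()
    }

    /// Keep only the projected columns, and within them only the matching rows.
    fn select(&self, batch: &Batch) -> Vec<(String, Vec<i64>)> {
        let mask = self.row_mask(batch);
        batch
            .value_columns
            .iter()
            .filter(|(name, _)| self.projected_columns.contains(name))
            .map(|(name, values)| {
                let kept: Vec<i64> = values
                    .iter()
                    .zip(&mask)
                    .filter_map(|(value, keep)| if *keep { Some(*value) } else { None })
                    .collect();
                (name.clone(), kept)
            })
            .collect()
    }
}
```

The row mask is computed from the key column alone, so columns outside the projection never need to be inspected; this is the reason a configurator may require decompressing fewer columns than the table contains.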