parquet_opendal

Crates.ioparquet_opendal
lib.rsparquet_opendal
version0.2.2
sourcesrc
created_at2024-08-13 09:42:25.275793
updated_at2024-11-07 06:02:25.503676
descriptionparquet Integration for Apache OpenDAL
homepagehttps://opendal.apache.org/
repositoryhttps://github.com/apache/opendal
max_upload_size
id1335769
size97,897
Xuanwo (Xuanwo)

documentation

README

Apache OpenDALâ„¢ parquet integration

Build Status Latest Version Crate Downloads chat

parquet_opendal provides parquet efficient IO utilities.

Useful Links

Examples

Add the following dependencies to your Cargo.toml with correct version:

[dependencies]
parquet_opendal = "0.0.1"
opendal = { version = "0.48.0", features = ["services-s3"] }
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array, RecordBatch};

use futures::StreamExt;
use opendal::{services::S3Config, Operator};
use parquet::arrow::{AsyncArrowWriter, ParquetRecordBatchStreamBuilder};
use parquet_opendal::{AsyncReader, AsyncWriter};

#[tokio::main]
async fn main() {
    let mut cfg = S3Config::default();
    cfg.access_key_id = Some("my_access_key".to_string());
    cfg.secret_access_key = Some("my_secret_key".to_string());
    cfg.endpoint = Some("my_endpoint".to_string());
    cfg.region = Some("my_region".to_string());
    cfg.bucket = "my_bucket".to_string();

    // Create a new operator
    let operator = Operator::from_config(cfg).unwrap().finish();
    let path = "/path/to/file.parquet";

    // Create an async writer
    let writer = AsyncWriter::new(
        operator
            .writer_with(path)
            .chunk(32 * 1024 * 1024)
            .concurrent(8)
            .await
            .unwrap(),
    );

    let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
    let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
    let mut writer = AsyncArrowWriter::try_new(writer, to_write.schema(), None).unwrap();
    writer.write(&to_write).await.unwrap();
    writer.close().await.unwrap();

    /// `gap(512 * 1024)` - Sets the maximum gap size (in bytes) to merge small byte ranges
    ///   to 512 KB.
    /// `chunk(16 * 1024 * 1024)` - Sets the chunk size (in bytes) for reading data to 16 MB.
    /// `concurrent(16)` - Sets the number of concurrent fetch operations to 16.
    let reader = operator
        .reader_with(path)
        .gap(512 * 1024)
        .chunk(16 * 1024 * 1024)
        .concurrent(16)
        .await
        .unwrap();

    let content_len = operator.stat(path).await.unwrap().content_length();
    // `with_prefetch_footer_size(512 * 1024)` - Sets the prefetch footer size to 512 KB.
    let reader = AsyncReader::new(reader, content_len).with_prefetch_footer_size(512 * 1024);
    let mut stream = ParquetRecordBatchStreamBuilder::new(reader)
        .await
        .unwrap()
        .build()
        .unwrap();
    let read = stream.next().await.unwrap().unwrap();
    assert_eq!(to_write, read);
}

Branding

The first and most prominent mentions must use the full form: Apache OpenDALâ„¢ of the name for any individual usage (webpage, handout, slides, etc.) Depending on the context and writing style, you should use the full form of the name sufficiently often to ensure that readers clearly understand the association of both the OpenDAL project and the OpenDAL software product to the ASF as the parent organization.

For more details, see the Apache Product Name Usage Guide.

License and Trademarks

Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0

Apache OpenDAL, OpenDAL, and Apache are either registered trademarks or trademarks of the Apache Software Foundation.

Commit count: 3022

cargo fmt