rigatoni-destinations

Crates.io: rigatoni-destinations
lib.rs: rigatoni-destinations
version: 0.2.0
created_at: 2025-11-22 13:22:27 UTC
updated_at: 2025-12-12 12:49:51 UTC
description: Destination implementations for Rigatoni CDC/Data Replication: S3 with multiple formats and compression
homepage: https://github.com/valeriouberti/rigatoni
repository: https://github.com/valeriouberti/rigatoni
size: 207,317
owner: Valerio Uberti (valeriouberti)
documentation: https://docs.rs/rigatoni-core

README

rigatoni-destinations

Destination implementations for Rigatoni CDC/Data Replication framework - write data to S3 and other targets.


Overview

Production-ready destination implementations for streaming data from MongoDB to various targets.

Supported Destinations

AWS S3

  • Multiple Formats: JSON, CSV, Parquet, Avro
  • Compression: Gzip, Zstandard
  • Partitioning: Hive-style, date-based, collection-based
  • Features: Retry logic, S3-compatible storage (LocalStack, MinIO)

Installation

[dependencies]
rigatoni-destinations = { version = "0.2.0", features = ["s3", "json"] }

Available Features

Destinations:

  • s3 - AWS S3 (enabled by default)

Formats:

  • json - JSON/JSONL (enabled by default)
  • csv - CSV format
  • parquet - Apache Parquet
  • avro - Apache Avro

Compression:

  • gzip - Gzip compression
  • zstandard - Zstandard compression

Convenience:

  • all-formats - All serialization formats
  • all - All features (S3 + all formats + compression)
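For example, to enable S3 with Parquet output and Zstandard compression, combine features from the list above, or pull in everything with the all feature:

[dependencies]
# Selective: S3 destination, Parquet format, Zstandard compression
rigatoni-destinations = { version = "0.2.0", features = ["s3", "parquet", "zstandard"] }

# Or everything at once:
# rigatoni-destinations = { version = "0.2.0", features = ["all"] }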

Quick Start - S3 Destination

use rigatoni_destinations::s3::{S3Config, S3Destination};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = S3Config::builder()
        .bucket("my-data-lake")
        .region("us-east-1")
        .prefix("mongodb-cdc")
        .build()?;

    let destination = S3Destination::new(config).await?;

    // Use with Rigatoni pipeline
    // pipeline.set_destination(destination).await?;

    Ok(())
}

S3 Features

Serialization Formats

use rigatoni_destinations::s3::SerializationFormat;

// JSON (default)
.format(SerializationFormat::Json)

// Parquet for analytics
.format(SerializationFormat::Parquet)

// CSV for exports
.format(SerializationFormat::Csv)

// Avro for streaming
.format(SerializationFormat::Avro)

Compression

use rigatoni_destinations::s3::Compression;

// Gzip (widely compatible)
.compression(Compression::Gzip)

// Zstandard (better ratio and speed)
.compression(Compression::Zstd)

Partitioning Strategies

use rigatoni_destinations::s3::KeyGenerationStrategy;

// Hive partitioning for analytics
.key_strategy(KeyGenerationStrategy::HivePartitioned)
// Creates: collection=users/year=2025/month=01/day=16/hour=10/timestamp.ext

// Date-hour partitioning (default)
.key_strategy(KeyGenerationStrategy::DateHourPartitioned)
// Creates: users/2025/01/16/10/timestamp.ext

// Date partitioning
.key_strategy(KeyGenerationStrategy::DatePartitioned)
// Creates: users/2025/01/16/timestamp.ext
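These builder options compose with the Quick Start configuration. The sketch below chains them together; every method call appears earlier in this README, but the exact combination (and the resulting key layout under the prefix) is an assumption, and it requires the parquet and zstandard features:

use rigatoni_destinations::s3::{
    Compression, KeyGenerationStrategy, S3Config, S3Destination, SerializationFormat,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Parquet + Zstandard + Hive-style keys for an analytics-friendly layout.
    let config = S3Config::builder()
        .bucket("my-data-lake")
        .region("us-east-1")
        .prefix("mongodb-cdc")
        .format(SerializationFormat::Parquet)
        .compression(Compression::Zstd)
        .key_strategy(KeyGenerationStrategy::HivePartitioned)
        .build()?;

    // Presumably produces keys like:
    // mongodb-cdc/collection=users/year=2025/month=01/day=16/hour=10/<timestamp>.parquet
    let _destination = S3Destination::new(config).await?;
    Ok(())
}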

Examples

See the rigatoni-examples directory:

  • s3_basic - Basic S3 usage
  • s3_advanced - Advanced features (formats, compression, partitioning)
  • s3_with_compression - Compression examples

cargo run --example s3_basic --features s3,json
cargo run --example s3_advanced --all-features

Testing with LocalStack

# Start LocalStack
docker-compose up -d

# Run integration tests
cargo test --test s3_integration_test --features s3,json,gzip -- --ignored
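To point your own code at LocalStack, the destination must target a custom endpoint. The endpoint_url builder method below is hypothetical; this README only states that S3-compatible storage (LocalStack, MinIO) is supported, so check docs.rs for the actual option name. LocalStack's default edge port 4566 is assumed.

use rigatoni_destinations::s3::{S3Config, S3Destination};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = S3Config::builder()
        .bucket("test-bucket")
        .region("us-east-1")
        // HYPOTHETICAL: the exact builder method for a custom endpoint is not
        // shown in this README; S3-compatible endpoints are listed as a feature.
        .endpoint_url("http://localhost:4566")
        .build()?;

    let _destination = S3Destination::new(config).await?;
    Ok(())
}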

Documentation

API documentation: https://docs.rs/rigatoni-core

License

Licensed under the Apache License, Version 2.0 (LICENSE or http://www.apache.org/licenses/LICENSE-2.0).
