| | |
|---|---|
| Crates.io | rigatoni-destinations |
| lib.rs | rigatoni-destinations |
| version | 0.2.0 |
| created_at | 2025-11-22 13:22:27.826554+00 |
| updated_at | 2025-12-12 12:49:51.55066+00 |
| description | Destination implementations for Rigatoni CDC/Data Replication: S3 with multiple formats and compression |
| homepage | https://github.com/valeriouberti/rigatoni |
| repository | https://github.com/valeriouberti/rigatoni |
| max_upload_size | |
| id | 1945326 |
| size | 207,317 |
Destination implementations for the Rigatoni CDC/Data Replication framework: production-ready destinations for streaming data from MongoDB to S3 and other targets.
```toml
[dependencies]
rigatoni-destinations = { version = "0.2.0", features = ["s3", "json"] }
```
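Additional formats and compression are opt-in via the feature flags listed below; for example (feature names taken from this crate's own feature list):

```toml
[dependencies]
# Parquet output with Zstandard compression
rigatoni-destinations = { version = "0.2.0", features = ["s3", "parquet", "zstandard"] }

# or enable everything:
# rigatoni-destinations = { version = "0.2.0", features = ["all"] }
```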
Destinations:

- `s3` - AWS S3 (enabled by default)

Formats:

- `json` - JSON/JSONL (enabled by default)
- `csv` - CSV format
- `parquet` - Apache Parquet
- `avro` - Apache Avro

Compression:

- `gzip` - Gzip compression
- `zstandard` - Zstandard compression

Convenience:

- `all-formats` - All serialization formats
- `all` - All features (S3 + all formats + compression)

Basic usage:

```rust
use rigatoni_destinations::s3::{S3Config, S3Destination};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = S3Config::builder()
        .bucket("my-data-lake")
        .region("us-east-1")
        .prefix("mongodb-cdc")
        .build()?;

    let destination = S3Destination::new(config).await?;

    // Use with a Rigatoni pipeline
    // pipeline.set_destination(destination).await?;

    Ok(())
}
```
Serialization formats:

```rust
use rigatoni_destinations::s3::SerializationFormat;

// JSON (default)
.format(SerializationFormat::Json)

// Parquet for analytics
.format(SerializationFormat::Parquet)

// CSV for exports
.format(SerializationFormat::Csv)

// Avro for streaming
.format(SerializationFormat::Avro)
```
Compression:

```rust
use rigatoni_destinations::s3::Compression;

// Gzip (widely compatible)
.compression(Compression::Gzip)

// Zstandard (better ratio and speed)
.compression(Compression::Zstd)
```
Key generation strategies:

```rust
use rigatoni_destinations::s3::KeyGenerationStrategy;

// Hive partitioning for analytics
.key_strategy(KeyGenerationStrategy::HivePartitioned)
// Creates: collection=users/year=2025/month=01/day=16/hour=10/timestamp.ext

// Date-hour partitioning (default)
.key_strategy(KeyGenerationStrategy::DateHourPartitioned)
// Creates: users/2025/01/16/10/timestamp.ext

// Date partitioning
.key_strategy(KeyGenerationStrategy::DatePartitioned)
// Creates: users/2025/01/16/timestamp.ext
```
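The three strategies differ only in how the object key path is laid out. The std-only sketch below reproduces the key layouts shown in the comments above; the `object_key` helper and its arguments are hypothetical and are not part of this crate's API.

```rust
// Illustrative only: shows the key layout each strategy produces.
// `object_key` is a hypothetical helper, not part of rigatoni-destinations.
fn object_key(
    strategy: &str,
    collection: &str,
    (y, m, d, h): (u32, u32, u32, u32),
    ts: u64,
    ext: &str,
) -> String {
    match strategy {
        // Hive-style partitioning: key=value path segments
        "hive" => format!(
            "collection={collection}/year={y}/month={m:02}/day={d:02}/hour={h:02}/{ts}.{ext}"
        ),
        // Date-hour partitioning (the default)
        "date_hour" => format!("{collection}/{y}/{m:02}/{d:02}/{h:02}/{ts}.{ext}"),
        // Date-only partitioning
        "date" => format!("{collection}/{y}/{m:02}/{d:02}/{ts}.{ext}"),
        other => panic!("unknown strategy: {other}"),
    }
}

fn main() {
    let t = (2025, 1, 16, 10);
    println!("{}", object_key("hive", "users", t, 1737021600, "jsonl"));
    // collection=users/year=2025/month=01/day=16/hour=10/1737021600.jsonl
    println!("{}", object_key("date_hour", "users", t, 1737021600, "jsonl"));
    // users/2025/01/16/10/1737021600.jsonl
}
```

Hive-style keys let engines like Athena or Spark prune partitions automatically, while the date-only layouts keep keys shorter for simple archival.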
See the rigatoni-examples directory:

- `s3_basic` - Basic S3 usage
- `s3_advanced` - Advanced features (formats, compression, partitioning)
- `s3_with_compression` - Compression examples

```sh
cargo run --example s3_basic --features s3,json
cargo run --example s3_advanced --all-features
```
```sh
# Start LocalStack
docker-compose up -d

# Run integration tests
cargo test --test s3_integration_test --features s3,json,gzip -- --ignored
```
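The LocalStack service used by the integration tests can be defined with a minimal `docker-compose.yml` along these lines (a sketch; the actual compose file in the repository may differ):

```yaml
services:
  localstack:
    image: localstack/localstack
    ports:
      - "4566:4566"      # edge port the AWS SDK connects to
    environment:
      - SERVICES=s3      # only the S3 API is needed for these tests
```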
Licensed under the Apache License, Version 2.0 (LICENSE or http://www.apache.org/licenses/LICENSE-2.0).