| Crates.io | aqueducts-delta |
| lib.rs | aqueducts-delta |
| version | 0.11.1 |
| created_at | 2025-05-25 20:00:54.095559+00 |
| updated_at | 2025-07-24 11:53:32.016066+00 |
| description | Framework to build ETL data pipelines declaratively |
| homepage | https://github.com/vigimite/aqueducts |
| repository | https://github.com/vigimite/aqueducts |
| max_upload_size | |
| id | 1688653 |
| size | 215,895 |
Aqueducts is a framework to write and execute ETL data pipelines declaratively.
Features:
- Declarative pipeline definitions in YAML, JSON, or TOML
- Sources and destinations for local files plus S3, GCS, and Azure storage, with Delta Lake and ODBC integrations
- SQL transformation stages executed with DataFusion
- Pipeline parameterization with ${variable} syntax
- Remote pipeline execution via aqueducts-executor

Join our Discord community to get help, share your work, and connect with other Aqueducts users:
Homebrew (macOS and Linux):
# Add the tap and install
brew tap vigimite/aqueducts
brew install aqueducts-cli
Shell Installer (Linux, macOS, Windows):
# One-line installer
curl --proto '=https' --tlsv1.2 -LsSf https://github.com/vigimite/aqueducts/releases/latest/download/aqueducts-installer.sh | sh
Direct Download: Download pre-built binaries from the latest release.
# Install with default features (s3, gcs, azure, yaml)
cargo install aqueducts-cli --locked
# Install with odbc support
cargo install aqueducts-cli --locked --features odbc
# Install with minimal features
cargo install aqueducts-cli --locked --no-default-features --features yaml
Install the executor to run pipelines remotely, closer to data sources:
# Install the executor (with default cloud storage support)
cargo install aqueducts-executor --locked
# For ODBC support (requires unixodbc-dev to be installed)
cargo install aqueducts-executor --locked --features odbc
Add Aqueducts to your Rust project:
[dependencies]
# Default setup with file processing and cloud storage
aqueducts = "0.10"
# Minimal setup for local file processing only
aqueducts = { version = "0.10", default-features = false, features = ["yaml"] }
# Full-featured setup with all storage providers and databases
aqueducts = { version = "0.10", features = ["json", "toml", "yaml", "s3", "gcs", "azure", "odbc", "delta"] }
Run a pipeline locally:
aqueducts run --file examples/aqueduct_pipeline_example.yml --param year=2024 --param month=jan
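Each --param flag substitutes a value into the matching ${variable} placeholder in the pipeline definition. A minimal sketch of the mechanism, mirroring the full example further below:

sources:
  - type: file
    name: temp_readings
    format:
      type: csv
      options: {}
    # resolves to ./examples/temp_readings_jan_2024.csv given the params above
    location: ./examples/temp_readings_${month}_${year}.csv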
Run a pipeline remotely by starting an executor and pointing the CLI at it:
# Start the executor with an API key
aqueducts-executor --api-key your_secret_key --max-memory 4
# Point the CLI at the running executor
aqueducts run --file examples/aqueduct_pipeline_example.yml \
--param year=2024 --param month=jan \
--executor http://executor-host:3031 \
--api-key your_secret_key
Here's a simple example pipeline in YAML format (full example):
# yaml-language-server: $schema=https://raw.githubusercontent.com/vigimite/aqueducts/main/json_schema/aqueducts.schema.json
version: "v2"
sources:
  # Register a local file source containing temperature readings for various cities
  - type: file
    name: temp_readings
    format:
      type: csv
      options: {}
    # use built-in templating functionality
    location: ./examples/temp_readings_${month}_${year}.csv

  # Register a local file source containing a mapping between location_ids and location names
  - type: file
    name: locations
    format:
      type: csv
      options: {}
    location: ./examples/location_dict.csv

stages:
  # Query to aggregate temperature data by date and location
  - - name: aggregated
      query: >
        SELECT
          cast(timestamp as date) date,
          location_id,
          round(min(temperature_c), 2) min_temp_c,
          round(min(humidity), 2) min_humidity,
          round(max(temperature_c), 2) max_temp_c,
          round(max(humidity), 2) max_humidity,
          round(avg(temperature_c), 2) avg_temp_c,
          round(avg(humidity), 2) avg_humidity
        FROM temp_readings
        GROUP BY 1, 2
        ORDER BY 1 ASC
      # print the query plan to stdout for debugging purposes
      explain: true

  # Enrich aggregation with the location name
  - - name: enriched
      query: >
        SELECT
          date,
          location_name,
          min_temp_c,
          max_temp_c,
          avg_temp_c,
          min_humidity,
          max_humidity,
          avg_humidity
        FROM aggregated
        JOIN locations
          ON aggregated.location_id = locations.location_id
        ORDER BY date, location_name
      # print 10 rows to stdout for debugging purposes
      show: 10

# Write the pipeline result to a parquet file (can be omitted if you don't want an output)
destination:
  type: file
  name: results
  format:
    type: parquet
    options: {}
  location: ./examples/output_${month}_${year}.parquet
Editor Support: Add the yaml-language-server comment (shown above) to enable autocompletion, validation, and inline documentation in VS Code, Neovim, and other editors with YAML Language Server support.
Use Aqueducts as a library in your Rust applications:
use aqueducts::prelude::*;
use datafusion::prelude::SessionContext;
use std::{collections::HashMap, sync::Arc};

#[tokio::main]
async fn main() -> Result<()> {
    // Load pipeline configuration with parameters
    let mut params = HashMap::new();
    params.insert("year".to_string(), "2024".to_string());
    params.insert("month".to_string(), "jan".to_string());
    let pipeline = Aqueduct::from_file("pipeline.yml", params)?;

    // Create DataFusion execution context
    let ctx = Arc::new(SessionContext::new());

    // Execute pipeline with progress tracking
    let tracker = Arc::new(LoggingProgressTracker);
    let _result = run_pipeline(ctx, pipeline, Some(tracker)).await?;

    Ok(())
}
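Because you own the SessionContext, you can follow up with plain DataFusion queries once execution finishes. A minimal sketch, assuming the stage tables (such as enriched from the YAML example above) remain registered on the context after the run — that assumption is not guaranteed by this snippet:

use datafusion::prelude::SessionContext;
use std::sync::Arc;

// Hypothetical follow-up: inspect a stage's output through DataFusion.
// That stage tables stay registered after run_pipeline is an assumption here.
async fn inspect(ctx: Arc<SessionContext>) -> datafusion::error::Result<()> {
    let df = ctx.sql("SELECT * FROM enriched LIMIT 5").await?;
    df.show().await?; // print the rows to stdout
    Ok(())
}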
Aqueducts consists of several components:
- aqueducts: Unified interface providing all functionality through feature flags
- aqueducts-core: The main engine for defining and executing data pipelines
- aqueducts-schemas: Configuration types and validation
- aqueducts-delta and aqueducts-odbc: Delta Lake and ODBC integrations

For component-specific details, see the respective README files:
This project would not be possible without the incredible work done by the open source community, particularly the maintainers and contributors of:
Please show these projects some love and support! ❤️
Contributions to Aqueducts are welcome! Whether it's bug reports, feature requests, or code contributions, we appreciate all forms of help.
Please see the Contributing Guide for detailed instructions on how to: