# Aqueducts
[![Build status](https://github.com/vigimite/aqueducts/actions/workflows/build.yml/badge.svg?branch=main)](https://github.com/vigimite/aqueducts/actions/workflows/build.yml) [![Crates.io](https://img.shields.io/crates/v/aqueducts)](https://crates.io/crates/aqueducts) [![Documentation](https://docs.rs/aqueducts/badge.svg)](https://docs.rs/aqueducts)
Aqueducts is a framework for writing and executing ETL data pipelines declaratively.
**Features:**
- Define ETL pipelines in YAML
- Extract data from CSV, JSONL, and Parquet files or Delta tables
- Process data using SQL
- Load data into object stores as CSV/Parquet files or Delta tables
- Support for file and Delta table partitioning
- Support for Upsert/Replace/Append operations on Delta tables
- Support for Local, S3, GCS and Azure Blob storage
- *EXPERIMENTAL* Support for ODBC sources and destinations
This framework builds on the fantastic work done by projects such as:
- [arrow-rs](https://github.com/apache/arrow-rs)
- [datafusion](https://github.com/apache/datafusion)
- [delta-rs](https://github.com/delta-io/delta-rs)
Please show these projects some support :heart:!
## Documentation
You can find the docs at [docs.rs/aqueducts](https://docs.rs/aqueducts).
Change log: [CHANGELOG](CHANGELOG.md)
## Quick start
There are a couple of options to define and execute an Aqueduct pipeline:

- using a YAML configuration file
- using a JSON configuration file
- manually in code (see the sketch at the end of this section)
You can check out some examples in the [examples](examples) directory.
Here is a simple example defining an Aqueduct pipeline using the YAML config format [link](examples/aqueduct_pipeline_simple.yml):
```yaml
sources:
  # Register a local file source containing temperature readings for various cities
  - type: File
    name: temp_readings
    file_type:
      type: Csv
      options: {}
    # use built-in templating functionality
    location: ./examples/temp_readings_${month}_${year}.csv

  # Register a local file source containing a mapping between location_ids and location names
  - type: File
    name: locations
    file_type:
      type: Csv
      options: {}
    location: ./examples/location_dict.csv

stages:
  # Query to aggregate temperature data by date and location
  - - name: aggregated
      query: >
        SELECT
          cast(timestamp as date) date,
          location_id,
          round(min(temperature_c),2) min_temp_c,
          round(min(humidity),2) min_humidity,
          round(max(temperature_c),2) max_temp_c,
          round(max(humidity),2) max_humidity,
          round(avg(temperature_c),2) avg_temp_c,
          round(avg(humidity),2) avg_humidity
        FROM temp_readings
        GROUP BY 1,2
        ORDER BY 1 ASC
      # print the query plan to stdout for debugging purposes
      explain: true

  # Enrich aggregation with the location name
  - - name: enriched
      query: >
        SELECT
          date,
          location_name,
          min_temp_c,
          max_temp_c,
          avg_temp_c,
          min_humidity,
          max_humidity,
          avg_humidity
        FROM aggregated
        JOIN locations
          ON aggregated.location_id = locations.location_id
        ORDER BY date, location_name
      # print 10 rows to stdout for debugging purposes
      show: 10

# Write the pipeline result to a parquet file (can be omitted if you don't want an output)
destination:
  type: File
  name: results
  file_type:
    type: Parquet
    options: {}
  location: ./examples/output_${month}_${year}.parquet
```
This repository contains a minimal CLI implementation of the Aqueducts framework (`aqueducts-cli`), which can be used to test out pipeline definitions like the one above:
```bash
cargo install aqueducts-cli
aqueducts --file examples/aqueduct_pipeline_example.yml --param year=2024 --param month=jan
```
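
For the third option, defining and running a pipeline directly from Rust, the snippet below is a minimal sketch of what that could look like. The item names used here (`aqueducts::prelude`, `Aqueduct::try_from_yml`, `run_pipeline`) are assumptions for illustration only; check the [API docs](https://docs.rs/aqueducts) for the exact types and signatures of the version you are using.

```rust
// Minimal sketch of programmatic execution; not verbatim from the crate docs.
// `aqueducts::prelude`, `Aqueduct::try_from_yml` and `run_pipeline` are assumed
// names, so verify them against https://docs.rs/aqueducts before relying on this.
use std::collections::HashMap;

use aqueducts::prelude::*;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Template parameters, equivalent to `--param month=jan --param year=2024` on the CLI
    let params = HashMap::from([
        ("month".to_string(), "jan".to_string()),
        ("year".to_string(), "2024".to_string()),
    ]);

    // Parse the declarative pipeline definition and substitute the template parameters
    let pipeline = Aqueduct::try_from_yml("./examples/aqueduct_pipeline_simple.yml", params)?;

    // Run all stages in order and write the result to the configured destination
    run_pipeline(pipeline).await?;

    Ok(())
}
```

This mirrors the CLI invocation above: the same YAML definition is parsed and the `month`/`year` placeholders are filled in before execution.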
## Roadmap
- [x] Docs
- [x] ODBC source
- [x] ODBC destination
- [x] Parallel processing of stages
- [ ] Streaming source (initially Kafka, maybe AWS Kinesis)
- [ ] Streaming destination (initially Kafka)