14 releases (5 breaking)

new 0.6.3 Oct 31, 2024
0.6.2 Sep 30, 2024
0.5.0 Aug 20, 2024
0.4.0 Jul 28, 2024
0.1.2 May 22, 2024

#10 in #etl

Download history 246/week @ 2024-07-04 23/week @ 2024-07-11 13/week @ 2024-07-18 239/week @ 2024-07-25 40/week @ 2024-08-01 3/week @ 2024-08-08 140/week @ 2024-08-15 36/week @ 2024-08-22 14/week @ 2024-08-29 6/week @ 2024-09-05 143/week @ 2024-09-12 66/week @ 2024-09-19 357/week @ 2024-09-26 52/week @ 2024-10-03 20/week @ 2024-10-10 5/week @ 2024-10-17

444 downloads per month
Used in 2 crates (via aqueducts)

Custom license

11KB
102 lines

Aqueducts

Build status Crates.io Documentation

Aqueducts is a framework to write and execute ETL data pipelines declaratively.

Features:

  • Define ETL pipelines in YAML
  • Extract data from csv files, JSONL, parquet files or delta tables
  • Process data using SQL
  • Load data into object stores as csv/parquet or delta tables
  • Support for file and delta table partitioning
  • Support for Upsert/Replace/Append operation on delta tables
  • Support for Local, S3, GCS and Azure Blob storage
  • EXPERIMENTAL Support for ODBC Sources and Destinations

This framework builds on the fantastic work done by projects such as:

Please show these projects some support ❤️!

Documentation

You can find the docs at https://vigimite.github.io/aqueducts

Change log: CHANGELOG

Quick start

To define and execute an Aqueduct pipeline there are a couple of options

  • using a yaml configuration file
  • using a json configuration file
  • manually in code

You can check out some examples in the examples directory. Here is a simple example defining an Aqueduct pipeline using the yaml config format link:

sources:
  # Register a local file source containing temperature readings for various cities
  - type: File
    name: temp_readings
    file_type:
      type: Csv
      options: {}
    # use built-in templating functionality
    location: ./examples/temp_readings_${month}_${year}.csv

  #Register a local file source containing a mapping between location_ids and location names
  - type: File
    name: locations
    file_type:
      type: Csv
      options: {}
    location: ./examples/location_dict.csv

stages:
  # Query to aggregate temperature data by date and location
  - - name: aggregated
      query: >
          SELECT
            cast(timestamp as date) date,
            location_id,
            round(min(temperature_c),2) min_temp_c,
            round(min(humidity),2) min_humidity,
            round(max(temperature_c),2) max_temp_c,
            round(max(humidity),2) max_humidity,
            round(avg(temperature_c),2) avg_temp_c,
            round(avg(humidity),2) avg_humidity
          FROM temp_readings
          GROUP by 1,2
          ORDER by 1 asc
      # print the query plan to stdout for debugging purposes
      explain: true

  # Enrich aggregation with the location name
  - - name: enriched
      query: >
        SELECT
          date,
          location_name,
          min_temp_c,
          max_temp_c,
          avg_temp_c,
          min_humidity,
          max_humidity,
          avg_humidity
        FROM aggregated
        JOIN locations 
          ON aggregated.location_id = locations.location_id
        ORDER BY date, location_name
      # print 10 rows to stdout for debugging purposes
      show: 10

# Write the pipeline result to a parquet file (can be omitted if you don't want an output)
destination:
  type: File
  name: results
  file_type:
    type: Parquet
    options: {}
  location: ./examples/output_${month}_${year}.parquet

This repository contains a minimal example implementation of the Aqueducts framework which can be used to test out pipeline definitions like the one above:

cargo install aqueducts-cli
aqueducts --file examples/aqueduct_pipeline_example.yml --param year=2024 --param month=jan

Roadmap

  • Docs
  • ODBC source
  • ODBC destination
  • Parallel processing of stages
  • Streaming Source (initially kafka + maybe aws kinesis)
  • Streaming destination (initially kafka)

Dependencies

~55–74MB
~1.5M SLoC