Skip to content

Latest commit

 

History

History
164 lines (138 loc) · 7.09 KB

README.md

File metadata and controls

164 lines (138 loc) · 7.09 KB

Luigi ETL pipeline

Disclaimer:

⚠️Using some or all of the elements of this code, You assume responsibility for any consequences!

⚠️The licenses for the technologies on which the code depends are subject to change by their authors.

Description of the pipeline:

The pipeline collects data from sources, in the form of (csv tables / json dictionaries) data, so that in the end:

  • Collects data from external sources to Luigi targets.
  • Data cleansing.
  • Land data to DWH.

Required:

The application code is written in python and obviously depends on it.
Python version 3.6 [Python Software Foundation License / (with) Zero-Clause BSD license (after 3.8.6 version Python)]:

Required Packages:

Luigi [Apache License 2.0]:

Used to Luigi tasks conveyor.

Pandas [BSD-3-Clause license]:

Used to work with tabular data.

NumPy [BSD-3-Clause license]:

Used to bring the table cells to the desired value.

PyArrow [Apache-2.0 license]:

Used to save data in parquet format.

Installing the Required Packages:

pip install luigi
pip install pandas
pip install numpy
pip install pyarrow

Description of tasks:

ExternalData:

Wrappers for data from external sources.

  • Reads datasets in the directory received from the parameter 'external_data_path'.
    ⚠️All paths to partitions inside the root directory of the passed ExternalData must be in the format 'Dataset_Name/YYYY/MM/DD/'.
  • For all partitions where a '_Validate' flag file was found, creates a new '_Validate_Success' flag as Luigi.LocalTarget.

ExtractTask:

  • Reads data from ExternalData by dates.
  • Merges them into one array.
  • If 'drop_list' parameter is not 'None' ('None' as default) Task will drop all columns names in this Luigi.ListParameter.
    Example of 'drop_list' Luigi.ListParameter:
["drop_name", "Delete"]
  • 'extract_file_mask' Luigi.Parameter as output file format and 'external_data_file_mask' as input.

TransformTask:

  • Remove all lines matching the transform_parsing_rules_drop parameter.
    Example of 'transform_parsing_rules_drop' Luigi.DictParameter:
{"column_to_drop": ["False", "NaN", 0]}
  • Rows will be discarded if at least one value matches in ALL keys of transform_parsing_rules_filter.
    Example of 'transform_parsing_rules_filter' Luigi.DictParameter:
{"column_to_filter": ["drop_if_not_in_vip", "drop_too"], "filter_too": ["0"]}
  • And provided that the string does not contain values from the transform_parsing_rules_vip keys.
    Example of 'transform_parsing_rules_vip' Luigi.DictParameter:
{"data_to_save_like_vip": ["vip_value_1", "vip_value_2"], "save_too": ["vip_value_3"]}
  • Has 'date_parameter' Luigi.DateParameter (today as default).
  • 'transform_file_mask' Luigi.Parameter as output file format and 'extract_file_mask' as input.

LoadTask:

  • Landing result data to directory received from the Luigi.Parameter 'load_data_path'.
  • Has 'date_parameter' Luigi.DateParameter (today as default).
  • 'load_file_mask' Luigi.Parameter as output file format and 'transform_file_mask' as input.

Launch:

Launch with 'luigi_config' and Luigi.build:

If you want to use a simple launch by passing Luigi parameters through a configuration file:

  1. Fill the 'luigi_config.cfg' file with correct data.
  2. Then run the script 'luigi_pipeline.py'. Files location:
    ./📂Luigi_ETL
       └── 📁Pipeline
                ├── 📄luigi_pipeline.py
                └── 📁My_Beautiful_Tasks.py
                         └── 📁Configuration
                                  └── 📄luigi_config.cfg

Please note that rows with optional parameters can be removed from the 'luigi_config' if you do not need them.

Example of run script:

python3 -B -m .luigi_pipeline.py

Launch with terminal or command line:

First you need to replace the variable 'build' to variable 'run' in 'Pipeline_launcher.py' script, with removing all the parameters passed to it.
Then you need to clear all parameters in Luigi's task instances that are called in 'luigi_pipeline.py' script.

After that, you can start Luigi by passing parameters through the terminal, or using a 'start_luigi_etl_pipeline.sh' script.

Files location:
./📂Luigi_ETL
   └── 📁Pipeline
            ├── 📄luigi_pipeline.py
            ├── 📄start_luigi_etl_pipeline.sh
            └── 📁My_Beautiful_Tasks
                     └── 📄Pipeline_launcher.py

If Your OS has a bash shell the ETL pipeline can be started using the bash script:

./start_luigi_etl_pipeline.sh

The script contains an example of all the necessary arguments to run.
To launch the pipeline through this script, do not forget to make it executable.

chmod +x ./start_luigi_etl_pipeline.sh

The script can also be run directly with python.
Example of run script:

python3 -B -m luigi_pipeline Load.LoadTask --local-scheduler \
--ExternalData.ExternalData-external-data-path "~/luigi_tasks/ExternalData" \
\
--Extract.ExtractTask-extract-data-path "~/luigi_tasks/ExtractTask" \
--Extract.ExtractTask-extract-file-mask "csv" \
--Extract.ExtractTask-external-data-file-mask "csv" \
--Extract.ExtractTask-drop-list "['column_drop_name', 'column_to_delete']" \
\
--Transform.TransformTask-file-to-transform-path "~/luigi_tasks/TransformTask" \
--Transform.TransformTask-transform-file-mask "json" \
--Transform.TransformTask-transform-parsing-rules-drop "{'column_to_drop': [False, 'NaN', 0]}" \
--Transform.TransformTask-transform-parsing-rules-filter "{'column_to_filter': ['drop_if_not_in_vip', 'drop_too'], 'filter_too': ['0']}" \
--Transform.TransformTask-transform-parsing-rules-vip "{'data_to_save_like_vip': ['vip_value_1, vip_value_2'], 'save_too': ['vip_value_3']}" \
--Transform.TransformTask-date-path-part $(date +%F --date "2022-12-01") \
\
--Load.LoadTask-load-data-path "~/luigi_tasks/LoadTask" \
--Load.LoadTask-load-file-mask "parquet"

The example above shows the launch of all tasks.

Tests:

Tests are embedded inside the pipeline.


Thank you for your interest in my work.