From 7e13e6ebd80a2c93e400b72714d374435dd18d4c Mon Sep 17 00:00:00 2001
From: Daniel van der Ende
Date: Mon, 11 Sep 2023 20:42:32 +0200
Subject: [PATCH] finalize readme

---
 .github/workflows/ci.yml       | 30 ++++++++++++++++++++++
 README.md                      | 47 +++++++++++++++-------------------
 dags/dbt/profiles.yml          | 11 ++++++++
 dags/spark/generate_data.py    |  8 +++++-
 mock_pipeline_requirements.txt |  3 +++
 5 files changed, 72 insertions(+), 27 deletions(-)
 create mode 100644 mock_pipeline_requirements.txt

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 5306860..0315a8e 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -35,3 +35,33 @@ jobs:
 
       - name: Run integrity tests
         run: coverage run -m pytest integrity_tests/*
+
+#  mock_pipeline_test:
+#    runs-on: ubuntu-latest
+#    needs:
+#      - lint
+#      - integrity_test
+#    steps:
+#      - uses: actions/checkout@v3
+#
+#      - uses: actions/setup-python@v4
+#        with:
+#          python-version: '3.10'
+#
+#      - name: Install system dependencies
+#        run: sudo apt-get install -y libsasl2-dev build-essential
+#
+#      - name: Install mock_pipeline test requirements
+#        run: pip install -r mock_pipeline_requirements.txt
+#
+#      - name: Generate mock-pipeline data
+#        run: spark-submit --name spark-data-generate dags/spark/generate_data.py --warehouse-path ./spark-warehouse
+#
+#      - name: Run dbt
+#        working-directory: dags/dbt
+#        run: dbt run --target mock_pipeline
+#
+#      - name: Run dbt tests
+#        working-directory: dags/dbt
+#        run: dbt test --target mock_pipeline
+
diff --git a/README.md b/README.md
index 0827714..0f44563 100644
--- a/README.md
+++ b/README.md
@@ -1,36 +1,31 @@
 # Data Testing With Airflow
-[![Build Status](https://travis-ci.org/danielvdende/data-testing-with-airflow.svg?branch=master)](https://travis-ci.org/danielvdende/data-testing-with-airflow)
 
-This repository contains simple examples of how to implement some of the Nine Circles of Data Tests described in our
-[blogpost](https://medium.com/@ingwbaa/datas-inferno-7-circles-of-data-testing-hell-with-airflow-cef4adff58d8). A docker container is provided to run the DTAP and Data Tests examples. The Mock Pipeline tests and
-DAG integrity tests are implemented in Travis CI tests.
+This repository contains simple examples of how to implement the 7 circles of Data Testing Hell, as first described in 2017 in [this blogpost](https://medium.com/@ingwbaa/datas-inferno-7-circles-of-data-testing-hell-with-airflow-cef4adff58d8) and revisited in 2023 during a PyData talk. For reference, the original version of the examples has been tagged with `1.0.0`. The latest code on the main branch is that of the 2023 version.
+
+There are a few notable differences between the versions:
+1. dbt is used in the 2023 version to implement the data tests. This is a more modern approach than the original version, which used Spark directly.
+2. Travis CI has been switched out in favour of GitHub Actions.
+3. Some small fixes have been applied to update the Integrity Tests.
+4. The compression used in the examples has been set to `gzip`. This works around some issues encountered when using Snappy compression on M1 CPUs.
+
+## Running locally
+To run this locally, you can use the provided Dockerfile. First, build the Docker image (run from the root of this project):
+```shell
+docker build -t datas_inferno .
+```
+and then run it with:
+```shell
+docker run -p 5000:5000 datas_inferno
+```
 
 ## DAG Integrity Tests
-The DAG Integrity tests are integrated in our CI pipeline, and check if the DAG definition in your airflowfile is a valid DAG. 
+The DAG Integrity tests are integrated into our CI pipeline, and check whether your DAG definitions are valid DAGs (an illustrative sketch of such a check is included at the end of this README).
 This includes not only checking for typos, but also verifying there are no cycles in your DAGs, and that the operators are used correctly.
 
 ## Mock Pipeline Tests
-Mock Pipeline Tests are implemented as a CI pipeline stage, and function as unit tests for your individual DAG tasks. Dummy
-data is generated and used to verify that for each expected input, an expected output follows from your code.
+Mock Pipeline Tests should be implemented as a CI pipeline stage, and function as unit tests for your individual DAG tasks. Dummy
+data is generated and used to verify that for each expected input, an expected output follows from your code. Because we use dbt, this is simply an additional target in our dbt project, which ensures that the same logic is applied to a fixed, known set of data. There were some issues getting dbt to talk to the right Spark warehouse and Hive metastore in GitHub Actions; workarounds are being looked into. This is unrelated to the concept itself, and purely down to the specific environment setup.
 
 ## Data Tests
-In the `dags` directory, you will find a simple DAG with 3 tasks. Each of these tasks has a companion test that is integrated
-into the DAG. These tests are run on every DAG run and are meant to verify that your code makes sense when running on
-real data.
-
-## DTAP
-In order to show our DTAP logic, we have included a Dockerfile, which builds a Docker image with Airflow and Spark installed.
-We then clone this repo 4 times, to represent each environment. To build the docker image:
-
-`docker build -t airflow_testing .`
-
-Once built, you can run it with:
-
-`docker run -p 8080:8080 airflow_testing`
-
-This image contains all necessary logic to initialize the DAGs and connections. One part that is simulated is the promotion
-of branches (i.e. environments). The 'promotion' of code from one branch (environment) to another requires write access to
-the git repo, something which we don't want to provide publicly :-). To see the environments and triggering in action, kick off
-the 'dev' DAG via the UI (or CLI) to see flow. Please note, the prod DAG will not run after the acc one by default, as we
-prefer to use so called green-light deployments, to verify the logic and prevent unwanted production DAGruns.
+Data tests are implemented as dbt tests, in a similar way to the Mock Pipeline Tests, but run against the real data on every DAG run.
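+
+As an aside, the sketch below shows roughly what such an integrity check can look like. It is illustrative only and not necessarily identical to the tests in `integrity_tests/`; the `dags` folder path and the test name are assumptions:
+```python
+# Illustrative sketch of a DAG integrity check; the real tests live in
+# integrity_tests/ and may differ. Parsing the DagBag surfaces typos,
+# import errors, cycles, and misconfigured operators without running any tasks.
+from airflow.models import DagBag
+
+
+def test_dag_integrity():
+    dag_bag = DagBag(dag_folder="dags", include_examples=False)
+    # import_errors maps each DAG file to the exception raised while parsing it
+    assert dag_bag.import_errors == {}
+    assert len(dag_bag.dags) > 0
+```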
diff --git a/dags/dbt/profiles.yml b/dags/dbt/profiles.yml
index e68be8b..67b2b02 100644
--- a/dags/dbt/profiles.yml
+++ b/dags/dbt/profiles.yml
@@ -11,3 +11,14 @@ transactions:
         "spark.sql.warehouse.dir": "/opt/airflow/spark-warehouse"
         "spark.sql.parquet.compression.codec": "gzip"
         "spark.hadoop.javax.jdo.option.ConnectionURL": "jdbc:derby:;databaseName=/opt/airflow/metastore_db;create=true"
+
+    mock_pipeline:
+      type: spark
+      host: localhost
+      method: session
+      schema: bank
+      server_side_parameters:
+        "spark.databricks.delta.schema.autoMerge.enabled": "True"
+        "spark.sql.warehouse.dir": "/home/runner/work/data-testing-with-airflow/data-testing-with-airflow/spark-warehouse" # For running on a GitHub Actions runner
+        "spark.sql.parquet.compression.codec": "gzip"
+        "spark.hadoop.javax.jdo.option.ConnectionURL": "jdbc:derby:;databaseName=/home/runner/work/data-testing-with-airflow/data-testing-with-airflow/metastore_db;create=true"
diff --git a/dags/spark/generate_data.py b/dags/spark/generate_data.py
index 4f32794..4a937e8 100644
--- a/dags/spark/generate_data.py
+++ b/dags/spark/generate_data.py
@@ -1,3 +1,5 @@
+import argparse
+
 from pyspark.sql import SparkSession
 from pyspark.sql.types import StructType, StructField, DoubleType, StringType, DateType, BooleanType
 from random import uniform, sample, randint
@@ -87,8 +89,12 @@ def run_job(spark):
 
 
 if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description='Generate data')
+    parser.add_argument('--warehouse-path', default='/opt/airflow/spark-warehouse')
+
+    args = parser.parse_args()
     spark = SparkSession.builder \
-        .config('spark.sql.warehouse.dir', '/opt/airflow/spark-warehouse') \
+        .config('spark.sql.warehouse.dir', args.warehouse_path) \
         .config('spark.sql.parquet.compression.codec', 'gzip') \
         .enableHiveSupport() \
         .getOrCreate()
diff --git a/mock_pipeline_requirements.txt b/mock_pipeline_requirements.txt
new file mode 100644
index 0000000..7a69f1a
--- /dev/null
+++ b/mock_pipeline_requirements.txt
@@ -0,0 +1,3 @@
+dbt-spark[PyHive]==1.7.0b1
+dbt-core==1.7.0b1
+pyspark==3.4.0
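
As an illustrative aside, the sketch below shows one way to sanity-check the warehouse produced by `generate_data.py --warehouse-path ./spark-warehouse` before pointing dbt's `mock_pipeline` target at it. The `./spark-warehouse` path, `gzip` codec, and `bank` schema come from this patch; everything else (including running this from the repository root with a local Derby metastore) is an assumption rather than part of the actual pipeline.

```python
# Hedged sketch: inspect the locally generated Spark warehouse.
# Assumes generate_data.py has already been run with --warehouse-path ./spark-warehouse
# and that it writes its tables into the `bank` schema (as dags/dbt/profiles.yml suggests).
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.sql.warehouse.dir", "./spark-warehouse")
    .config("spark.sql.parquet.compression.codec", "gzip")
    .enableHiveSupport()  # creates/uses a local Derby metastore in the working directory
    .getOrCreate()
)

spark.sql("SHOW DATABASES").show()
spark.sql("SHOW TABLES IN bank").show()
```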