Commit: finalize readme

danielvdende committed Sep 11, 2023
1 parent 52fd668 commit 7e13e6e
Showing 5 changed files with 72 additions and 27 deletions.
30 changes: 30 additions & 0 deletions .github/workflows/ci.yml
@@ -35,3 +35,33 @@ jobs:

- name: Run integrity tests
run: coverage run -m pytest integrity_tests/*

# mock_pipeline_test:
# runs-on: ubuntu-latest
# needs:
# - lint
# - integrity_test
# steps:
# - uses: actions/checkout@v3
#
# - uses: actions/setup-python@v4
# with:
# python-version: '3.10'
#
# - name: Install system dependencies
# run: sudo apt-get install -y libsasl2-dev build-essential
#
# - name: Install mock_pipeline test requirements
# run: pip install -r mock_pipeline_requirements.txt
#
# - name: Generate mock-pipeline data
# run: spark-submit --name spark-data-generate dags/spark/generate_data.py --warehouse-path ./spark-warehouse
#
# - name: Run dbt
# working-directory: dags/dbt
# run: dbt run --target mock_pipeline
#
# - name: Run dbt tests
# working-directory: dags/dbt
# run: dbt test --target mock_pipeline

47 changes: 21 additions & 26 deletions README.md
@@ -1,36 +1,31 @@
# Data Testing With Airflow
[![Build Status](https://travis-ci.org/danielvdende/data-testing-with-airflow.svg?branch=master)](https://travis-ci.org/danielvdende/data-testing-with-airflow)

This repository contains simple examples of how to implement some of the Nine Circles of Data Tests described in our
[blogpost](https://medium.com/@ingwbaa/datas-inferno-7-circles-of-data-testing-hell-with-airflow-cef4adff58d8). A docker container is provided to run the DTAP and Data Tests examples. The Mock Pipeline tests and
DAG integrity tests are implemented in Travis CI tests.
This repository contains simple examples of how to implement the 7 layers of Data Testing Hell, as first described in 2017 in [this blogpost](https://medium.com/@ingwbaa/datas-inferno-7-circles-of-data-testing-hell-with-airflow-cef4adff58d8) and revisited in 2023 during a PyData talk. For reference, the original version of the examples has been tagged `1.0.0`. The latest code on the main branch is that of the 2023 version.

There are a few notable differences between the versions:
1. dbt is used in the 2023 version to implement the data tests. This is a more modern approach than the original version, which used Spark directly.
2. Travis CI has been switched out in favour of GitHub Actions.
3. Some small fixes have been applied to update the Integrity Tests.
4. The compression used in the examples has been set to `gzip`, to work around some issues encountered when using Snappy compression on M1 CPUs.

## Running locally
To run this locally, you can use the provided Dockerfile. First, build the Docker image (run from the root of this project):
```shell
docker build -t datas_inferno .
```
and then run it with:
```shell
docker run -p 5000:5000 datas_inferno
```

## DAG Integrity Tests
The DAG Integrity tests are integrated in our CI pipeline, and check if the DAG definition in your airflowfile is a valid DAG.
The DAG Integrity tests are integrated in our CI pipeline, and check if your DAG definition is a valid DAG.
This includes not only checking for typos, but also verifying there are no cycles in your DAGs, and that the operators are
used correctly.
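
A minimal sketch of what such a test can look like with pytest and Airflow's `DagBag` (the file and test names here are illustrative, not necessarily what `integrity_tests/` actually contains; it assumes the DAGs live under `dags/`):

```python
# integrity_test_sketch.py -- hypothetical name, not a file in this repo.
from airflow.models import DagBag


def test_dag_bag_imports_cleanly():
    # DagBag parses every file under dag_folder; typos, cycles and invalid
    # operator arguments all surface as entries in import_errors.
    dag_bag = DagBag(dag_folder="dags", include_examples=False)
    assert dag_bag.import_errors == {}, f"DAG import errors: {dag_bag.import_errors}"


def test_every_dag_has_tasks():
    dag_bag = DagBag(dag_folder="dags", include_examples=False)
    for dag_id, dag in dag_bag.dags.items():
        assert dag.tasks, f"DAG '{dag_id}' defines no tasks"
```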

## Mock Pipeline Tests
Mock Pipeline Tests are implemented as a CI pipeline stage, and function as unit tests for your individual DAG tasks. Dummy
data is generated and used to verify that for each expected input, an expected output follows from your code.
Mock Pipeline Tests should run as a CI pipeline stage, and function as unit tests for your individual DAG tasks. Dummy
data is generated and used to verify that for each expected input, an expected output follows from your code. Because we use dbt, this is a simple additional target in our dbt project, which ensures that the same logic is applied to a specific set of data. There were some issues getting dbt to talk to the right Spark warehouse and Hive metastore in GitHub Actions; workarounds are being looked into. This is unrelated to the concept itself and is specific to the environment setup.
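
As a rough sketch of how that target gets exercised (the shell equivalent is the commented-out `mock_pipeline_test` job in `ci.yml`), the dbt programmatic runner could drive the same `dbt run`/`dbt test` pair from Python. This assumes dbt-core 1.5+ and the `mock_pipeline` target from `dags/dbt/profiles.yml`; the function name is illustrative:

```python
# Sketch only: drives dbt's mock_pipeline target programmatically.
from dbt.cli.main import dbtRunner, dbtRunnerResult


def run_mock_pipeline(project_dir: str = "dags/dbt") -> bool:
    runner = dbtRunner()
    for verb in ("run", "test"):
        result: dbtRunnerResult = runner.invoke(
            [verb, "--target", "mock_pipeline", "--project-dir", project_dir]
        )
        if not result.success:
            return False
    return True


if __name__ == "__main__":
    raise SystemExit(0 if run_mock_pipeline() else 1)
```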

## Data Tests
In the `dags` directory, you will find a simple DAG with 3 tasks. Each of these tasks has a companion test that is integrated
into the DAG. These tests are run on every DAG run and are meant to verify that your code makes sense when running on
real data.
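
A minimal sketch of that wiring (the dag_id, task names and dbt paths below are illustrative, not the repo's actual DAG; in the 2023 version the tests themselves live in dbt):

```python
# Sketch of a task plus its companion data test; names are illustrative.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="transactions_example",  # hypothetical dag_id
    start_date=datetime(2023, 1, 1),
    schedule=None,                  # assumes Airflow >= 2.4
    catchup=False,
) as dag:
    build = BashOperator(
        task_id="dbt_run_transactions",
        bash_command="cd /opt/airflow/dags/dbt && dbt run --select transactions",
    )
    test = BashOperator(
        task_id="dbt_test_transactions",
        bash_command="cd /opt/airflow/dags/dbt && dbt test --select transactions",
    )
    # A failing test fails the DAG run, blocking anything downstream.
    build >> test
```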

## DTAP
In order to show our DTAP logic, we have included a Dockerfile, which builds a Docker image with Airflow and Spark installed.
We then clone this repo 4 times, to represent each environment. To build the docker image:

`docker build -t airflow_testing .`

Once built, you can run it with:

`docker run -p 8080:8080 airflow_testing`

This image contains all necessary logic to initialize the DAGs and connections. One part that is simulated is the promotion
of branches (i.e. environments). The 'promotion' of code from one branch (environment) to another requires write access to
the git repo, something which we don't want to provide publicly :-). To see the environments and triggering in action, kick off
the 'dev' DAG via the UI (or CLI) and watch the flow. Please note, the prod DAG will not run after the acc one by default, as we
prefer to use so-called green-light deployments, to verify the logic and prevent unwanted production DAG runs.
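
A hypothetical sketch of the triggering step (all dag_ids and task names are illustrative; the actual promotion also involves pushing code between branches, which is not shown):

```python
# Sketch only: the final step of the dev DAG triggers the acc DAG once tests pass.
from datetime import datetime

from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

with DAG(
    dag_id="transactions_dev",      # hypothetical dev-environment DAG
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    promote_to_acc = TriggerDagRunOperator(
        task_id="trigger_acc_environment",
        trigger_dag_id="transactions_acc",  # hypothetical acceptance-environment DAG
    )
    # In the real DAG this would sit after the last data test, so a failed test
    # blocks the promotion; the acc -> prod step stays manual (green-light).
```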
Data tests are implemented in dbt in a similar way to the integrity tests.
11 changes: 11 additions & 0 deletions dags/dbt/profiles.yml
@@ -11,3 +11,14 @@ transactions:
"spark.sql.warehouse.dir": "/opt/airflow/spark-warehouse"
"spark.sql.parquet.compression.codec": "gzip"
"spark.hadoop.javax.jdo.option.ConnectionURL": "jdbc:derby:;databaseName=/opt/airflow/metastore_db;create=true"

mock_pipeline:
type: spark
host: localhost
method: session
schema: bank
server_side_parameters:
"spark.databricks.delta.schema.autoMerge.enabled": "True"
"spark.sql.warehouse.dir": "/home/runner/work/data-testing-with-airflow/data-testing-with-airflow/spark-warehouse" # For on github runner
"spark.sql.parquet.compression.codec": "gzip"
"spark.hadoop.javax.jdo.option.ConnectionURL": "jdbc:derby:;databaseName=/home/runner/work/data-testing-with-airflow/data-testing-with-airflow/metastore_db;create=true"
8 changes: 7 additions & 1 deletion dags/spark/generate_data.py
@@ -1,3 +1,5 @@
import argparse

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, DateType, BooleanType
from random import uniform, sample, randint
@@ -87,8 +89,12 @@ def run_job(spark):


if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Generate data')
parser.add_argument('--warehouse-path', default='/opt/airflow/spark-warehouse')

args = parser.parse_args()
spark = SparkSession.builder \
.config('spark.sql.warehouse.dir', '/opt/airflow/spark-warehouse') \
.config('spark.sql.warehouse.dir', args.warehouse_path) \
.config('spark.sql.parquet.compression.codec', 'gzip') \
.enableHiveSupport() \
.getOrCreate()
3 changes: 3 additions & 0 deletions mock_pipeline_requirements.txt
@@ -0,0 +1,3 @@
dbt-spark[PyHive]==1.7.0b1
dbt-core==1.7.0b1
pyspark==3.4.0
