From 0a99d4d4f744f806b00a25b8696c8a6e4be89cba Mon Sep 17 00:00:00 2001
From: Daniel van der Ende
Date: Mon, 11 Sep 2023 20:42:32 +0200
Subject: [PATCH] Add mock_pipeline_test to GHA

---
 .github/workflows/ci.yml       | 69 +++++++++++++++++++++++++---------
 dags/dbt/profiles.yml          | 11 ++++++
 dags/spark/generate_data.py    |  8 +++-
 mock_pipeline_requirements.txt |  3 ++
 4 files changed, 72 insertions(+), 19 deletions(-)
 create mode 100644 mock_pipeline_requirements.txt

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 5306860..27f5894 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -3,8 +3,44 @@
 on: push
 
 jobs:
-  lint:
+#  lint:
+#    runs-on: ubuntu-latest
+#    steps:
+#      - uses: actions/checkout@v3
+#
+#      - uses: actions/setup-python@v4
+#        with:
+#          python-version: '3.10'
+#
+#      - name: Install flake8
+#        run: pip install flake8
+#
+#      - name: Run flake8
+#        run: flake8 dags/
+#
+#  integrity_test:
+#    runs-on: ubuntu-latest
+#    steps:
+#      - uses: actions/checkout@v3
+#
+#      - uses: actions/setup-python@v4
+#        with:
+#          python-version: '3.10'
+#
+#      - name: Install integrity test requirements
+#        run: pip install -r integrity_tests/requirements.txt
+#
+#      - name: Initialize Airflow DB
+#        run: airflow db init
+#
+#      - name: Run integrity tests
+#        run: coverage run -m pytest integrity_tests/*
+
+  mock_pipeline_test:
     runs-on: ubuntu-latest
+#    needs:
+#      - lint
+#      - integrity_test
     steps:
       - uses: actions/checkout@v3
 
@@ -12,26 +48,23 @@ jobs:
         with:
           python-version: '3.10'
 
-      - name: Install flake8
-        run: pip install flake8
+      - name: Install system dependencies
+        run: sudo apt-get install -y libsasl2-dev build-essential
 
-      - name: Run flake8
-        run: flake8 dags/
+      - name: Install mock_pipeline test requirements
+        run: pip install -r mock_pipeline_requirements.txt
 
-  integrity_test:
-    runs-on: ubuntu-latest
-    steps:
-      - uses: actions/checkout@v3
+      - name: Generate mock-pipeline data
+        run: spark-submit --name spark-data-generate dags/spark/generate_data.py --warehouse-path .
 
-      - uses: actions/setup-python@v4
-        with:
-          python-version: '3.10'
+      - name: debug
+        run: ls -lah
 
-      - name: Install integrity test requirements
-        run: pip install -r integrity_tests/requirements.txt
+      - name: Run dbt
+        working-directory: dags/dbt
+        run: dbt run --target mock_pipeline
 
-      - name: Initialize Airflow DB
-        run: airflow db init
+      - name: Run dbt tests
+        working-directory: dags/dbt
+        run: dbt test --target mock_pipeline
 
-      - name: Run integrity tests
-        run: coverage run -m pytest integrity_tests/*
diff --git a/dags/dbt/profiles.yml b/dags/dbt/profiles.yml
index e68be8b..83ca7f6 100644
--- a/dags/dbt/profiles.yml
+++ b/dags/dbt/profiles.yml
@@ -11,3 +11,14 @@ transactions:
       "spark.sql.warehouse.dir": "/opt/airflow/spark-warehouse"
       "spark.sql.parquet.compression.codec": "gzip"
       "spark.hadoop.javax.jdo.option.ConnectionURL": "jdbc:derby:;databaseName=/opt/airflow/metastore_db;create=true"
+
+  mock_pipeline:
+    type: spark
+    host: localhost
+    method: session
+    schema: bank
+    server_side_parameters:
+      "spark.databricks.delta.schema.autoMerge.enabled": "True"
+      "spark.sql.warehouse.dir": "."
+ "spark.sql.parquet.compression.codec": "gzip" + "spark.hadoop.javax.jdo.option.ConnectionURL": "jdbc:derby:;databaseName=/opt/airflow/metastore_db;create=true" diff --git a/dags/spark/generate_data.py b/dags/spark/generate_data.py index 4f32794..4a937e8 100644 --- a/dags/spark/generate_data.py +++ b/dags/spark/generate_data.py @@ -1,3 +1,5 @@ +import argparse + from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, DoubleType, StringType, DateType, BooleanType from random import uniform, sample, randint @@ -87,8 +89,12 @@ def run_job(spark): if __name__ == "__main__": + parser = argparse.ArgumentParser(description='Generate data') + parser.add_argument('--warehouse-path', default='/opt/airflow/spark-warehouse') + + args = parser.parse_args() spark = SparkSession.builder \ - .config('spark.sql.warehouse.dir', '/opt/airflow/spark-warehouse') \ + .config('spark.sql.warehouse.dir', args.warehouse_path) \ .config('spark.sql.parquet.compression.codec', 'gzip') \ .enableHiveSupport() \ .getOrCreate() diff --git a/mock_pipeline_requirements.txt b/mock_pipeline_requirements.txt new file mode 100644 index 0000000..7a69f1a --- /dev/null +++ b/mock_pipeline_requirements.txt @@ -0,0 +1,3 @@ +dbt-spark[PyHive]==1.7.0b1 +dbt-core==1.7.0b1 +pyspark==3.4.0