Add sample app to run DBT transformations via MWAA Airflow #12

Merged · 3 commits · Jun 25, 2024
25 changes: 25 additions & 0 deletions airflow-dbt-transformation/Makefile
@@ -0,0 +1,25 @@
usage:		## Shows usage for this Makefile
	@cat Makefile | grep -E '^[a-zA-Z_-]+:.*?## .*$$' | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-15s\033[0m %s\n", $$1, $$2}'

install:	## Install dependencies
	which awslocal || pip install 'awscli-local[ver1]'

init:		## Initialize the Airflow environment in LocalStack MWAA
	awslocal s3 mb s3://snowflake-airflow
	awslocal mwaa create-environment --dag-s3-path /dags \
		--execution-role-arn arn:aws:iam::000000000000:role/airflow-role \
		--network-configuration {} \
		--source-bucket-arn arn:aws:s3:::snowflake-airflow \
		--airflow-configuration-options agent.code=007,agent.name=bond \
		--name my-mwaa-env \
		--endpoint-url http://localhost.localstack.cloud:4566

deploy:		## Deploy the DAG to the local Airflow instance
	awslocal s3 cp requirements.txt s3://snowflake-airflow/
	awslocal s3 cp packages.yml s3://snowflake-airflow/dags/
	awslocal s3 cp dbt_project.yml s3://snowflake-airflow/dags/
	awslocal s3 cp --recursive models s3://snowflake-airflow/dags/models
	awslocal s3 cp --recursive seeds s3://snowflake-airflow/dags/seeds
	awslocal s3 cp airflow_dag.py s3://snowflake-airflow/dags/

.PHONY: usage install init deploy
60 changes: 60 additions & 0 deletions airflow-dbt-transformation/README.md
@@ -0,0 +1,60 @@
# LocalStack Demo: Data Engineering with Apache Airflow, Snowflake, Snowpark, dbt & Cosmos

This project illustrates how to use the LocalStack Snowflake and MWAA emulators to run a data transformation pipeline entirely on your local machine.

The code is based on the Snowflake Guide for [Data Engineering with Apache Airflow, Snowflake, Snowpark, dbt & Cosmos](https://quickstarts.snowflake.com/guide/data_engineering_with_apache_airflow).

## Prerequisites

- [`localstack` CLI](https://docs.localstack.cloud/getting-started/installation/#localstack-cli) with [`LOCALSTACK_AUTH_TOKEN`](https://docs.localstack.cloud/getting-started/auth-token/) environment variable set
- [`awslocal` CLI](https://docs.localstack.cloud/user-guide/integrations/aws-cli/#localstack-aws-cli-awslocal)
- [LocalStack Snowflake emulator](https://snowflake.localstack.cloud/getting-started/installation/)

## Instructions

### Start LocalStack

Start the LocalStack Snowflake emulator using the following command:

```bash
DOCKER_FLAGS='-e SF_LOG=trace' \
IMAGE_NAME=localstack/snowflake \
DEBUG=1 \
localstack start
```
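
Optionally, before moving on, you can confirm that the emulator is reachable. This is a small sanity check, assuming the default LocalStack gateway on port 4566:

```bash
# Block until the LocalStack container reports readiness (optional sanity check)
localstack wait -t 30
# The health endpoint lists the services the gateway currently exposes
curl -s localhost:4566/_localstack/health
```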

### Deploy the app

The sample application provides Makefile targets to simplify the setup process.

Run the following command to initialize the Airflow environment in LocalStack (this may take a couple of seconds):
```bash
make init
```

Once the Airflow environment has been created, you can request its details and extract the webserver URL:
```
awslocal mwaa get-environment --name my-mwaa-env
...
"Status": "AVAILABLE",
"WebserverUrl": "http://localhost.localstack.cloud:4510"
...
```
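
If you want to grab the webserver URL programmatically, for example to reuse it in scripts, a one-liner along these lines should work (assuming `jq` is installed):

```bash
# Extract the WebserverUrl field from the GetEnvironment response (requires jq)
WEBSERVER_URL=$(awslocal mwaa get-environment --name my-mwaa-env | jq -r '.Environment.WebserverUrl')
echo "$WEBSERVER_URL"
```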

Now use the following command to deploy the Airflow DAG with our dbt transformation logic locally:
```bash
make deploy
```

### Use the Airflow UI to trigger a DAG run

Once the Airflow environment has spun up and the DAG has been deployed successfully, you can access the Airflow UI at http://localhost.localstack.cloud:4510/home
(Note that the port number may differ - make sure to use the `WebserverUrl` from the output above.)

You can now trigger a DAG run from the UI. If all goes well, the DAG execution result should look similar to this:

<img src="etc/airflow-screenshot.png" />
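
Alternatively, you may be able to trigger the run without the UI via Airflow's stable REST API. Treat the snippet below as a sketch only: whether it works depends on how the local webserver configures API authentication, and the `admin:admin` credentials are an assumption, not something defined by this sample:

```bash
# Hypothetical: trigger the dbt_snowpark DAG through the Airflow REST API.
# Replace the port and credentials with the values from your local setup.
curl -X POST "http://localhost.localstack.cloud:4510/api/v1/dags/dbt_snowpark/dagRuns" \
  -u admin:admin \
  -H "Content-Type: application/json" \
  -d '{}'
```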

## License

The code in this project is licensed under the Apache 2.0 License.
99 changes: 99 additions & 0 deletions airflow-dbt-transformation/airflow_dag.py
@@ -0,0 +1,99 @@
import os
from datetime import datetime
from pathlib import Path

from airflow import settings
from airflow.models import Connection
from airflow.operators.dummy_operator import DummyOperator
from airflow.decorators import dag, task
from cosmos import DbtTaskGroup, DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig
from cosmos.profiles.snowflake.user_pass import SnowflakeUserPasswordProfileMapping
from snowflake import connector

dbt_project_path = Path("/opt/airflow/dags")

# patch Cosmos Snowflake Airflow connector, which currently doesn't support a custom host yet :/
# see https://github.com/astronomer/astronomer-cosmos/blob/9420404ad9b9ad0bb4a4ffb73b50a67e4e1d077c/cosmos/profiles/snowflake/user_pass.py#L35

SnowflakeUserPasswordProfileMapping.airflow_param_mapping["host"] = "extra.host"
SnowflakeUserPasswordProfileMapping.airflow_param_mapping["port"] = "extra.port"

snowflake_connection_params = {
    "user": "test",
    "password": "test",
    "host": "snowflake.localhost.localstack.cloud",
    "port": 4566,
    "account": "test",
    "database": "test",
    "schema": "public",
}


def create_snowflake_connection():
    conn = Connection(
        conn_id="snowflake_local",
        conn_type="snowflake",
        login="test",
        password="test",
        description="LocalStack Snowflake",
        extra=snowflake_connection_params,
    )
    session = settings.Session()
    conn_name = session.query(Connection).filter(Connection.conn_id == conn.conn_id).first()

    # connection already exists - nothing to do
    if str(conn_name) == str(conn.conn_id):
        return None

    session.add(conn)
    session.commit()
    return conn


create_snowflake_connection()
credentials = SnowflakeUserPasswordProfileMapping(
    conn_id="snowflake_local",
    profile_args={"database": "test", "schema": "public"})

profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=credentials)

dbt_executable = f"{os.environ['AIRFLOW_HOME']}/dbt_venv/bin/dbt"


@dag(schedule_interval="@hourly",
     start_date=datetime(2024, 6, 10),
     catchup=False,
     dag_id="dbt_snowpark",
)
def dbt_snowpark_dag():
    transform_data = DbtTaskGroup(
        group_id="transform_data",
        project_config=ProjectConfig(dbt_project_path),
        profile_config=profile_config,
        execution_config=ExecutionConfig(dbt_executable_path=dbt_executable),
        operator_args={"install_deps": True},
    )

    intermediate = DummyOperator(task_id='intermediate')

    @task
    def query_result_data():
        connection = connector.connect(**snowflake_connection_params)
        # select rows from `PREPPED_DATA` view created by DBT transformation
        result = connection.cursor().execute("SELECT * FROM PREPPED_DATA")
        result = list(result)
        print("-----")
        print(f"Query result ({len(result)} rows):")
        for row in result:
            print(row)
        print("-----")
        result = str(result)
        return result

    query_result = query_result_data()
    transform_data >> intermediate >> query_result


dbt_snowpark_dag = dbt_snowpark_dag()
18 changes: 18 additions & 0 deletions airflow-dbt-transformation/dbt_project.yml
@@ -0,0 +1,18 @@
name: test_dbt
models:
  my_project:
    # Applies to all models under models/transform/ and models/analysis/
    transform:
      schema: transform
      materialized: view
    analysis:
      schema: analysis
      materialized: view

seeds:
  bookings_1:
    enabled: true
  bookings_2:
    enabled: true
  customers:
    enabled: true
20 changes: 20 additions & 0 deletions airflow-dbt-transformation/macros/custom_demo_macros.sql
@@ -0,0 +1,20 @@
{% macro generate_schema_name(custom_schema_name, node) -%}
    {%- set default_schema = target.schema -%}
    {%- if custom_schema_name is none -%}
        {{ default_schema }}
    {%- else -%}
        {{ custom_schema_name | trim }}
    {%- endif -%}
{%- endmacro %}


{% macro set_query_tag() -%}
  {% set new_query_tag = model.name %} {# always use model name #}
  {% if new_query_tag %}
    {% set original_query_tag = get_current_query_tag() %}
    {{ log("Setting query_tag to '" ~ new_query_tag ~ "'. Will reset to '" ~ original_query_tag ~ "' after materialization.") }}
    {% do run_query("alter session set query_tag = '{}'".format(new_query_tag)) %}
    {{ return(original_query_tag) }}
  {% endif %}
  {{ return(none) }}
{% endmacro %}
@@ -0,0 +1,8 @@
SELECT
    BOOKING_DATE,
    HOTEL,
    COUNT(ID) as count_bookings
FROM {{ ref('prepped_data') }}
GROUP BY
    BOOKING_DATE,
    HOTEL
@@ -0,0 +1,11 @@
SELECT
    BOOKING_DATE,
    HOTEL,
    COST,
    AVG(COST) OVER (
        ORDER BY BOOKING_DATE ROWS BETWEEN 29 PRECEDING AND CURRENT ROW
    ) as "30_DAY_AVG_COST",
    COST - AVG(COST) OVER (
        ORDER BY BOOKING_DATE ROWS BETWEEN 29 PRECEDING AND CURRENT ROW
    ) as "DIFF_BTW_ACTUAL_AVG"
FROM {{ ref('prepped_data') }}
@@ -0,0 +1,3 @@
{{ dbt_utils.union_relations(
relations=[ref('bookings_1'), ref('bookings_2')]
) }}
5 changes: 5 additions & 0 deletions airflow-dbt-transformation/models/transform/customer.sql
@@ -0,0 +1,5 @@
SELECT ID
, FIRST_NAME
, LAST_NAME
, birthdate
FROM {{ ref('customers') }}
11 changes: 11 additions & 0 deletions airflow-dbt-transformation/models/transform/prepped_data.sql
@@ -0,0 +1,11 @@
SELECT A.ID
, FIRST_NAME
, LAST_NAME
, birthdate
, BOOKING_REFERENCE
, HOTEL
, BOOKING_DATE
, COST
FROM {{ref('customer')}} A
JOIN {{ref('combined_bookings')}} B
on A.ID = B.ID
3 changes: 3 additions & 0 deletions airflow-dbt-transformation/packages.yml
@@ -0,0 +1,3 @@
packages:
- package: dbt-labs/dbt_utils
version: [">=1.0.0", "<2.0.0"]
4 changes: 4 additions & 0 deletions airflow-dbt-transformation/requirements.txt
@@ -0,0 +1,4 @@
apache-airflow-providers-snowflake
astronomer-cosmos
dbt-snowflake
snowflake-connector-python
8 changes: 8 additions & 0 deletions airflow-dbt-transformation/seeds/bookings_1.csv
@@ -0,0 +1,8 @@
id,booking_reference,hotel,booking_date,cost
1,232323231,Pan Pacific,2021-03-19,100
1,232323232,Fullerton,2021-03-20,200
1,232323233,Fullerton,2021-04-20,300
1,232323234,Jackson Square,2021-03-21,400
1,232323235,Mayflower,2021-06-20,500
1,232323236,Suncity,2021-03-19,600
1,232323237,Fullerton,2021-08-20,700
8 changes: 8 additions & 0 deletions airflow-dbt-transformation/seeds/bookings_2.csv
@@ -0,0 +1,8 @@
id,booking_reference,hotel,booking_date,cost
2,332323231,Fullerton,2021-03-19,100
2,332323232,Jackson Square,2021-03-20,300
2,332323233,Suncity,2021-03-20,300
2,332323234,Jackson Square,2021-03-21,300
2,332323235,Fullerton,2021-06-20,300
2,332323236,Suncity,2021-03-19,300
2,332323237,Berkly,2021-05-20,200
3 changes: 3 additions & 0 deletions airflow-dbt-transformation/seeds/customers.csv
@@ -0,0 +1,3 @@
id,first_name,last_name,birthdate,membership_no
1,jim,jone,1989-03-19,12334
2,adrian,lee,1990-03-10,12323