-
Notifications
You must be signed in to change notification settings - Fork 4
/
jaffle_shop.py
48 lines (38 loc) · 1.34 KB
/
jaffle_shop.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import os
from pathlib import Path
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from pendulum import datetime
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig
from cosmos.profiles import PostgresUserPasswordProfileMapping
# Default root for dbt projects: a "dbt" directory that sits next to this file.
DEFAULT_DBT_ROOT_PATH = Path(__file__).parent.joinpath("dbt")

# Effective root: the DBT_ROOT_PATH environment variable wins when it is set;
# otherwise the default above is used.
DBT_ROOT_PATH = Path(os.environ.get("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
# dbt profile settings handed to the DbtTaskGroup below: profile "default",
# target "dev", with the connection described by a Postgres user/password
# profile mapping.
profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=PostgresUserPasswordProfileMapping(
        # NOTE(review): presumably the ID of an Airflow connection that must
        # exist in the deployment -- confirm.
        conn_id="airflow_db",
        profile_args={"schema": "public"},
    ),
)
# The "jaffle_shop" DAG: a placeholder task, then the dbt project rendered as
# a task group, then a closing placeholder task. Everything below must stay
# indented inside the ``with`` block so the operators are created in the DAG's
# context.
with DAG(
    dag_id="jaffle_shop",
    start_date=datetime(2022, 11, 27),
    schedule=None,  # no schedule is attached to this DAG
    catchup=False,
) as dag:
    # Placeholder marking the start of the workflow.
    pre_dbt_workflow = EmptyOperator(task_id="pre_dbt_workflow")

    # Task group built from the dbt project at <DBT_ROOT_PATH>/jaffle_shop.
    jaffle_shop = DbtTaskGroup(
        group_id="jaffle_shop",
        project_config=ProjectConfig(
            (DBT_ROOT_PATH / "jaffle_shop").as_posix(),
        ),
        execution_config=ExecutionConfig(
            # Explicit path to the dbt executable.
            # NOTE(review): looks like a dedicated virtualenv inside the
            # Airflow image -- confirm the path exists in the deployment.
            dbt_executable_path="/usr/local/airflow/dbt_venv/bin/dbt",
        ),
        # NOTE(review): presumably makes each dbt operator install the
        # project's dbt package dependencies before running -- confirm
        # against the Cosmos docs.
        operator_args={"install_deps": True},
        profile_config=profile_config,
        default_args={"retries": 2},
        dag=dag,
    )

    # Placeholder marking the end of the workflow.
    post_dbt_workflow = EmptyOperator(task_id="post_dbt_workflow")

    # Execution order: placeholder -> dbt task group -> placeholder.
    pre_dbt_workflow >> jaffle_shop >> post_dbt_workflow