Skip to content

Commit

Permalink
Merge pull request #313 from coderxio/jrlegrand/transform-update
Browse files Browse the repository at this point in the history
Change transform to list
  • Loading branch information
jrlegrand authored Sep 6, 2024
2 parents d003c90 + fb2463d commit cdcc787
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 14 deletions.
6 changes: 4 additions & 2 deletions airflow/dags/common_dag_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,12 @@ def extract(dag_id,url) -> str:


@task
def transform(dag_id, models_subdir='staging',task_id="") -> None:
def transform(dag_id, models_subdir=['staging'], task_id="") -> None:
# Task to transform data using dbt
models = [f'models/{model_subdir}/{dag_id}' for model_subdir in models_subdir]
command = ['docker', 'exec', 'dbt', 'dbt', 'run', '--select'] + models

run_subprocess_command(
command=['docker', 'exec', 'dbt','dbt', 'run', '--select', f'models/{models_subdir}/{dag_id}'],
command=command,
cwd='/dbt/sagerx'
)
9 changes: 3 additions & 6 deletions airflow/dags/nadac/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from airflow.providers.postgres.operators.postgres import PostgresOperator

from common_dag_tasks import run_subprocess_command
from common_dag_tasks import transform

starting_date = pendulum.parse("2013-12-01")

Expand Down Expand Up @@ -77,11 +77,8 @@ def get_download_url(self, year):
)
)

# Task to transform data using dbt
@task
def transform():
run_subprocess_command(['dbt', 'run', '--select', 'models/staging/nadac'], cwd='/dbt/sagerx')
transform_task = transform(dag_id)

extract() >> load >> transform()
extract() >> load >> transform_task

nadac()
10 changes: 4 additions & 6 deletions airflow/dags/rxnorm/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.hooks.postgres_hook import PostgresHook

from common_dag_tasks import run_subprocess_command
from common_dag_tasks import transform


@dag(
Expand Down Expand Up @@ -73,11 +73,9 @@ def extract(st: str):
)
)

# Task to transform data using dbt
@task
def transform():
run_subprocess_command(['docker', 'exec', 'dbt', 'dbt', 'run', '--select', 'models/staging/rxnorm', 'models/intermediate/rxnorm'], cwd='/dbt/sagerx')
<<<<<< jrlegrand/transform-update
transform_task = transform(dag_id, models_subdir=['staging', 'intermediate'])

extract(get_st(get_tgt())) >> load >> transform()
extract(get_st(get_tgt())) >> load >> transform_task

rxnorm()

0 comments on commit cdcc787

Please sign in to comment.