Skip to content

Commit

Permalink
Use run_sql where possible
Browse files Browse the repository at this point in the history
  • Loading branch information
stacimc committed May 16, 2024
1 parent 515e06c commit 6a259e0
Showing 1 changed file with 4 additions and 10 deletions.
14 changes: 4 additions & 10 deletions catalog/dags/data_refresh/copy_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,6 @@ def _run_sql(
return postgres.run(query, handler=handler)


@task
def create_fdw_extension(downstream_conn_id: str):
"""Create the FDW extension if it does not exist."""
downstream_pg = PostgresHook(
postgres_conn_id=downstream_conn_id, default_statement_timeout=10.0
)
downstream_pg.run(queries.CREATE_FDW_EXTENSION_QUERY)


@task
def initialize_fdw(
upstream_conn_id: str,
Expand Down Expand Up @@ -302,7 +293,10 @@ def copy_upstream_tables(
downstream_conn_id = POSTGRES_API_CONN_IDS.get(environment)
upstream_conn_id = POSTGRES_CONN_ID

create_fdw = create_fdw_extension(downstream_conn_id=downstream_conn_id)
create_fdw = _run_sql.override(task_id="create_fdw")(
postgres_conn_id=downstream_conn_id,
sql_template=queries.CREATE_FDW_EXTENSION_QUERY,
)

init_fdw = initialize_fdw(
upstream_conn_id=upstream_conn_id,
Expand Down

0 comments on commit 6a259e0

Please sign in to comment.