Remove tsv_to_postgres_loader_overwrite (#286)
* Remove tsv_to_postgres_loader_overwrite

* Remove overwrite tests & extra steps
AetherUnbound authored Nov 8, 2021
1 parent ef614b9 commit 6173fb4
Showing 6 changed files with 76 additions and 326 deletions.
26 changes: 7 additions & 19 deletions openverse_catalog/dags/common/loader/loader.py
@@ -2,22 +2,17 @@
 from common.loader.paths import _extract_media_type
 
 
-def load_local_data(output_dir, postgres_conn_id, identifier, ti, overwrite=False):
+def load_local_data(output_dir, postgres_conn_id, identifier, ti):
     tsv_file_name = paths.get_staged_file(output_dir, identifier)
     tsv_version = ti.xcom_pull(task_ids="stage_oldest_tsv_file", key="tsv_version")
 
     media_type = _extract_media_type(tsv_file_name)
     sql.load_local_data_to_intermediate_table(
         postgres_conn_id, tsv_file_name, identifier
     )
-    if overwrite is True:
-        sql.overwrite_records_in_db_table(
-            postgres_conn_id, identifier, media_type=media_type, tsv_version=tsv_version
-        )
-    else:
-        sql.upsert_records_to_db_table(
-            postgres_conn_id, identifier, media_type=media_type, tsv_version=tsv_version
-        )
+    sql.upsert_records_to_db_table(
+        postgres_conn_id, identifier, media_type=media_type, tsv_version=tsv_version
+    )
 
 
 def copy_to_s3(output_dir, bucket, identifier, aws_conn_id):
@@ -34,7 +29,6 @@ def load_s3_data(
     postgres_conn_id,
     identifier,
     ti,
-    overwrite=False,
 ):
     media_type = ti.xcom_pull(task_ids="stage_oldest_tsv_file", key="media_type")
     tsv_version = ti.xcom_pull(task_ids="stage_oldest_tsv_file", key="tsv_version")
@@ -47,12 +41,6 @@ def load_s3_data(
     sql.load_s3_data_to_intermediate_table(
         postgres_conn_id, bucket, tsv_key, identifier, media_type
     )
-
-    if overwrite is True:
-        sql.overwrite_records_in_db_table(
-            postgres_conn_id, identifier, media_type=media_type, tsv_version=tsv_version
-        )
-    else:
-        sql.upsert_records_to_db_table(
-            postgres_conn_id, identifier, media_type=media_type, tsv_version=tsv_version
-        )
+    sql.upsert_records_to_db_table(
+        postgres_conn_id, identifier, media_type=media_type, tsv_version=tsv_version
+    )
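
Both loader entry points now call `sql.upsert_records_to_db_table` unconditionally. The upsert query itself is not part of this diff; the sketch below only illustrates the Postgres `INSERT ... ON CONFLICT ... DO UPDATE` pattern such a function typically issues, assuming an illustrative column list and a unique index on `(provider, md5(foreign_identifier))`. Because an upsert updates rows that already exist and inserts the ones that do not, a separate overwrite-only path adds nothing.

```python
# Illustration only: the real query lives in common/loader/sql.py
# (upsert_records_to_db_table). The table, column list, and conflict target
# below are assumptions, not the repository's actual schema.
from textwrap import dedent


def build_example_upsert(db_table="image", load_table="load_image_20211108T000000"):
    columns = ["provider", "foreign_identifier", "url", "license"]
    column_list = ", ".join(columns)
    update_set = ", ".join(f"{c} = EXCLUDED.{c}" for c in columns)
    return dedent(
        f"""
        INSERT INTO {db_table} ({column_list})
        SELECT {column_list} FROM {load_table}
        ON CONFLICT (provider, md5(foreign_identifier))
        DO UPDATE SET {update_set};
        """
    )


if __name__ == "__main__":
    print(build_example_upsert())
```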
5 changes: 1 addition & 4 deletions openverse_catalog/dags/common/loader/operators.py
@@ -30,12 +30,11 @@ def get_table_creator_operator(postgres_conn_id, identifier=TIMESTAMP_TEMPLATE):
 
 
 def get_load_local_data_operator(
-    output_dir, postgres_conn_id, overwrite=False, identifier=TIMESTAMP_TEMPLATE
+    output_dir, postgres_conn_id, identifier=TIMESTAMP_TEMPLATE
 ):
     return PythonOperator(
         task_id="load_local_data",
         python_callable=loader.load_local_data,
-        op_kwargs={"overwrite": overwrite},
         op_args=[output_dir, postgres_conn_id, identifier],
         trigger_rule=TriggerRule.ALL_SUCCESS,
     )
@@ -55,13 +54,11 @@ def get_load_s3_data_operator(
     bucket,
     aws_conn_id,
     postgres_conn_id,
-    overwrite=False,
     identifier=TIMESTAMP_TEMPLATE,
 ):
     return PythonOperator(
         task_id="load_s3_data",
         python_callable=loader.load_s3_data,
-        op_kwargs={"overwrite": overwrite},
         op_args=[bucket, aws_conn_id, postgres_conn_id, identifier],
     )
 
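The operator factories now pass only positional `op_args`; the callables' trailing `ti` parameter is supplied by Airflow from the task's runtime context (in Airflow 2.x, context keys that match the callable's signature are filled in automatically). Below is a minimal sketch, not taken from the repository's test suite, of exercising the simplified `load_local_data` directly with a stubbed task instance; the paths, connection id, and XCom value are assumptions.

```python
# Sketch of a unit test for the upsert-only code path; everything touching the
# database or the filesystem is mocked, and the literal values are made up.
from unittest import mock

from common.loader import loader


def test_load_local_data_upserts_only():
    fake_ti = mock.Mock()
    fake_ti.xcom_pull.return_value = "v001"  # assumed tsv_version pulled from XCom

    with mock.patch.object(loader, "paths") as paths, mock.patch.object(
        loader, "sql"
    ) as sql, mock.patch.object(loader, "_extract_media_type", return_value="image"):
        paths.get_staged_file.return_value = "/tmp/provider_20211108.tsv"
        loader.load_local_data("/tmp", "postgres_conn", "20211108T000000", fake_ti)

    sql.load_local_data_to_intermediate_table.assert_called_once()
    # The only remaining write path is the upsert; there is no overwrite branch.
    sql.upsert_records_to_db_table.assert_called_once()
```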
36 changes: 0 additions & 36 deletions openverse_catalog/dags/common/loader/sql.py
@@ -261,42 +261,6 @@ def upsert_records_to_db_table(
     postgres.run(upsert_query)
 
 
-def overwrite_records_in_db_table(
-    postgres_conn_id,
-    identifier,
-    db_table=None,
-    media_type=IMAGE,
-    tsv_version=CURRENT_TSV_VERSION,
-):
-    if db_table is None:
-        db_table = TABLE_NAMES.get(media_type, TABLE_NAMES[IMAGE])
-    load_table = _get_load_table_name(identifier, media_type=media_type)
-    logger.info(f"Updating records in {db_table}. {tsv_version}")
-    postgres = PostgresHook(postgres_conn_id=postgres_conn_id)
-    columns_to_update = TSV_COLUMNS[media_type]
-    update_set_string = ",\n ".join(
-        [
-            f"{column.db_name} = {load_table}.{column.db_name}"
-            for column in columns_to_update
-        ]
-    )
-
-    update_query = dedent(
-        f"""
-        UPDATE {db_table}
-        SET
-        {update_set_string}
-        FROM {load_table}
-        WHERE
-          {db_table}.{col.PROVIDER.db_name} = {load_table}.{col.PROVIDER.db_name}
-        AND
-          md5({db_table}.{col.FOREIGN_ID.db_name})
-          = md5({load_table}.{col.FOREIGN_ID.db_name});
-        """
-    )
-    postgres.run(update_query)
-
-
 def drop_load_table(postgres_conn_id, identifier, ti):
     media_type = ti.xcom_pull(task_ids="stage_oldest_tsv_file", key="media_type")
     if media_type is None:
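For reference, the deleted helper rendered a plain `UPDATE ... FROM` that joined the intermediate load table to the main media table on provider and the md5 of the foreign identifier, so it could only touch rows that already existed. The snippet below reproduces roughly what that generated SQL looked like; the table name and column list are stand-ins, since `TSV_COLUMNS` and the column constants are defined elsewhere in `sql.py`.

```python
# Rough rendering of the SQL the removed function produced; the column set and
# table names here are assumptions for illustration only.
from textwrap import dedent

db_table = "image"
load_table = "load_image_20211108T000000"
columns_to_update = ["url", "thumbnail", "license", "license_version", "meta_data"]

update_set_string = ",\n    ".join(
    f"{column} = {load_table}.{column}" for column in columns_to_update
)

update_query = dedent(
    f"""
    UPDATE {db_table}
    SET
    {update_set_string}
    FROM {load_table}
    WHERE
      {db_table}.provider = {load_table}.provider
    AND
      md5({db_table}.foreign_identifier) = md5({load_table}.foreign_identifier);
    """
)

print(update_query)
```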
173 changes: 67 additions & 106 deletions openverse_catalog/dags/database/loader_workflow.py
@@ -46,131 +46,92 @@
https://github.com/creativecommons/cccatalog/issues/334)
"""
import logging
import os
from datetime import datetime, timedelta

from airflow import DAG
from common.loader import operators
from common.pg_cleaner import OVERWRITE_DIR


logging.basicConfig(
format="%(asctime)s - %(name)s - %(levelname)s: %(message)s", level=logging.INFO
)

logger = logging.getLogger(__name__)

DAG_ID = "tsv_to_postgres_loader"
OVERWRITE_DAG_ID = "tsv_to_postgres_loader_overwrite"
DB_CONN_ID = os.getenv("OPENLEDGER_CONN_ID", "postgres_openledger_testing")
AWS_CONN_ID = os.getenv("AWS_CONN_ID", "no_aws_conn_id")
OPENVERSE_BUCKET = os.getenv("OPENVERSE_BUCKET")
MINIMUM_FILE_AGE_MINUTES = int(os.getenv("LOADER_FILE_AGE", 15))
CONCURRENCY = 5
SCHEDULE_CRON = "* * * * *"
TIMESTAMP_TEMPLATE = "{{ ts_nodash }}"

OUTPUT_DIR_PATH = os.path.realpath(os.getenv("OUTPUT_DIR", "/tmp/"))

DAG_DEFAULT_ARGS = {
"owner": "data-eng-admin",
"depends_on_past": False,
"start_date": datetime(2020, 1, 15),
"email_on_retry": False,
"retries": 2,
"retry_delay": timedelta(seconds=15),
}


def create_dag(
dag = DAG(
dag_id=DAG_ID,
args=DAG_DEFAULT_ARGS,
default_args={
"owner": "data-eng-admin",
"depends_on_past": False,
"start_date": datetime(2020, 1, 15),
"email_on_retry": False,
"retries": 2,
"retry_delay": timedelta(seconds=15),
},
concurrency=CONCURRENCY,
max_active_runs=CONCURRENCY,
schedule_cron=SCHEDULE_CRON,
postgres_conn_id=DB_CONN_ID,
aws_conn_id=AWS_CONN_ID,
output_dir=OUTPUT_DIR_PATH,
storage_bucket=OPENVERSE_BUCKET,
minimum_file_age_minutes=MINIMUM_FILE_AGE_MINUTES,
overwrite=False,
):
dag = DAG(
dag_id=dag_id,
default_args=args,
concurrency=concurrency,
max_active_runs=max_active_runs,
schedule_interval=schedule_cron,
catchup=False,
)
if overwrite is True:
identifier = "overwrite" + TIMESTAMP_TEMPLATE
else:
identifier = TIMESTAMP_TEMPLATE

with dag:
stage_oldest_tsv_file = operators.get_file_staging_operator(
output_dir,
minimum_file_age_minutes,
identifier=identifier,
)
create_loading_table = operators.get_table_creator_operator(
postgres_conn_id,
identifier=identifier,
)
copy_to_s3 = operators.get_copy_to_s3_operator(
output_dir,
storage_bucket,
aws_conn_id,
identifier=identifier,
)
load_s3_data = operators.get_load_s3_data_operator(
storage_bucket,
aws_conn_id,
postgres_conn_id,
overwrite=overwrite,
identifier=identifier,
)
one_failed_s3 = operators.get_one_failed_switch("s3")
load_local_data = operators.get_load_local_data_operator(
output_dir,
postgres_conn_id,
overwrite=overwrite,
identifier=identifier,
)
one_success_save = operators.get_one_success_switch("save")
all_done_save = operators.get_all_done_switch("save")
all_failed_save = operators.get_all_failed_switch("save")
delete_staged_file = operators.get_file_deletion_operator(
output_dir,
identifier=identifier,
)
one_failed_delete = operators.get_one_failed_switch("delete")
drop_loading_table = operators.get_drop_table_operator(
postgres_conn_id,
identifier=identifier,
)
move_staged_failures = operators.get_failure_moving_operator(
output_dir,
identifier=identifier,
)
(stage_oldest_tsv_file >> [create_loading_table, copy_to_s3] >> load_s3_data)
[copy_to_s3, load_s3_data] >> one_failed_s3
[create_loading_table, one_failed_s3] >> load_local_data
[copy_to_s3, load_local_data] >> one_success_save
[copy_to_s3, load_local_data] >> all_done_save
[copy_to_s3, load_local_data] >> all_failed_save
[one_success_save, all_done_save] >> delete_staged_file
[load_s3_data, load_local_data] >> drop_loading_table
delete_staged_file >> one_failed_delete
[one_failed_delete, all_failed_save] >> move_staged_failures
return dag


globals()[DAG_ID] = create_dag()
globals()[OVERWRITE_DAG_ID] = create_dag(
dag_id=OVERWRITE_DAG_ID,
output_dir=os.path.join(OUTPUT_DIR_PATH, OVERWRITE_DIR),
overwrite=True,
schedule_interval="* * * * *",
catchup=False,
doc_md=__doc__,
)

with dag:
stage_oldest_tsv_file = operators.get_file_staging_operator(
OUTPUT_DIR_PATH,
MINIMUM_FILE_AGE_MINUTES,
identifier=TIMESTAMP_TEMPLATE,
)
create_loading_table = operators.get_table_creator_operator(
DB_CONN_ID,
identifier=TIMESTAMP_TEMPLATE,
)
copy_to_s3 = operators.get_copy_to_s3_operator(
OUTPUT_DIR_PATH,
OPENVERSE_BUCKET,
AWS_CONN_ID,
identifier=TIMESTAMP_TEMPLATE,
)
load_s3_data = operators.get_load_s3_data_operator(
OPENVERSE_BUCKET,
AWS_CONN_ID,
DB_CONN_ID,
identifier=TIMESTAMP_TEMPLATE,
)
one_failed_s3 = operators.get_one_failed_switch("s3")
load_local_data = operators.get_load_local_data_operator(
OUTPUT_DIR_PATH,
DB_CONN_ID,
identifier=TIMESTAMP_TEMPLATE,
)
one_success_save = operators.get_one_success_switch("save")
all_done_save = operators.get_all_done_switch("save")
all_failed_save = operators.get_all_failed_switch("save")
delete_staged_file = operators.get_file_deletion_operator(
OUTPUT_DIR_PATH,
identifier=TIMESTAMP_TEMPLATE,
)
one_failed_delete = operators.get_one_failed_switch("delete")
drop_loading_table = operators.get_drop_table_operator(
DB_CONN_ID,
identifier=TIMESTAMP_TEMPLATE,
)
move_staged_failures = operators.get_failure_moving_operator(
OUTPUT_DIR_PATH,
identifier=TIMESTAMP_TEMPLATE,
)
(stage_oldest_tsv_file >> [create_loading_table, copy_to_s3] >> load_s3_data)
[copy_to_s3, load_s3_data] >> one_failed_s3
[create_loading_table, one_failed_s3] >> load_local_data
[copy_to_s3, load_local_data] >> one_success_save
[copy_to_s3, load_local_data] >> all_done_save
[copy_to_s3, load_local_data] >> all_failed_save
[one_success_save, all_done_save] >> delete_staged_file
[load_s3_data, load_local_data] >> drop_loading_table
delete_staged_file >> one_failed_delete
[one_failed_delete, all_failed_save] >> move_staged_failures
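
With the `create_dag` factory removed, importing the module now defines a single `tsv_to_postgres_loader` DAG at module level, and `doc_md=__doc__` surfaces the module docstring in the Airflow UI. A quick way to confirm the change, sketched below as an assumed pytest-style check rather than the repository's actual test, is to load the file into a `DagBag` and assert that the overwrite DAG no longer exists.

```python
# Assumed verification sketch: parse the single DAG file and check its contents.
from airflow.models import DagBag


def test_only_one_loader_dag_remains():
    dag_bag = DagBag(
        dag_folder="openverse_catalog/dags/database/loader_workflow.py",
        include_examples=False,
    )
    # The module should import cleanly and expose only the upsert-based loader.
    assert dag_bag.import_errors == {}
    assert "tsv_to_postgres_loader" in dag_bag.dags
    assert "tsv_to_postgres_loader_overwrite" not in dag_bag.dags
```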

