This repository has been archived by the owner on Jan 13, 2022. It is now read-only.

Clean preexisting data using ImageStore #517

Merged · 17 commits · Oct 20, 2020
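For context: the new cleaner DAG in this PR fans out one task per hexadecimal prefix of the image identifier. The `pg_cleaner.hex_counter` helper it relies on is not part of this diff, so the sketch of its presumed behavior below is an assumption, not the actual implementation:

from itertools import product

def hex_counter(length):
    # Assumed behavior: yield every lowercase hex string of the given length,
    # e.g. "0".."f" for length 1 and "00".."ff" for length 2.
    for digits in product("0123456789abcdef", repeat=length):
        yield "".join(digits)

# With PREFIX_LENGTH = 1 the DAG factory receives 16 prefixes and therefore
# creates 16 parallel cleaner tasks, clean_0 through clean_f.
print(list(hex_counter(1)))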
75 changes: 75 additions & 0 deletions src/cc_catalog_airflow/dags/cleaner_workflow.py
@@ -0,0 +1,75 @@
from copy import deepcopy
from datetime import datetime
import logging
import os
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from util import config, pg_cleaner, operator_util

logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s: %(message)s",
    level=logging.INFO,
)

logger = logging.getLogger(__name__)

DAG_ID = "postgres_image_cleaner"
DB_CONN_ID = os.getenv("OPENLEDGER_CONN_ID", "postgres_openledger_testing")
PREFIX_LENGTH = 1
DESIRED_PREFIX_LENGTH = 3
CONCURRENCY = 16


def create_id_partitioned_cleaner_dag(
        dag_id=DAG_ID,
        prefix_length=PREFIX_LENGTH,
        postgres_conn_id=DB_CONN_ID,
        start_date=datetime(1970, 1, 1),
        concurrency=CONCURRENCY,
        default_args=config.DAG_DEFAULT_ARGS,
):
    args = deepcopy(default_args)
    args.update(start_date=start_date)
    dag = DAG(
        dag_id=dag_id,
        default_args=args,
        concurrency=concurrency,
        max_active_runs=concurrency,
        schedule_interval=None,
        start_date=start_date,
        catchup=False,
    )
    hex_prefixes = pg_cleaner.hex_counter(prefix_length)
    with dag:
        cleaner_list = [
            _get_pg_cleaner_operator(dag, prefix, postgres_conn_id)
            for prefix in hex_prefixes
        ]
        start_task = operator_util.get_log_operator(dag, dag.dag_id, "Started")
        end_task = operator_util.get_log_operator(dag, dag.dag_id, "Ended")
        start_task >> cleaner_list >> end_task
    return dag


def _get_pg_cleaner_operator(
        dag,
        prefix,
        postgres_conn_id,
        desired_length=DESIRED_PREFIX_LENGTH,
        delay=CONCURRENCY,
):
    task_id = f"clean_{prefix}"
    return PythonOperator(
        task_id=task_id,
        python_callable=pg_cleaner.clean_prefix_loop,
        op_args=[postgres_conn_id, prefix],
        op_kwargs={
            "desired_prefix_length": desired_length,
            "delay_minutes": delay,
        },
        depends_on_past=False,
        dag=dag,
    )


globals()[DAG_ID] = create_id_partitioned_cleaner_dag()
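The module-level `globals()[DAG_ID] = ...` assignment is what makes the DAG discoverable by the Airflow scheduler, and Airflow accepts a list on either side of `>>`, so `start_task >> cleaner_list >> end_task` wires every cleaner task between the two log tasks. If you want to eyeball the resulting fan-out locally, a rough check (assuming a working Airflow installation with this dags folder configured) might look like:

from airflow.models import DagBag

dag_bag = DagBag(include_examples=False)
cleaner_dag = dag_bag.get_dag("postgres_image_cleaner")
# Expect the Started/Ended log tasks plus one clean_<prefix> task per hex digit.
print(sorted(task.task_id for task in cleaner_dag.tasks))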
42 changes: 32 additions & 10 deletions src/cc_catalog_airflow/dags/loader_workflow.py
@@ -4,6 +4,7 @@
from airflow import DAG

from util.loader import operators
from util.pg_cleaner import OVERWRITE_DIR


logging.basicConfig(
@@ -14,16 +15,17 @@
logger = logging.getLogger(__name__)

DAG_ID = 'tsv_to_postgres_loader'
OVERWRITE_DAG_ID = 'tsv_to_postgres_loader_overwrite'
DB_CONN_ID = os.getenv('OPENLEDGER_CONN_ID', 'postgres_openledger_testing')
AWS_CONN_ID = os.getenv('AWS_CONN_ID', 'no_aws_conn_id')
CCCATALOG_STORAGE_BUCKET = os.getenv('CCCATALOG_STORAGE_BUCKET')
MINIMUM_FILE_AGE_MINUTES = int(os.getenv('LOADER_FILE_AGE', 15))
CONCURRENCY = 5
SCHEDULE_CRON = '* * * * *'
TIMESTAMP_TEMPLATE = '{{ ts_nodash }}'

OUTPUT_DIR_PATH = os.path.realpath(os.getenv('OUTPUT_DIR', '/tmp/'))


DAG_DEFAULT_ARGS = {
    'owner': 'data-eng-admin',
    'depends_on_past': False,
@@ -44,7 +46,8 @@ def create_dag(
        aws_conn_id=AWS_CONN_ID,
        output_dir=OUTPUT_DIR_PATH,
        storage_bucket=CCCATALOG_STORAGE_BUCKET,
        minimum_file_age_minutes=MINIMUM_FILE_AGE_MINUTES
        minimum_file_age_minutes=MINIMUM_FILE_AGE_MINUTES,
        overwrite=False
):
    dag = DAG(
        dag_id=dag_id,
@@ -54,28 +57,37 @@ def create_dag(
        schedule_interval=schedule_cron,
        catchup=False
    )
    if overwrite is True:
        identifier = 'overwrite' + TIMESTAMP_TEMPLATE
    else:
        identifier = TIMESTAMP_TEMPLATE

    with dag:
        stage_oldest_tsv_file = operators.get_file_staging_operator(
            dag,
            output_dir,
            minimum_file_age_minutes
            minimum_file_age_minutes,
            identifier=identifier,
        )
        create_loading_table = operators.get_table_creator_operator(
            dag,
            postgres_conn_id
            postgres_conn_id,
            identifier=identifier,
        )
        copy_to_s3 = operators.get_copy_to_s3_operator(
            dag,
            output_dir,
            storage_bucket,
            aws_conn_id
            aws_conn_id,
            identifier=identifier,
        )
        load_s3_data = operators.get_load_s3_data_operator(
            dag,
            storage_bucket,
            aws_conn_id,
            postgres_conn_id
            postgres_conn_id,
            overwrite=overwrite,
            identifier=identifier,
        )
        one_failed_s3 = operators.get_one_failed_switch(
            dag,
@@ -84,7 +96,9 @@
        load_local_data = operators.get_load_local_data_operator(
            dag,
            output_dir,
            postgres_conn_id
            postgres_conn_id,
            overwrite=overwrite,
            identifier=identifier,
        )
        one_success_save = operators.get_one_success_switch(
            dag,
@@ -100,19 +114,22 @@
        )
        delete_staged_file = operators.get_file_deletion_operator(
            dag,
            output_dir
            output_dir,
            identifier=identifier,
        )
        one_failed_delete = operators.get_one_failed_switch(
            dag,
            'delete'
        )
        drop_loading_table = operators.get_drop_table_operator(
            dag,
            postgres_conn_id
            postgres_conn_id,
            identifier=identifier,
        )
        move_staged_failures = operators.get_failure_moving_operator(
            dag,
            output_dir
            output_dir,
            identifier=identifier,
        )
        (
            stage_oldest_tsv_file
@@ -132,3 +149,8 @@


globals()[DAG_ID] = create_dag()
globals()[OVERWRITE_DAG_ID] = create_dag(
    dag_id=OVERWRITE_DAG_ID,
    output_dir=os.path.join(OUTPUT_DIR_PATH, OVERWRITE_DIR),
    overwrite=True
)
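The overwrite variant registered above shares all of its task logic with the regular loader; the differences are the output directory (a subdirectory named by `OVERWRITE_DIR`) and the run identifier, which gets an `overwrite` prefix, presumably so its staged files and load table stay separate from those of a regular load running at the same time. A quick illustration of the two configurations; the values below are examples only, since `OVERWRITE_DIR` is defined in util/pg_cleaner.py (not shown here) and the timestamp stands in for a rendered `{{ ts_nodash }}`:

import os

OUTPUT_DIR_PATH = "/tmp"
OVERWRITE_DIR = "overwrite"          # assumed value, see note above
rendered_ts = "20201020T120000"      # example rendering of {{ ts_nodash }}

regular_run = {"output_dir": OUTPUT_DIR_PATH, "identifier": rendered_ts}
overwrite_run = {
    "output_dir": os.path.join(OUTPUT_DIR_PATH, OVERWRITE_DIR),
    "identifier": "overwrite" + rendered_ts,
}
print(regular_run)    # {'output_dir': '/tmp', 'identifier': '20201020T120000'}
print(overwrite_run)  # {'output_dir': '/tmp/overwrite', 'identifier': 'overwrite20201020T120000'}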
14 changes: 14 additions & 0 deletions src/cc_catalog_airflow/dags/test_cleaner_workflow.py
@@ -0,0 +1,14 @@
import os
from airflow.models import DagBag

FILE_DIR = os.path.abspath(os.path.dirname(__file__))


def test_dag_loads_with_no_errors(tmpdir):
    tmp_directory = str(tmpdir)
    print(tmp_directory)
    dag_bag = DagBag(dag_folder=tmp_directory, include_examples=False)
    dag_bag.process_file(os.path.join(FILE_DIR, "cleaner_workflow.py"))
    print(dag_bag.dags)
    assert len(dag_bag.import_errors) == 0
    assert len(dag_bag.dags) == 1
2 changes: 1 addition & 1 deletion src/cc_catalog_airflow/dags/test_loader_workflow.py
@@ -10,4 +10,4 @@ def test_dag_loads_with_no_errors(tmpdir):
    dag_bag = DagBag(dag_folder=tmp_directory, include_examples=False)
    dag_bag.process_file(os.path.join(FILE_DIR, 'loader_workflow.py'))
    assert len(dag_bag.import_errors) == 0
    assert len(dag_bag.dags) == 1
    assert len(dag_bag.dags) == 2
1 change: 1 addition & 0 deletions src/cc_catalog_airflow/dags/util/loader/column_names.py
@@ -2,6 +2,7 @@
This file holds string constants for the column names in the image
database, as well as the loading tables in the PostgreSQL DB.
"""
IDENTIFIER = 'identifier'
FOREIGN_ID = 'foreign_identifier'
LANDING_URL = 'foreign_landing_url'
DIRECT_URL = 'url'
15 changes: 11 additions & 4 deletions src/cc_catalog_airflow/dags/util/loader/loader.py
@@ -1,15 +1,18 @@
from util.loader import paths, s3, sql, ingestion_column


def load_local_data(output_dir, postgres_conn_id, identifier):
def load_local_data(output_dir, postgres_conn_id, identifier, overwrite=False):
    tsv_file_name = paths.get_staged_file(output_dir, identifier)
    ingestion_column.check_and_fix_tsv_file(tsv_file_name)
    sql.load_local_data_to_intermediate_table(
        postgres_conn_id,
        tsv_file_name,
        identifier
    )
    sql.upsert_records_to_image_table(postgres_conn_id, identifier)
    if overwrite is True:
        sql.overwrite_records_in_image_table(postgres_conn_id, identifier)
    else:
        sql.upsert_records_to_image_table(postgres_conn_id, identifier)


def copy_to_s3(output_dir, bucket, identifier, aws_conn_id):
@@ -22,7 +25,8 @@ def load_s3_data(
        bucket,
        aws_conn_id,
        postgres_conn_id,
        identifier
        identifier,
        overwrite=False,
):
    tsv_key = s3.get_staged_s3_object(identifier, bucket, aws_conn_id)
    sql.load_s3_data_to_intermediate_table(
@@ -31,4 +35,7 @@ def load_s3_data(
        tsv_key,
        identifier
    )
    sql.upsert_records_to_image_table(postgres_conn_id, identifier)
    if overwrite is True:
        sql.overwrite_records_in_image_table(postgres_conn_id, identifier)
    else:
        sql.upsert_records_to_image_table(postgres_conn_id, identifier)
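Both loader entry points now branch on `overwrite`: the default path keeps the existing upsert, while `overwrite=True` routes to the new `overwrite_records_in_image_table` shown further down in sql.py. A hypothetical direct call (outside Airflow) would look roughly like this; the connection id and identifier are example values, not ones taken from a real run:

from util.loader import loader

loader.load_local_data(
    output_dir="/tmp/overwrite",
    postgres_conn_id="postgres_openledger_testing",
    identifier="overwrite20201020T120000",
    overwrite=True,  # update matching rows in place instead of upserting
)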
4 changes: 4 additions & 0 deletions src/cc_catalog_airflow/dags/util/loader/operators.py
@@ -43,11 +43,13 @@ def get_load_local_data_operator(
        dag,
        output_dir,
        postgres_conn_id,
        overwrite=False,
        identifier=TIMESTAMP_TEMPLATE
):
    return PythonOperator(
        task_id='load_local_data',
        python_callable=loader.load_local_data,
        op_kwargs={'overwrite': overwrite},
        op_args=[output_dir, postgres_conn_id, identifier],
        trigger_rule=TriggerRule.ALL_SUCCESS,
        dag=dag
@@ -74,11 +76,13 @@ def get_load_s3_data_operator(
        bucket,
        aws_conn_id,
        postgres_conn_id,
        overwrite=False,
        identifier=TIMESTAMP_TEMPLATE
):
    return PythonOperator(
        task_id='load_s3_data',
        python_callable=loader.load_s3_data,
        op_kwargs={'overwrite': overwrite},
        op_args=[bucket, aws_conn_id, postgres_conn_id, identifier],
        dag=dag
    )
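The `overwrite` flag reaches the loader functions via `op_kwargs`, while the existing arguments keep flowing through `op_args`; PythonOperator ultimately invokes `python_callable(*op_args, **op_kwargs)`, so the mix is equivalent to a normal call with `overwrite` passed by keyword. A minimal stand-alone illustration (the stand-in function below only mimics the loader signature):

def load_local_data(output_dir, postgres_conn_id, identifier, overwrite=False):
    # Stand-in with the same signature as util.loader.loader.load_local_data.
    return output_dir, postgres_conn_id, identifier, overwrite

op_args = ["/tmp", "postgres_openledger_testing", "20201020T120000"]
op_kwargs = {"overwrite": True}

# This mirrors how the operator calls the loader at execution time.
print(load_local_data(*op_args, **op_kwargs))
# ('/tmp', 'postgres_openledger_testing', '20201020T120000', True)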
45 changes: 45 additions & 0 deletions src/cc_catalog_airflow/dags/util/loader/sql.py
@@ -271,6 +271,51 @@ def _merge_jsonb_arrays(column):
    postgres.run(upsert_query)


def overwrite_records_in_image_table(
        postgres_conn_id,
        identifier,
        image_table=IMAGE_TABLE_NAME
):

    load_table = _get_load_table_name(identifier)
    logger.info(f'Updating records in {image_table}.')
    postgres = PostgresHook(postgres_conn_id=postgres_conn_id)
    columns_to_update = [
        col.LANDING_URL,
        col.DIRECT_URL,
        col.THUMBNAIL,
        col.WIDTH,
        col.HEIGHT,
        col.FILESIZE,
        col.LICENSE,
        col.LICENSE_VERSION,
        col.CREATOR,
        col.CREATOR_URL,
        col.TITLE,
        col.META_DATA,
        col.TAGS,
        col.WATERMARKED,
    ]
    update_set_string = ',\n'.join(
        [f'{column} = {load_table}.{column}' for column in columns_to_update]
    )

    update_query = dedent(
        f'''
        UPDATE {image_table}
        SET
        {update_set_string}
        FROM {load_table}
        WHERE
        {image_table}.{col.PROVIDER} = {load_table}.{col.PROVIDER}
        AND
        md5({image_table}.{col.FOREIGN_ID})
          = md5({load_table}.{col.FOREIGN_ID});
        '''
    )
    postgres.run(update_query)


def drop_load_table(postgres_conn_id, identifier):
    load_table = _get_load_table_name(identifier)
    postgres = PostgresHook(postgres_conn_id=postgres_conn_id)
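For a sense of what `update_set_string` expands to, here is a small, self-contained reproduction of the join above; the column names and load-table name are example values rather than the real constants:

columns_to_update = ["foreign_landing_url", "url", "thumbnail"]
load_table = "provider_image_data20201020T120000"  # hypothetical load table name

update_set_string = ",\n".join(
    [f"{column} = {load_table}.{column}" for column in columns_to_update]
)
print(update_set_string)
# foreign_landing_url = provider_image_data20201020T120000.foreign_landing_url,
# url = provider_image_data20201020T120000.url,
# thumbnail = provider_image_data20201020T120000.thumbnail

Each image-table row that matches a load-table row on provider and md5(foreign_identifier) then has those columns replaced with the freshly ingested values, which is what lets the overwrite DAG refresh preexisting records in place.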