This repository has been archived by the owner on Jan 13, 2022. It is now read-only.

Merge pull request #517 from creativecommons/clean_preexisting_data_with_disk_write

Clean preexisting data using ImageStore
mathemancer authored Oct 20, 2020
2 parents cabfa11 + ca3cdec commit 727e355
Showing 13 changed files with 1,403 additions and 17 deletions.
75 changes: 75 additions & 0 deletions src/cc_catalog_airflow/dags/cleaner_workflow.py
@@ -0,0 +1,75 @@
from copy import deepcopy
from datetime import datetime
import logging
import os
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from util import config, pg_cleaner, operator_util

logging.basicConfig(
format="%(asctime)s - %(name)s - %(levelname)s: %(message)s",
level=logging.INFO,
)

logger = logging.getLogger(__name__)

DAG_ID = "postgres_image_cleaner"
DB_CONN_ID = os.getenv("OPENLEDGER_CONN_ID", "postgres_openledger_testing")
PREFIX_LENGTH = 1
DESIRED_PREFIX_LENGTH = 3
CONCURRENCY = 16


def create_id_partitioned_cleaner_dag(
dag_id=DAG_ID,
prefix_length=PREFIX_LENGTH,
postgres_conn_id=DB_CONN_ID,
start_date=datetime(1970, 1, 1),
concurrency=CONCURRENCY,
default_args=config.DAG_DEFAULT_ARGS,
):
args = deepcopy(default_args)
args.update(start_date=start_date)
dag = DAG(
dag_id=dag_id,
default_args=args,
concurrency=concurrency,
max_active_runs=concurrency,
schedule_interval=None,
start_date=start_date,
catchup=False,
)
hex_prefixes = pg_cleaner.hex_counter(prefix_length)
with dag:
cleaner_list = [
_get_pg_cleaner_operator(dag, prefix, postgres_conn_id)
for prefix in hex_prefixes
]
start_task = operator_util.get_log_operator(dag, dag.dag_id, "Started")
end_task = operator_util.get_log_operator(dag, dag.dag_id, "Ended")
start_task >> cleaner_list >> end_task
return dag


def _get_pg_cleaner_operator(
dag,
prefix,
postgres_conn_id,
desired_length=DESIRED_PREFIX_LENGTH,
delay=CONCURRENCY,
):
task_id = f"clean_{prefix}"
return PythonOperator(
task_id=task_id,
python_callable=pg_cleaner.clean_prefix_loop,
op_args=[postgres_conn_id, prefix],
op_kwargs={
"desired_prefix_length": desired_length,
"delay_minutes": delay,
},
depends_on_past=False,
dag=dag,
)


globals()[DAG_ID] = create_id_partitioned_cleaner_dag()
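
The cleaner DAG fans out one task per hexadecimal prefix of the image identifier. pg_cleaner.hex_counter lives in util/pg_cleaner.py and is not part of the excerpt above, so the following is only a sketch of what it presumably yields, assuming it enumerates every hex string of the requested length; with PREFIX_LENGTH = 1 that gives the 16 parallel clean_0 .. clean_f tasks capped by the DAG's concurrency of 16.

# Assumed behaviour of util.pg_cleaner.hex_counter (not shown in this diff).
from itertools import product

HEX_DIGITS = "0123456789abcdef"


def hex_counter(length):
    """Yield every hexadecimal string of the given length, e.g. '0' .. 'f'."""
    for digits in product(HEX_DIGITS, repeat=length):
        yield "".join(digits)


# With PREFIX_LENGTH = 1 the DAG above builds 16 cleaner tasks: clean_0 .. clean_f.
assert len(list(hex_counter(1))) == 16
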
42 changes: 32 additions & 10 deletions src/cc_catalog_airflow/dags/loader_workflow.py
@@ -4,6 +4,7 @@
from airflow import DAG

from util.loader import operators
from util.pg_cleaner import OVERWRITE_DIR


logging.basicConfig(
@@ -14,16 +15,17 @@
logger = logging.getLogger(__name__)

DAG_ID = 'tsv_to_postgres_loader'
OVERWRITE_DAG_ID = 'tsv_to_postgres_loader_overwrite'
DB_CONN_ID = os.getenv('OPENLEDGER_CONN_ID', 'postgres_openledger_testing')
AWS_CONN_ID = os.getenv('AWS_CONN_ID', 'no_aws_conn_id')
CCCATALOG_STORAGE_BUCKET = os.getenv('CCCATALOG_STORAGE_BUCKET')
MINIMUM_FILE_AGE_MINUTES = int(os.getenv('LOADER_FILE_AGE', 15))
CONCURRENCY = 5
SCHEDULE_CRON = '* * * * *'
TIMESTAMP_TEMPLATE = '{{ ts_nodash }}'

OUTPUT_DIR_PATH = os.path.realpath(os.getenv('OUTPUT_DIR', '/tmp/'))


DAG_DEFAULT_ARGS = {
'owner': 'data-eng-admin',
'depends_on_past': False,
@@ -44,7 +46,8 @@ def create_dag(
aws_conn_id=AWS_CONN_ID,
output_dir=OUTPUT_DIR_PATH,
storage_bucket=CCCATALOG_STORAGE_BUCKET,
minimum_file_age_minutes=MINIMUM_FILE_AGE_MINUTES
minimum_file_age_minutes=MINIMUM_FILE_AGE_MINUTES,
overwrite=False
):
dag = DAG(
dag_id=dag_id,
@@ -54,28 +57,37 @@
schedule_interval=schedule_cron,
catchup=False
)
if overwrite is True:
identifier = 'overwrite' + TIMESTAMP_TEMPLATE
else:
identifier = TIMESTAMP_TEMPLATE

with dag:
stage_oldest_tsv_file = operators.get_file_staging_operator(
dag,
output_dir,
minimum_file_age_minutes
minimum_file_age_minutes,
identifier=identifier,
)
create_loading_table = operators.get_table_creator_operator(
dag,
postgres_conn_id
postgres_conn_id,
identifier=identifier,
)
copy_to_s3 = operators.get_copy_to_s3_operator(
dag,
output_dir,
storage_bucket,
aws_conn_id
aws_conn_id,
identifier=identifier,
)
load_s3_data = operators.get_load_s3_data_operator(
dag,
storage_bucket,
aws_conn_id,
postgres_conn_id
postgres_conn_id,
overwrite=overwrite,
identifier=identifier,
)
one_failed_s3 = operators.get_one_failed_switch(
dag,
@@ -84,7 +96,9 @@
load_local_data = operators.get_load_local_data_operator(
dag,
output_dir,
postgres_conn_id
postgres_conn_id,
overwrite=overwrite,
identifier=identifier,
)
one_success_save = operators.get_one_success_switch(
dag,
@@ -100,19 +114,22 @@
)
delete_staged_file = operators.get_file_deletion_operator(
dag,
output_dir
output_dir,
identifier=identifier,
)
one_failed_delete = operators.get_one_failed_switch(
dag,
'delete'
)
drop_loading_table = operators.get_drop_table_operator(
dag,
postgres_conn_id
postgres_conn_id,
identifier=identifier,
)
move_staged_failures = operators.get_failure_moving_operator(
dag,
output_dir
output_dir,
identifier=identifier,
)
(
stage_oldest_tsv_file
@@ -132,3 +149,8 @@


globals()[DAG_ID] = create_dag()
globals()[OVERWRITE_DAG_ID] = create_dag(
dag_id=OVERWRITE_DAG_ID,
output_dir=os.path.join(OUTPUT_DIR_PATH, OVERWRITE_DIR),
overwrite=True
)
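
This module now registers two DAGs at import time: the original upsert loader and an overwrite variant that stages TSVs from a subdirectory of the output directory and prefixes the templated run identifier with 'overwrite'. The sketch below summarises the difference; OVERWRITE_DIR is defined in util/pg_cleaner.py and the timestamp is rendered by Airflow from '{{ ts_nodash }}', so both example values shown are assumptions.

import os

OUTPUT_DIR_PATH = "/tmp/"
OVERWRITE_DIR = "overwrite/"            # assumed value; defined in util/pg_cleaner.py
TIMESTAMP_TEMPLATE = "{{ ts_nodash }}"  # rendered by Airflow, e.g. '20201020T120000'

loader_dag_settings = {
    "tsv_to_postgres_loader": {
        "output_dir": OUTPUT_DIR_PATH,
        "identifier": TIMESTAMP_TEMPLATE,                # rows are upserted
    },
    "tsv_to_postgres_loader_overwrite": {
        "output_dir": os.path.join(OUTPUT_DIR_PATH, OVERWRITE_DIR),
        "identifier": "overwrite" + TIMESTAMP_TEMPLATE,  # rows are overwritten
    },
}
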
14 changes: 14 additions & 0 deletions src/cc_catalog_airflow/dags/test_cleaner_workflow.py
@@ -0,0 +1,14 @@
import os
from airflow.models import DagBag

FILE_DIR = os.path.abspath(os.path.dirname(__file__))


def test_dag_loads_with_no_errors(tmpdir):
tmp_directory = str(tmpdir)
print(tmp_directory)
dag_bag = DagBag(dag_folder=tmp_directory, include_examples=False)
dag_bag.process_file(os.path.join(FILE_DIR, "cleaner_workflow.py"))
print(dag_bag.dags)
assert len(dag_bag.import_errors) == 0
assert len(dag_bag.dags) == 1
2 changes: 1 addition & 1 deletion src/cc_catalog_airflow/dags/test_loader_workflow.py
@@ -10,4 +10,4 @@ def test_dag_loads_with_no_errors(tmpdir):
dag_bag = DagBag(dag_folder=tmp_directory, include_examples=False)
dag_bag.process_file(os.path.join(FILE_DIR, 'loader_workflow.py'))
assert len(dag_bag.import_errors) == 0
assert len(dag_bag.dags) == 1
assert len(dag_bag.dags) == 2
1 change: 1 addition & 0 deletions src/cc_catalog_airflow/dags/util/loader/column_names.py
@@ -2,6 +2,7 @@
This file holds string constants for the column names in the image
database, as well as the loading tables in the PostgreSQL DB.
"""
IDENTIFIER = 'identifier'
FOREIGN_ID = 'foreign_identifier'
LANDING_URL = 'foreign_landing_url'
DIRECT_URL = 'url'
15 changes: 11 additions & 4 deletions src/cc_catalog_airflow/dags/util/loader/loader.py
@@ -1,15 +1,18 @@
from util.loader import paths, s3, sql, ingestion_column


def load_local_data(output_dir, postgres_conn_id, identifier):
def load_local_data(output_dir, postgres_conn_id, identifier, overwrite=False):
tsv_file_name = paths.get_staged_file(output_dir, identifier)
ingestion_column.check_and_fix_tsv_file(tsv_file_name)
sql.load_local_data_to_intermediate_table(
postgres_conn_id,
tsv_file_name,
identifier
)
sql.upsert_records_to_image_table(postgres_conn_id, identifier)
if overwrite is True:
sql.overwrite_records_in_image_table(postgres_conn_id, identifier)
else:
sql.upsert_records_to_image_table(postgres_conn_id, identifier)


def copy_to_s3(output_dir, bucket, identifier, aws_conn_id):
@@ -22,7 +25,8 @@ def load_s3_data(
bucket,
aws_conn_id,
postgres_conn_id,
identifier
identifier,
overwrite=False,
):
tsv_key = s3.get_staged_s3_object(identifier, bucket, aws_conn_id)
sql.load_s3_data_to_intermediate_table(
@@ -31,4 +35,7 @@ def load_s3_data(
tsv_key,
identifier
)
sql.upsert_records_to_image_table(postgres_conn_id, identifier)
if overwrite is True:
sql.overwrite_records_in_image_table(postgres_conn_id, identifier)
else:
sql.upsert_records_to_image_table(postgres_conn_id, identifier)
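
Both loader entry points now take an overwrite flag that selects between the upsert path and the new overwrite path. A hedged usage sketch follows: the connection ID and base output directory echo the defaults above, while the overwrite directory and the identifiers are placeholders for the rendered timestamp values.

# Illustrative only; paths and identifiers are placeholders, not values from this commit.
from util.loader import loader

# Default path: freshly loaded rows are upserted into the image table.
loader.load_local_data("/tmp/", "postgres_openledger_testing", "20201020T120000")

# Overwrite path: rows already in the image table (matched on provider and
# foreign identifier) are replaced column-by-column with the loaded data.
loader.load_local_data(
    "/tmp/overwrite/",
    "postgres_openledger_testing",
    "overwrite20201020T120000",
    overwrite=True,
)
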
4 changes: 4 additions & 0 deletions src/cc_catalog_airflow/dags/util/loader/operators.py
@@ -43,11 +43,13 @@ def get_load_local_data_operator(
dag,
output_dir,
postgres_conn_id,
overwrite=False,
identifier=TIMESTAMP_TEMPLATE
):
return PythonOperator(
task_id='load_local_data',
python_callable=loader.load_local_data,
op_kwargs={'overwrite': overwrite},
op_args=[output_dir, postgres_conn_id, identifier],
trigger_rule=TriggerRule.ALL_SUCCESS,
dag=dag
@@ -74,11 +76,13 @@ def get_load_s3_data_operator(
bucket,
aws_conn_id,
postgres_conn_id,
overwrite=False,
identifier=TIMESTAMP_TEMPLATE
):
return PythonOperator(
task_id='load_s3_data',
python_callable=loader.load_s3_data,
op_kwargs={'overwrite': overwrite},
op_args=[bucket, aws_conn_id, postgres_conn_id, identifier],
dag=dag
)
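
The overwrite flag is threaded through op_kwargs while the existing arguments stay in op_args. Below is a minimal, self-contained sketch of how Airflow's PythonOperator combines the two when it invokes the loader callable; the stand-in function and values are hypothetical and only mirror the loader signature.

# Stand-in for loader.load_local_data, used purely to illustrate argument passing.
def fake_load_local_data(output_dir, postgres_conn_id, identifier, overwrite=False):
    return (output_dir, postgres_conn_id, identifier, overwrite)


op_args = ["/tmp/", "postgres_openledger_testing", "20201020T120000"]
op_kwargs = {"overwrite": True}

# At execution time the operator effectively calls python_callable(*op_args, **op_kwargs).
assert fake_load_local_data(*op_args, **op_kwargs) == (
    "/tmp/", "postgres_openledger_testing", "20201020T120000", True
)
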
45 changes: 45 additions & 0 deletions src/cc_catalog_airflow/dags/util/loader/sql.py
@@ -271,6 +271,51 @@ def _merge_jsonb_arrays(column):
postgres.run(upsert_query)


def overwrite_records_in_image_table(
postgres_conn_id,
identifier,
image_table=IMAGE_TABLE_NAME
):

load_table = _get_load_table_name(identifier)
logger.info(f'Updating records in {image_table}.')
postgres = PostgresHook(postgres_conn_id=postgres_conn_id)
columns_to_update = [
col.LANDING_URL,
col.DIRECT_URL,
col.THUMBNAIL,
col.WIDTH,
col.HEIGHT,
col.FILESIZE,
col.LICENSE,
col.LICENSE_VERSION,
col.CREATOR,
col.CREATOR_URL,
col.TITLE,
col.META_DATA,
col.TAGS,
col.WATERMARKED,
]
update_set_string = ',\n'.join(
[f'{column} = {load_table}.{column}' for column in columns_to_update]
)

update_query = dedent(
f'''
UPDATE {image_table}
SET
{update_set_string}
FROM {load_table}
WHERE
{image_table}.{col.PROVIDER} = {load_table}.{col.PROVIDER}
AND
md5({image_table}.{col.FOREIGN_ID})
= md5({load_table}.{col.FOREIGN_ID});
'''
)
postgres.run(update_query)


def drop_load_table(postgres_conn_id, identifier):
load_table = _get_load_table_name(identifier)
postgres = PostgresHook(postgres_conn_id=postgres_conn_id)
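
For context, here is a rough preview of the UPDATE statement the new overwrite_records_in_image_table builds. Both table names are placeholders (IMAGE_TABLE_NAME and _get_load_table_name are outside this diff), the column list is abbreviated, and the provider column name is assumed; only foreign_landing_url, url, and foreign_identifier are confirmed by the column_names.py excerpt above.

image_table = "image"                               # placeholder for IMAGE_TABLE_NAME
load_table = "load_image_overwrite20201020T120000"  # placeholder load table name
columns_to_update = ["foreign_landing_url", "url"]  # abbreviated column list

update_set_string = ",\n    ".join(
    f"{column} = {load_table}.{column}" for column in columns_to_update
)

print(
    f"UPDATE {image_table}\n"
    f"SET\n    {update_set_string}\n"
    f"FROM {load_table}\n"
    "WHERE\n"
    f"  {image_table}.provider = {load_table}.provider\n"
    "  AND\n"
    f"  md5({image_table}.foreign_identifier) = md5({load_table}.foreign_identifier);"
)

Comparing md5() of the foreign identifiers presumably lets the join use an existing md5-based index on the image table, but that index is not part of this diff.
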
(Diffs for the remaining changed files are not shown here.)
