From 7d89cd3de174079a483ae004dbd38e30f3d46b27 Mon Sep 17 00:00:00 2001 From: Krystle Salazar Date: Fri, 2 Jun 2023 12:23:57 -0400 Subject: [PATCH 1/6] Add `flickr_thumbnails_removal` DAG --- catalog/dags/flickr_thumbs_removal.py | 72 +++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) create mode 100644 catalog/dags/flickr_thumbs_removal.py diff --git a/catalog/dags/flickr_thumbs_removal.py b/catalog/dags/flickr_thumbs_removal.py new file mode 100644 index 00000000000..3911cf1f03c --- /dev/null +++ b/catalog/dags/flickr_thumbs_removal.py @@ -0,0 +1,72 @@ +import logging +from datetime import timedelta +from textwrap import dedent + +from airflow.decorators import dag, task + +from common.constants import DAG_DEFAULT_ARGS, POSTGRES_CONN_ID +from common.slack import send_message +from common.sql import PostgresHook + + +logger = logging.getLogger(__name__) + + +DAG_ID = "flickr_thumbnails_removal" + + +@dag( + dag_id=DAG_ID, + default_args={ + **DAG_DEFAULT_ARGS, + "retries": 0, + "execution_timeout": timedelta(days=7), + }, + schedule=None, + catchup=False, + doc_md=__doc__, +) +def flickr_thumbnails_removal(): + pg = PostgresHook(postgres_conn_id=POSTGRES_CONN_ID) + select_conditions = "FROM image WHERE provider = 'flickr' AND thumbnail IS NOT NULL" + + @task() + def count(): + num_thumbs = pg.get_first(f"SELECT COUNT(*) {select_conditions}")[0] + logger.info(f"Flickr thumbnails found: {num_thumbs}.") + + return num_thumbs + + @task() + def delete(num_thumbs): + if num_thumbs == 0: + logger.info("No Flickr thumbnails found.") + + while num_thumbs > 0: + query = dedent( + f""" + UPDATE image SET thumbnail = NULL WHERE identifier IN + (SELECT identifier {select_conditions} FETCH FIRST 10000 ROWS ONLY) + """ + ) + pg.run(query) + num_thumbs -= 10000 + logger.info( + f"Flickr thumbnails left: {num_thumbs if num_thumbs > 0 else 0}." + ) + + @task() + def report(): + msg = ( + "All Flickr thumbnails were successfully removed. 
" + f"The `{DAG_ID}` DAG can be retired." + ) + send_message(msg, DAG_ID) + + num_thumbs = count() + d = delete(num_thumbs) + r = report() + d >> r + + +flickr_thumbnails_removal() From b8873c6d74c34818cac9ddbb10fd8d8eadbf11ec Mon Sep 17 00:00:00 2001 From: Krystle Salazar Date: Fri, 2 Jun 2023 12:25:43 -0400 Subject: [PATCH 2/6] Avoid shadowing built-in name 'license' in Flickr DAG --- catalog/dags/providers/provider_api_scripts/flickr.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/catalog/dags/providers/provider_api_scripts/flickr.py b/catalog/dags/providers/provider_api_scripts/flickr.py index 39e090c5469..a1cfede822c 100644 --- a/catalog/dags/providers/provider_api_scripts/flickr.py +++ b/catalog/dags/providers/provider_api_scripts/flickr.py @@ -120,9 +120,9 @@ def ingest_records(self, **kwargs): for start_ts, end_ts in self.large_batches: # For each large batch, ingest records for that interval one license # type at a time. - for license in LICENSE_INFO.keys(): + for license_ in LICENSE_INFO.keys(): super().ingest_records_for_timestamp_pair( - start_ts=start_ts, end_ts=end_ts, license=license + start_ts=start_ts, end_ts=end_ts, license=license_ ) logger.info("Completed large batch processing by license type.") @@ -139,14 +139,14 @@ def get_next_query_params(self, prev_query_params, **kwargs): # license will be available in the params if we're dealing # with a large batch. 
If not, fall back to all licenses - license = kwargs.get("license", self.default_license_param) + license_ = kwargs.get("license", self.default_license_param) return { "min_upload_date": start_timestamp, "max_upload_date": end_timestamp, "page": 0, "api_key": self.api_key, - "license": license, + "license": license_, "per_page": self.batch_limit, "method": "flickr.photos.search", "media": "photos", From fe6cb1fc252dfcf52cb3310244ab612efd56e4ee Mon Sep 17 00:00:00 2001 From: Krystle Salazar Date: Fri, 2 Jun 2023 12:27:39 -0400 Subject: [PATCH 3/6] Expose the upstream DB port --- docker-compose.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 289a912f85e..f9d80b8e06a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -51,8 +51,8 @@ services: context: ./docker/upstream_db/ target: db image: openverse-upstream_db - expose: - - "5432" + ports: + - "50255:5432" volumes: - catalog-postgres:/var/lib/postgresql/data - ./sample_data:/sample_data From 19b48064a164d23ab8170ee8576ab5184cab4b08 Mon Sep 17 00:00:00 2001 From: Krystle Salazar Date: Fri, 2 Jun 2023 12:54:25 -0400 Subject: [PATCH 4/6] Update DAG docs --- catalog/DAGs.md | 13 +++++++++++++ catalog/dags/flickr_thumbs_removal.py | 4 ++++ 2 files changed, 17 insertions(+) diff --git a/catalog/DAGs.md b/catalog/DAGs.md index b54ed379a5b..6d79ac75cff 100644 --- a/catalog/DAGs.md +++ b/catalog/DAGs.md @@ -19,6 +19,7 @@ The following are DAGs grouped by their primary tag: 1. [Database](#database) 1. [Maintenance](#maintenance) 1. [Oauth](#oauth) +1. [Other](#other) 1. [Provider](#provider) 1. 
[Provider Reingestion](#provider-reingestion) @@ -63,6 +64,12 @@ The following are DAGs grouped by their primary tag: | [`oauth2_authorization`](#oauth2_authorization) | `None` | | [`oauth2_token_refresh`](#oauth2_token_refresh) | `0 */12 * * *` | +## Other + +| DAG ID | Schedule Interval | +| --------------------------------------------------------- | ----------------- | +| [`flickr_thumbnails_removal`](#flickr_thumbnails_removal) | `None` | + ## Provider | DAG ID | Schedule Interval | Dated | Media Type(s) | @@ -113,6 +120,7 @@ The following is documentation associated with each DAG (where available): 1. [`finnish_museums_workflow`](#finnish_museums_workflow) 1. [`flickr_audit_sub_provider_workflow`](#flickr_audit_sub_provider_workflow) 1. [`flickr_reingestion_workflow`](#flickr_reingestion_workflow) +1. [`flickr_thumbnails_removal`](#flickr_thumbnails_removal) 1. [`flickr_workflow`](#flickr_workflow) 1. [`freesound_workflow`](#freesound_workflow) 1. [`image_data_refresh`](#image_data_refresh) @@ -394,6 +402,11 @@ Output: TSV file containing the images and the respective meta-data. Notes: https://www.flickr.com/help/terms/api Rate limit: 3600 requests per hour. +## `flickr_thumbnails_removal` + +One-time DAG that progressively removes all the old Flickr thumbnails, as they +were determined to be unsuitable for the Openverse UI requirements. + ## `flickr_workflow` Content Provider: Flickr diff --git a/catalog/dags/flickr_thumbs_removal.py b/catalog/dags/flickr_thumbs_removal.py index 3911cf1f03c..ab4921738dc 100644 --- a/catalog/dags/flickr_thumbs_removal.py +++ b/catalog/dags/flickr_thumbs_removal.py @@ -1,3 +1,7 @@ +""" +One-time DAG that progressively removes all the old Flickr thumbnails, +as they were determined to be unsuitable for the Openverse UI requirements. 
+""" import logging from datetime import timedelta from textwrap import dedent From eaf5fad22c1c16a63e0fc63a7ee08e74181efce5 Mon Sep 17 00:00:00 2001 From: Krystle Salazar Date: Mon, 5 Jun 2023 18:48:21 -0400 Subject: [PATCH 5/6] Add locking clause Co-authored-by: Staci Mullins <63313398+stacimc@users.noreply.github.com> --- catalog/dags/flickr_thumbs_removal.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/catalog/dags/flickr_thumbs_removal.py b/catalog/dags/flickr_thumbs_removal.py index ab4921738dc..fabfc712d88 100644 --- a/catalog/dags/flickr_thumbs_removal.py +++ b/catalog/dags/flickr_thumbs_removal.py @@ -50,7 +50,7 @@ def delete(num_thumbs): query = dedent( f""" UPDATE image SET thumbnail = NULL WHERE identifier IN - (SELECT identifier {select_conditions} FETCH FIRST 10000 ROWS ONLY) + (SELECT identifier {select_conditions} FETCH FIRST 10000 ROWS ONLY FOR UPDATE SKIP LOCKED) """ ) pg.run(query) From 3bdce2838abd4f85785af787fd4ec2324cb4e3c6 Mon Sep 17 00:00:00 2001 From: Krystle Salazar Date: Mon, 5 Jun 2023 18:51:21 -0400 Subject: [PATCH 6/6] Log only the first run of the update --- catalog/dags/flickr_thumbs_removal.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/catalog/dags/flickr_thumbs_removal.py b/catalog/dags/flickr_thumbs_removal.py index fabfc712d88..5c8615c2ea5 100644 --- a/catalog/dags/flickr_thumbs_removal.py +++ b/catalog/dags/flickr_thumbs_removal.py @@ -43,21 +43,25 @@ def count(): @task() def delete(num_thumbs): + log_sql = True if num_thumbs == 0: logger.info("No Flickr thumbnails found.") while num_thumbs > 0: query = dedent( f""" - UPDATE image SET thumbnail = NULL WHERE identifier IN - (SELECT identifier {select_conditions} FETCH FIRST 10000 ROWS ONLY FOR UPDATE SKIP LOCKED) + UPDATE image SET thumbnail = NULL WHERE identifier IN ( + SELECT identifier {select_conditions} + FETCH FIRST 10000 ROWS ONLY FOR UPDATE SKIP LOCKED + ) """ ) - pg.run(query) + pg.run(query, log_sql=log_sql) 
num_thumbs -= 10000 logger.info( f"Flickr thumbnails left: {num_thumbs if num_thumbs > 0 else 0}." ) + log_sql = False @task() def report():