diff --git a/catalog/DAGs.md b/catalog/DAGs.md
index b54ed379a5b..6d79ac75cff 100644
--- a/catalog/DAGs.md
+++ b/catalog/DAGs.md
@@ -19,6 +19,7 @@ The following are DAGs grouped by their primary tag:
 1. [Database](#database)
 1. [Maintenance](#maintenance)
 1. [Oauth](#oauth)
+1. [Other](#other)
 1. [Provider](#provider)
 1. [Provider Reingestion](#provider-reingestion)

@@ -63,6 +64,12 @@ The following are DAGs grouped by their primary tag:
 | [`oauth2_authorization`](#oauth2_authorization) | `None` |
 | [`oauth2_token_refresh`](#oauth2_token_refresh) | `0 */12 * * *` |

+## Other
+
+| DAG ID | Schedule Interval |
+| --------------------------------------------------------- | ----------------- |
+| [`flickr_thumbnails_removal`](#flickr_thumbnails_removal) | `None` |
+
 ## Provider

 | DAG ID | Schedule Interval | Dated | Media Type(s) |
@@ -113,6 +120,7 @@ The following is documentation associated with each DAG (where available):
 1. [`finnish_museums_workflow`](#finnish_museums_workflow)
 1. [`flickr_audit_sub_provider_workflow`](#flickr_audit_sub_provider_workflow)
 1. [`flickr_reingestion_workflow`](#flickr_reingestion_workflow)
+1. [`flickr_thumbnails_removal`](#flickr_thumbnails_removal)
 1. [`flickr_workflow`](#flickr_workflow)
 1. [`freesound_workflow`](#freesound_workflow)
 1. [`image_data_refresh`](#image_data_refresh)
@@ -394,6 +402,11 @@ Output: TSV file containing the images and the respective meta-data.

 Notes: https://www.flickr.com/help/terms/api Rate limit: 3600 requests per hour.

+## `flickr_thumbnails_removal`
+
+One-time run DAG to remove progressively all the old Flickr thumbnails, as they
+were determined to be unsuitable for the Openverse UI requirements.
+
 ## `flickr_workflow`

 Content Provider: Flickr
diff --git a/catalog/dags/flickr_thumbs_removal.py b/catalog/dags/flickr_thumbs_removal.py
new file mode 100644
index 00000000000..5c8615c2ea5
--- /dev/null
+++ b/catalog/dags/flickr_thumbs_removal.py
@@ -0,0 +1,80 @@
+"""
+One-time run DAG to remove progressively all the old Flickr thumbnails,
+as they were determined to be unsuitable for the Openverse UI requirements.
+"""
+import logging
+from datetime import timedelta
+from textwrap import dedent
+
+from airflow.decorators import dag, task
+
+from common.constants import DAG_DEFAULT_ARGS, POSTGRES_CONN_ID
+from common.slack import send_message
+from common.sql import PostgresHook
+
+
+logger = logging.getLogger(__name__)
+
+
+DAG_ID = "flickr_thumbnails_removal"
+
+
+@dag(
+    dag_id=DAG_ID,
+    default_args={
+        **DAG_DEFAULT_ARGS,
+        "retries": 0,
+        "execution_timeout": timedelta(days=7),
+    },
+    schedule=None,
+    catchup=False,
+    doc_md=__doc__,
+)
+def flickr_thumbnails_removal():
+    pg = PostgresHook(postgres_conn_id=POSTGRES_CONN_ID)
+    select_conditions = "FROM image WHERE provider = 'flickr' AND thumbnail IS NOT NULL"
+
+    @task()
+    def count():
+        num_thumbs = pg.get_first(f"SELECT COUNT(*) {select_conditions}")[0]
+        logger.info(f"Flickr thumbnails found: {num_thumbs}.")
+
+        return num_thumbs
+
+    @task()
+    def delete(num_thumbs):
+        log_sql = True
+        if num_thumbs == 0:
+            logger.info("No Flickr thumbnails found.")
+
+        while num_thumbs > 0:
+            query = dedent(
+                f"""
+                UPDATE image SET thumbnail = NULL WHERE identifier IN (
+                    SELECT identifier {select_conditions}
+                    FETCH FIRST 10000 ROWS ONLY FOR UPDATE SKIP LOCKED
+                )
+                """
+            )
+            pg.run(query, log_sql=log_sql)
+            num_thumbs -= 10000
+            logger.info(
+                f"Flickr thumbnails left: {num_thumbs if num_thumbs > 0 else 0}."
+            )
+            log_sql = False
+
+    @task()
+    def report():
+        msg = (
+            "All Flickr thumbnails were successfully removed. "
+            f"The `{DAG_ID}` DAG can be retired."
+        )
+        send_message(msg, DAG_ID)
+
+    num_thumbs = count()
+    d = delete(num_thumbs)
+    r = report()
+    d >> r
+
+
+flickr_thumbnails_removal()
diff --git a/catalog/dags/providers/provider_api_scripts/flickr.py b/catalog/dags/providers/provider_api_scripts/flickr.py
index 39e090c5469..a1cfede822c 100644
--- a/catalog/dags/providers/provider_api_scripts/flickr.py
+++ b/catalog/dags/providers/provider_api_scripts/flickr.py
@@ -120,9 +120,9 @@ def ingest_records(self, **kwargs):
         for start_ts, end_ts in self.large_batches:
             # For each large batch, ingest records for that interval one license
             # type at a time.
-            for license in LICENSE_INFO.keys():
+            for license_ in LICENSE_INFO.keys():
                 super().ingest_records_for_timestamp_pair(
-                    start_ts=start_ts, end_ts=end_ts, license=license
+                    start_ts=start_ts, end_ts=end_ts, license=license_
                 )

         logger.info("Completed large batch processing by license type.")
@@ -139,14 +139,14 @@ def get_next_query_params(self, prev_query_params, **kwargs):

         # license will be available in the params if we're dealing
         # with a large batch. If not, fall back to all licenses
-        license = kwargs.get("license", self.default_license_param)
+        license_ = kwargs.get("license", self.default_license_param)

         return {
             "min_upload_date": start_timestamp,
             "max_upload_date": end_timestamp,
             "page": 0,
             "api_key": self.api_key,
-            "license": license,
+            "license": license_,
             "per_page": self.batch_limit,
             "method": "flickr.photos.search",
             "media": "photos",
diff --git a/docker-compose.yml b/docker-compose.yml
index 289a912f85e..f9d80b8e06a 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -51,8 +51,8 @@ services:
       context: ./docker/upstream_db/
       target: db
     image: openverse-upstream_db
-    expose:
-      - "5432"
+    ports:
+      - "50255:5432"
     volumes:
       - catalog-postgres:/var/lib/postgresql/data
       - ./sample_data:/sample_data
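For reviewers who want to exercise the removal query by hand, below is a minimal standalone sketch of the same batched `UPDATE … FETCH FIRST … FOR UPDATE SKIP LOCKED` pattern that the DAG's `delete` task runs, driven with psycopg2 against the upstream DB that this diff newly maps to host port 50255. The query text, the `select_conditions` filter, and the 10000-row batch size are taken from the DAG above; the psycopg2 dependency and the `deploy:deploy@localhost:50255/openledger` connection string are assumptions about a local dev setup, not values from this diff. Batching keeps each transaction short, and `SKIP LOCKED` lets a batch skip rows that concurrent ingestion currently holds instead of blocking on them.

```python
"""Standalone sketch of the DAG's batched thumbnail removal (assumed local dev setup)."""
import psycopg2

# Copied from the DAG: which rows still carry an old Flickr thumbnail.
SELECT_CONDITIONS = "FROM image WHERE provider = 'flickr' AND thumbnail IS NOT NULL"
BATCH_SIZE = 10000  # matches the DAG's per-iteration batch size


def remove_flickr_thumbnails(dsn: str) -> None:
    conn = psycopg2.connect(dsn)
    try:
        with conn.cursor() as cur:
            # Mirror of the DAG's `count` task.
            cur.execute(f"SELECT COUNT(*) {SELECT_CONDITIONS}")
            remaining = cur.fetchone()[0]

        while remaining > 0:
            with conn.cursor() as cur:
                # FETCH FIRST caps the batch so each transaction stays small;
                # FOR UPDATE SKIP LOCKED skips rows held by concurrent writers.
                cur.execute(
                    f"""
                    UPDATE image SET thumbnail = NULL WHERE identifier IN (
                        SELECT identifier {SELECT_CONDITIONS}
                        FETCH FIRST {BATCH_SIZE} ROWS ONLY FOR UPDATE SKIP LOCKED
                    )
                    """
                )
            conn.commit()
            remaining -= BATCH_SIZE  # same bookkeeping as the DAG's `delete` task
    finally:
        conn.close()


if __name__ == "__main__":
    # Hypothetical DSN: 50255 is the host port mapped in docker-compose.yml;
    # user/password/database are assumed local dev credentials.
    remove_flickr_thumbnails("postgresql://deploy:deploy@localhost:50255/openledger")
```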