Add DAG to remove Flickr thumbnails #2302

Merged · 6 commits · Jun 6, 2023
13 changes: 13 additions & 0 deletions catalog/DAGs.md
@@ -19,6 +19,7 @@ The following are DAGs grouped by their primary tag:
1. [Database](#database)
1. [Maintenance](#maintenance)
1. [Oauth](#oauth)
1. [Other](#other)
1. [Provider](#provider)
1. [Provider Reingestion](#provider-reingestion)

@@ -63,6 +64,12 @@ The following are DAGs grouped by their primary tag:
| [`oauth2_authorization`](#oauth2_authorization) | `None` |
| [`oauth2_token_refresh`](#oauth2_token_refresh) | `0 */12 * * *` |

## Other

| DAG ID | Schedule Interval |
| --------------------------------------------------------- | ----------------- |
| [`flickr_thumbnails_removal`](#flickr_thumbnails_removal) | `None` |

## Provider

| DAG ID | Schedule Interval | Dated | Media Type(s) |
@@ -113,6 +120,7 @@ The following is documentation associated with each DAG (where available):
1. [`finnish_museums_workflow`](#finnish_museums_workflow)
1. [`flickr_audit_sub_provider_workflow`](#flickr_audit_sub_provider_workflow)
1. [`flickr_reingestion_workflow`](#flickr_reingestion_workflow)
1. [`flickr_thumbnails_removal`](#flickr_thumbnails_removal)
1. [`flickr_workflow`](#flickr_workflow)
1. [`freesound_workflow`](#freesound_workflow)
1. [`image_data_refresh`](#image_data_refresh)
@@ -394,6 +402,11 @@ Output: TSV file containing the images and the respective meta-data.

Notes: https://www.flickr.com/help/terms/api Rate limit: 3600 requests per hour.

## `flickr_thumbnails_removal`

One-time DAG to progressively remove all old Flickr thumbnails, as they were
determined to be unsuitable for the Openverse UI requirements.

## `flickr_workflow`

Content Provider: Flickr
80 changes: 80 additions & 0 deletions catalog/dags/flickr_thumbs_removal.py
@@ -0,0 +1,80 @@
"""
One-time DAG to progressively remove all old Flickr thumbnails,
as they were determined to be unsuitable for the Openverse UI requirements.
"""
import logging
from datetime import timedelta
from textwrap import dedent

from airflow.decorators import dag, task

from common.constants import DAG_DEFAULT_ARGS, POSTGRES_CONN_ID
from common.slack import send_message
from common.sql import PostgresHook


logger = logging.getLogger(__name__)


DAG_ID = "flickr_thumbnails_removal"


@dag(
    dag_id=DAG_ID,
    default_args={
        **DAG_DEFAULT_ARGS,
        "retries": 0,
        "execution_timeout": timedelta(days=7),
    },
    schedule=None,
    catchup=False,
    doc_md=__doc__,
)
def flickr_thumbnails_removal():
    pg = PostgresHook(postgres_conn_id=POSTGRES_CONN_ID)
    select_conditions = "FROM image WHERE provider = 'flickr' AND thumbnail IS NOT NULL"

    @task()
    def count():
        num_thumbs = pg.get_first(f"SELECT COUNT(*) {select_conditions}")[0]
        logger.info(f"Flickr thumbnails found: {num_thumbs}.")

        return num_thumbs

    @task()
    def delete(num_thumbs):
        log_sql = True
        if num_thumbs == 0:
            logger.info("No Flickr thumbnails found.")

        while num_thumbs > 0:
            query = dedent(
                f"""
                UPDATE image SET thumbnail = NULL WHERE identifier IN (
                    SELECT identifier {select_conditions}
                    FETCH FIRST 10000 ROWS ONLY FOR UPDATE SKIP LOCKED
                )
                """
            )
            pg.run(query, log_sql=log_sql)
            num_thumbs -= 10000
            logger.info(
                f"Flickr thumbnails left: {num_thumbs if num_thumbs > 0 else 0}."
            )
            log_sql = False

    @task()
    def report():
        msg = (
            "All Flickr thumbnails were successfully removed. "
            f"The `{DAG_ID}` DAG can be retired."
        )
        send_message(msg, DAG_ID)

    num_thumbs = count()
    d = delete(num_thumbs)
    r = report()
    d >> r


flickr_thumbnails_removal()
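For reference, the same batched-removal pattern can be sketched outside Airflow. The snippet below is illustrative only: it uses psycopg2 directly instead of `PostgresHook`, and the `DATABASE_URL` environment variable is an assumption, not something defined in this repository.

```python
# Standalone sketch of the batched-removal loop above, using psycopg2 directly.
# The DATABASE_URL environment variable is a placeholder for the catalog DB DSN.
import os

import psycopg2

BATCH_SIZE = 10_000
SELECT_CONDITIONS = "FROM image WHERE provider = 'flickr' AND thumbnail IS NOT NULL"

conn = psycopg2.connect(os.environ["DATABASE_URL"])
with conn.cursor() as cur:
    cur.execute(f"SELECT COUNT(*) {SELECT_CONDITIONS}")
    remaining = cur.fetchone()[0]
    while remaining > 0:
        # NULL out thumbnails in fixed-size batches; SKIP LOCKED keeps the loop
        # from blocking on rows already locked by concurrent ingestion.
        cur.execute(
            f"""
            UPDATE image SET thumbnail = NULL WHERE identifier IN (
                SELECT identifier {SELECT_CONDITIONS}
                FETCH FIRST {BATCH_SIZE} ROWS ONLY FOR UPDATE SKIP LOCKED
            )
            """
        )
        conn.commit()
        # Decrement by the batch size; the final iteration may affect fewer rows.
        remaining -= BATCH_SIZE
conn.close()
```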
8 changes: 4 additions & 4 deletions catalog/dags/providers/provider_api_scripts/flickr.py
@@ -120,9 +120,9 @@ def ingest_records(self, **kwargs):
for start_ts, end_ts in self.large_batches:
# For each large batch, ingest records for that interval one license
# type at a time.
for license in LICENSE_INFO.keys():
for license_ in LICENSE_INFO.keys():
Member: Just for my own edification, what is the reason behind this rename?

Member Author: Pycharm was complaining it was shadowing the built-in name.
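As an illustrative aside (not part of the PR): `license` is one of the names the `site` module injects into builtins, so reusing it as a loop variable shadows that builtin and trips the "shadows built-in name" inspection. A common convention, noted in PEP 8, is a trailing underscore.

```python
# `license` is added to builtins by the `site` module (it prints license text),
# so binding it in a loop shadows the builtin and triggers linter warnings.
for license in ("by", "by-sa", "by-nd"):  # shadows builtins.license in this module
    print(license)

# Trailing-underscore convention avoids the collision:
for license_ in ("by", "by-sa", "by-nd"):
    print(license_)
```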

super().ingest_records_for_timestamp_pair(
start_ts=start_ts, end_ts=end_ts, license=license
start_ts=start_ts, end_ts=end_ts, license=license_
)
logger.info("Completed large batch processing by license type.")

@@ -139,14 +139,14 @@ def get_next_query_params(self, prev_query_params, **kwargs):

# license will be available in the params if we're dealing
# with a large batch. If not, fall back to all licenses
license = kwargs.get("license", self.default_license_param)
license_ = kwargs.get("license", self.default_license_param)

return {
"min_upload_date": start_timestamp,
"max_upload_date": end_timestamp,
"page": 0,
"api_key": self.api_key,
"license": license,
"license": license_,
"per_page": self.batch_limit,
"method": "flickr.photos.search",
"media": "photos",
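For context, a rough sketch of the search request these query params drive, assuming the public Flickr REST endpoint. The API key, dates, and license code are placeholders, and the `format`/`nojsoncallback` keys are added here only to get a plain JSON response; they are not taken from the provider script.

```python
# Hypothetical standalone request mirroring the params built above.
import requests

params = {
    "min_upload_date": "2023-01-01 00:00:00",  # placeholder timestamps
    "max_upload_date": "2023-01-02 00:00:00",
    "page": 0,
    "api_key": "YOUR_API_KEY",                 # placeholder
    "license": "1",                            # a single license code when batching by license
    "per_page": 500,
    "method": "flickr.photos.search",
    "media": "photos",
    "format": "json",                          # added here just to get plain JSON back
    "nojsoncallback": 1,
}
resp = requests.get("https://api.flickr.com/services/rest/", params=params)
resp.raise_for_status()
print(resp.json()["photos"]["total"])
```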
4 changes: 2 additions & 2 deletions docker-compose.yml
Expand Up @@ -51,8 +51,8 @@ services:
context: ./docker/upstream_db/
target: db
image: openverse-upstream_db
expose:
- "5432"
ports:
- "50255:5432"
volumes:
- catalog-postgres:/var/lib/postgresql/data
- ./sample_data:/sample_data
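With the `expose` stanza replaced by a `ports` mapping, the upstream catalog database is now reachable from the host on port 50255 rather than only from other containers. A minimal host-side connectivity check is sketched below; the database name, user, and password are placeholders, not values taken from this repository.

```python
# Host-side connectivity check for the newly published port.
import psycopg2

conn = psycopg2.connect(
    host="localhost",
    port=50255,           # host port mapped to the container's 5432
    dbname="openledger",  # placeholder
    user="deploy",        # placeholder
    password="deploy",    # placeholder
)
with conn, conn.cursor() as cur:
    cur.execute("SELECT count(*) FROM image WHERE provider = 'flickr'")
    print(cur.fetchone()[0])
conn.close()
```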