Skip to content

Commit

Permalink
Add popularity refresh DAGs (#2592)
Browse files Browse the repository at this point in the history
* Move popularity metrics task factory into new popularity module

* Add popularity refresh dag factory

* Add tests

* Pass timeout correctly

* Add DAG docs

* Remove TODO

* Remove unused arg, reorder

* Move REFRESH_POKE_INTERVAL into constant

* Ensure last_updated_date is set after refreshing constants, add tests

* Add triggerer to docker-compose
  • Loading branch information
stacimc authored Jul 25, 2023
1 parent fc6006d commit 3d83f48
Show file tree
Hide file tree
Showing 13 changed files with 541 additions and 22 deletions.
56 changes: 56 additions & 0 deletions catalog/DAGs.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ The following are DAGs grouped by their primary tag:
1. [Maintenance](#maintenance)
1. [Oauth](#oauth)
1. [Other](#other)
1. [Popularity Refresh](#popularity_refresh)
1. [Provider](#provider)
1. [Provider Reingestion](#provider-reingestion)

Expand Down Expand Up @@ -71,6 +72,13 @@ The following are DAGs grouped by their primary tag:
| --------------------------------------------------------- | ----------------- |
| [`flickr_thumbnails_removal`](#flickr_thumbnails_removal) | `None` |

## Popularity Refresh

| DAG ID | Schedule Interval |
| ------------------------------------------------------- | ----------------- |
| [`audio_popularity_refresh`](#audio_popularity_refresh) | `None` |
| [`image_popularity_refresh`](#image_popularity_refresh) | `None` |

## Provider

| DAG ID | Schedule Interval | Dated | Media Type(s) |
Expand Down Expand Up @@ -113,6 +121,7 @@ The following is documentation associated with each DAG (where available):
1. [`add_license_url`](#add_license_url)
1. [`airflow_log_cleanup`](#airflow_log_cleanup)
1. [`audio_data_refresh`](#audio_data_refresh)
1. [`audio_popularity_refresh`](#audio_popularity_refresh)
1. [`batched_update`](#batched_update)
1. [`check_silenced_dags`](#check_silenced_dags)
1. [`create_filtered_audio_index`](#create_filtered_audio_index)
Expand All @@ -126,6 +135,7 @@ The following is documentation associated with each DAG (where available):
1. [`flickr_workflow`](#flickr_workflow)
1. [`freesound_workflow`](#freesound_workflow)
1. [`image_data_refresh`](#image_data_refresh)
1. [`image_popularity_refresh`](#image_popularity_refresh)
1. [`inaturalist_workflow`](#inaturalist_workflow)
1. [`jamendo_workflow`](#jamendo_workflow)
1. [`metropolitan_museum_reingestion_workflow`](#metropolitan_museum_reingestion_workflow)
Expand Down Expand Up @@ -221,6 +231,29 @@ and related PRs:
- [[Feature] Data refresh orchestration DAG](https://github.com/WordPress/openverse-catalog/issues/353)
- [[Feature] Merge popularity calculations and data refresh into a single DAG](https://github.com/WordPress/openverse-catalog/issues/453)

## `audio_popularity_refresh`

### Popularity Refresh DAG Factory

This file generates our popularity refresh DAGs using a factory function.

For the given media type these DAGs will first update the popularity metrics
table, adding any new metrics and updating the percentile that is used in
calculating the popularity constants. It then refreshes the popularity constants
view, which recalculates the popularity constant for each provider.

Once the constants have been updated, the DAG will trigger a `batched_update`
DagRun for each provider of this media_type that is configured to support
popularity data. The batched update recalculates standardized popularity scores
for all records, using the new constant. When the updates are complete, all
records have up-to-date popularity data. This DAG can be run concurrently with
data refreshes and regular ingestion.

You can find more background information on this process in the following
implementation plan:

- [[Implementation Plan] Decoupling Popularity Calculations from the Data Refresh](https://docs.openverse.org/projects/proposals/popularity_optimizations/20230420-implementation_plan_popularity_optimizations.html)

## `batched_update`

Batched Update DAG
Expand Down Expand Up @@ -531,6 +564,29 @@ and related PRs:
- [[Feature] Data refresh orchestration DAG](https://github.com/WordPress/openverse-catalog/issues/353)
- [[Feature] Merge popularity calculations and data refresh into a single DAG](https://github.com/WordPress/openverse-catalog/issues/453)

## `image_popularity_refresh`

### Popularity Refresh DAG Factory

This file generates our popularity refresh DAGs using a factory function.

For the given media type these DAGs will first update the popularity metrics
table, adding any new metrics and updating the percentile that is used in
calculating the popularity constants. It then refreshes the popularity constants
view, which recalculates the popularity constant for each provider.

Once the constants have been updated, the DAG will trigger a `batched_update`
DagRun for each provider of this media_type that is configured to support
popularity data. The batched update recalculates standardized popularity scores
for all records, using the new constant. When the updates are complete, all
records have up-to-date popularity data. This DAG can be run concurrently with
data refreshes and regular ingestion.

You can find more background information on this process in the following
implementation plan:

- [[Implementation Plan] Decoupling Popularity Calculations from the Data Refresh](https://docs.openverse.org/projects/proposals/popularity_optimizations/20230420-implementation_plan_popularity_optimizations.html)

## `inaturalist_workflow`

Provider: iNaturalist
Expand Down
1 change: 1 addition & 0 deletions catalog/dags/common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,4 @@
AWS_CONN_ID = "aws_default"
AWS_RDS_CONN_ID = os.environ.get("AWS_RDS_CONN_ID", AWS_CONN_ID)
ES_PROD_HTTP_CONN_ID = "elasticsearch_http_production"
REFRESH_POKE_INTERVAL = int(os.getenv("DATA_REFRESH_POKE_INTERVAL", 60 * 30))
13 changes: 8 additions & 5 deletions catalog/dags/common/ingestion_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@
from airflow.providers.http.sensors.http import HttpSensor
from requests import Response

from common.constants import ES_PROD_HTTP_CONN_ID, XCOM_PULL_TEMPLATE
from common.constants import (
ES_PROD_HTTP_CONN_ID,
REFRESH_POKE_INTERVAL,
XCOM_PULL_TEMPLATE,
)


logger = logging.getLogger(__name__)


POKE_INTERVAL = int(os.getenv("DATA_REFRESH_POKE_INTERVAL", 60 * 15))
# Minimum number of records we expect to get back from ES when querying an index.
THRESHOLD_RESULT_COUNT = int(os.getenv("ES_INDEX_READINESS_RECORD_COUNT", 10_000))

Expand Down Expand Up @@ -118,7 +121,7 @@ def wait_for_task(
action: str,
task_trigger: SimpleHttpOperator,
timeout: timedelta,
poke_interval: int = POKE_INTERVAL,
poke_interval: int = REFRESH_POKE_INTERVAL,
) -> HttpSensor:
return HttpSensor(
task_id=f"wait_for_{action.lower()}",
Expand All @@ -137,7 +140,7 @@ def trigger_and_wait_for_task(
model: str,
timeout: timedelta,
data: dict | None = None,
poke_interval: int = POKE_INTERVAL,
poke_interval: int = REFRESH_POKE_INTERVAL,
) -> tuple[SimpleHttpOperator, HttpSensor]:
trigger = trigger_task(action, model, data)
waiter = wait_for_task(action, trigger, timeout, poke_interval)
Expand All @@ -149,7 +152,7 @@ def index_readiness_check(
media_type: str,
index_suffix: str,
timeout: timedelta = timedelta(days=1),
poke_interval: int = POKE_INTERVAL,
poke_interval: int = REFRESH_POKE_INTERVAL,
) -> HttpSensor:
"""
Poll the Elasticsearch index, returning true only when results greater
Expand Down
48 changes: 48 additions & 0 deletions catalog/dags/common/popularity/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,54 @@ def create_media_view(
postgres.run(create_audioset_view_query())


def get_providers_with_popularity_data_for_media_type(
postgres_conn_id: str,
media_type: str = IMAGE,
constants_view: str = IMAGE_POPULARITY_CONSTANTS_VIEW,
pg_timeout: float = timedelta(minutes=10).total_seconds(),
):
"""
Return a list of distinct `provider`s that support popularity data,
for the given media type.
"""
if media_type == AUDIO:
constants_view = AUDIO_POPULARITY_CONSTANTS_VIEW

postgres = PostgresHook(
postgres_conn_id=postgres_conn_id, default_statement_timeout=pg_timeout
)
providers = postgres.get_records(
f"SELECT DISTINCT provider FROM public.{constants_view};"
)

return [x[0] for x in providers]


def format_update_standardized_popularity_query(
media_type=IMAGE,
standardized_popularity_func=STANDARDIZED_IMAGE_POPULARITY_FUNCTION,
table_name=TABLE_NAMES[IMAGE],
db_columns=IMAGE_TABLE_COLUMNS,
db_view_name=IMAGE_VIEW_NAME,
db_view_id_idx=IMAGE_VIEW_ID_IDX,
db_view_provider_fid_idx=IMAGE_VIEW_PROVIDER_FID_IDX,
task: AbstractOperator = None,
):
"""
Create a SQL query for updating the standardized popularity for the given
media type. Only the `SET ...` portion of the query is returned, to be used
by a `batched_update` DagRun.
"""
if media_type == AUDIO:
table_name = TABLE_NAMES[AUDIO]
standardized_popularity_func = STANDARDIZED_AUDIO_POPULARITY_FUNCTION

return (
f"SET {col.STANDARDIZED_POPULARITY.db_name} = {standardized_popularity_func}"
f"({table_name}.{PARTITION}, {table_name}.{METADATA_COLUMN})"
)


def update_db_view(
postgres_conn_id, media_type=IMAGE, db_view_name=IMAGE_VIEW_NAME, task=None
):
Expand Down
14 changes: 7 additions & 7 deletions catalog/dags/data_refresh/dag_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@
from airflow.settings import SASession
from airflow.utils.session import provide_session
from airflow.utils.state import State
from popularity.refresh_popularity_metrics_task_factory import (
GROUP_ID as REFRESH_POPULARITY_METRICS_GROUP_ID,
)
from popularity.refresh_popularity_metrics_task_factory import (
UPDATE_MEDIA_POPULARITY_METRICS_TASK_ID,
create_refresh_popularity_metrics_task_group,
)

from common.constants import (
DAG_DEFAULT_ARGS,
Expand All @@ -56,13 +63,6 @@
GROUP_ID as RECREATE_MATVIEW_GROUP_ID,
)
from data_refresh.recreate_view_data_task_factory import create_recreate_view_data_task
from data_refresh.refresh_popularity_metrics_task_factory import (
GROUP_ID as REFRESH_POPULARITY_METRICS_GROUP_ID,
)
from data_refresh.refresh_popularity_metrics_task_factory import (
UPDATE_MEDIA_POPULARITY_METRICS_TASK_ID,
create_refresh_popularity_metrics_task_group,
)
from data_refresh.reporting import report_record_difference, report_status


Expand Down
7 changes: 3 additions & 4 deletions catalog/dags/data_refresh/data_refresh_task_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
from airflow.utils.task_group import TaskGroup

from common import ingestion_server
from common.constants import XCOM_PULL_TEMPLATE
from common.constants import REFRESH_POKE_INTERVAL, XCOM_PULL_TEMPLATE
from common.sensors.single_run_external_dags_sensor import SingleRunExternalDAGsSensor
from common.sensors.utils import get_most_recent_dag_run
from data_refresh.data_refresh_types import DataRefresh
Expand Down Expand Up @@ -98,7 +98,6 @@ def create_data_refresh_task_group(
will not run concurrently with any dependent DAG.
"""

poke_interval = int(os.getenv("DATA_REFRESH_POKE_INTERVAL", 60 * 15))
target_alias = data_refresh.media_type # TODO: Change when using versioned aliases

with TaskGroup(group_id="data_refresh") as data_refresh_group:
Expand All @@ -108,7 +107,7 @@ def create_data_refresh_task_group(
task_id="wait_for_data_refresh",
external_dag_ids=external_dag_ids,
check_existence=True,
poke_interval=poke_interval,
poke_interval=REFRESH_POKE_INTERVAL,
mode="reschedule",
pool=DATA_REFRESH_POOL,
)
Expand All @@ -131,7 +130,7 @@ def create_data_refresh_task_group(
# Wait for the whole DAG, not just a part of it
external_task_id=None,
check_existence=False,
poke_interval=poke_interval,
poke_interval=REFRESH_POKE_INTERVAL,
execution_date_fn=lambda _: get_most_recent_dag_run(
create_filtered_index_dag_id
),
Expand Down
Empty file.
Loading

0 comments on commit 3d83f48

Please sign in to comment.