From 3d83f488978fa8b20cb62f611c2ab7a335270d8e Mon Sep 17 00:00:00 2001 From: Staci Mullins <63313398+stacimc@users.noreply.github.com> Date: Tue, 25 Jul 2023 13:19:39 -0700 Subject: [PATCH] Add popularity refresh DAGs (#2592) * Move popularity metrics task factory into new popularity module * Add popularity refresh dag factory * Add tests * Pass timeout correctly * Add DAG docs * Remove TODO * Remove unused arg, reorder * Move REFRESH_POKE_INTERVAL into constant * Ensure last_updated_date is set after refreshing constants, add tests * Add triggerer to docker-compose --- catalog/DAGs.md | 56 +++++ catalog/dags/common/constants.py | 1 + catalog/dags/common/ingestion_server.py | 13 +- catalog/dags/common/popularity/sql.py | 48 +++++ catalog/dags/data_refresh/dag_factory.py | 14 +- .../data_refresh/data_refresh_task_factory.py | 7 +- catalog/dags/popularity/__init__.py | 0 catalog/dags/popularity/dag_factory.py | 201 ++++++++++++++++++ .../popularity/popularity_refresh_types.py | 61 ++++++ ...refresh_popularity_metrics_task_factory.py | 15 +- .../common/popularity/test_dag_factory.py | 95 +++++++++ .../tests/dags/common/popularity/test_sql.py | 41 ++++ docker-compose.yml | 11 + 13 files changed, 541 insertions(+), 22 deletions(-) create mode 100644 catalog/dags/popularity/__init__.py create mode 100644 catalog/dags/popularity/dag_factory.py create mode 100644 catalog/dags/popularity/popularity_refresh_types.py rename catalog/dags/{data_refresh => popularity}/refresh_popularity_metrics_task_factory.py (88%) create mode 100644 catalog/tests/dags/common/popularity/test_dag_factory.py diff --git a/catalog/DAGs.md b/catalog/DAGs.md index 8ca21abe20a..47bb1878762 100644 --- a/catalog/DAGs.md +++ b/catalog/DAGs.md @@ -20,6 +20,7 @@ The following are DAGs grouped by their primary tag: 1. [Maintenance](#maintenance) 1. [Oauth](#oauth) 1. [Other](#other) +1. [Popularity Refresh](#popularity_refresh) 1. [Provider](#provider) 1. [Provider Reingestion](#provider-reingestion) @@ -71,6 +72,13 @@ The following are DAGs grouped by their primary tag: | --------------------------------------------------------- | ----------------- | | [`flickr_thumbnails_removal`](#flickr_thumbnails_removal) | `None` | +## Popularity Refresh + +| DAG ID | Schedule Interval | +| ------------------------------------------------------- | ----------------- | +| [`audio_popularity_refresh`](#audio_popularity_refresh) | `None` | +| [`image_popularity_refresh`](#image_popularity_refresh) | `None` | + ## Provider | DAG ID | Schedule Interval | Dated | Media Type(s) | @@ -113,6 +121,7 @@ The following is documentation associated with each DAG (where available): 1. [`add_license_url`](#add_license_url) 1. [`airflow_log_cleanup`](#airflow_log_cleanup) 1. [`audio_data_refresh`](#audio_data_refresh) +1. [`audio_popularity_refresh`](#audio_popularity_refresh) 1. [`batched_update`](#batched_update) 1. [`check_silenced_dags`](#check_silenced_dags) 1. [`create_filtered_audio_index`](#create_filtered_audio_index) @@ -126,6 +135,7 @@ The following is documentation associated with each DAG (where available): 1. [`flickr_workflow`](#flickr_workflow) 1. [`freesound_workflow`](#freesound_workflow) 1. [`image_data_refresh`](#image_data_refresh) +1. [`image_popularity_refresh`](#image_popularity_refresh) 1. [`inaturalist_workflow`](#inaturalist_workflow) 1. [`jamendo_workflow`](#jamendo_workflow) 1. 
[`metropolitan_museum_reingestion_workflow`](#metropolitan_museum_reingestion_workflow) @@ -221,6 +231,29 @@ and related PRs: - [[Feature] Data refresh orchestration DAG](https://github.com/WordPress/openverse-catalog/issues/353) - [[Feature] Merge popularity calculations and data refresh into a single DAG](https://github.com/WordPress/openverse-catalog/issues/453) +## `audio_popularity_refresh` + +### Popularity Refresh DAG Factory + +This file generates our popularity refresh DAGs using a factory function. + +For the given media type these DAGs will first update the popularity metrics +table, adding any new metrics and updating the percentile that is used in +calculating the popularity constants. It then refreshes the popularity constants +view, which recalculates the popularity constant for each provider. + +Once the constants have been updated, the DAG will trigger a `batched_update` +DagRun for each provider of this media_type that is configured to support +popularity data. The batched update recalculates standardized popularity scores +for all records, using the new constant. When the updates are complete, all +records have up-to-date popularity data. This DAG can be run concurrently with +data refreshes and regular ingestion. + +You can find more background information on this process in the following +implementation plan: + +- [[Implementation Plan] Decoupling Popularity Calculations from the Data Refresh](https://docs.openverse.org/projects/proposals/popularity_optimizations/20230420-implementation_plan_popularity_optimizations.html) + ## `batched_update` Batched Update DAG @@ -531,6 +564,29 @@ and related PRs: - [[Feature] Data refresh orchestration DAG](https://github.com/WordPress/openverse-catalog/issues/353) - [[Feature] Merge popularity calculations and data refresh into a single DAG](https://github.com/WordPress/openverse-catalog/issues/453) +## `image_popularity_refresh` + +### Popularity Refresh DAG Factory + +This file generates our popularity refresh DAGs using a factory function. + +For the given media type these DAGs will first update the popularity metrics +table, adding any new metrics and updating the percentile that is used in +calculating the popularity constants. It then refreshes the popularity constants +view, which recalculates the popularity constant for each provider. + +Once the constants have been updated, the DAG will trigger a `batched_update` +DagRun for each provider of this media_type that is configured to support +popularity data. The batched update recalculates standardized popularity scores +for all records, using the new constant. When the updates are complete, all +records have up-to-date popularity data. This DAG can be run concurrently with +data refreshes and regular ingestion. 
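As a rough illustration, each per-provider `batched_update` conf built by this DAG has the shape below (the provider name here is hypothetical; the real list of providers is read from the popularity constants view at runtime):

```python
# Sketch of a single per-provider conf passed to a `batched_update` DagRun.
{
    # Uniquely identifies the update for this provider and refresh date
    "query_id": "some_provider_popularity_refresh_20230101",
    "table_name": "image",
    # Select only records last updated before the new constants took effect
    "select_query": "WHERE provider='some_provider' AND updated_on < '2023-01-01 00:00:00'",
    # Recalculate standardized popularity using the refreshed constant
    "update_query": (
        "SET standardized_popularity = "
        "standardized_image_popularity(image.provider, image.meta_data)"
    ),
    "batch_size": 10_000,
    "update_timeout": 3600.0,
    "dry_run": False,
    "resume_update": False,
}
```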
+ +You can find more background information on this process in the following +implementation plan: + +- [[Implementation Plan] Decoupling Popularity Calculations from the Data Refresh](https://docs.openverse.org/projects/proposals/popularity_optimizations/20230420-implementation_plan_popularity_optimizations.html) + ## `inaturalist_workflow` Provider: iNaturalist diff --git a/catalog/dags/common/constants.py b/catalog/dags/common/constants.py index 40fcaa9c4d7..5ec93f6e9e6 100644 --- a/catalog/dags/common/constants.py +++ b/catalog/dags/common/constants.py @@ -35,3 +35,4 @@ AWS_CONN_ID = "aws_default" AWS_RDS_CONN_ID = os.environ.get("AWS_RDS_CONN_ID", AWS_CONN_ID) ES_PROD_HTTP_CONN_ID = "elasticsearch_http_production" +REFRESH_POKE_INTERVAL = int(os.getenv("DATA_REFRESH_POKE_INTERVAL", 60 * 30)) diff --git a/catalog/dags/common/ingestion_server.py b/catalog/dags/common/ingestion_server.py index f3ac3eeff47..3ae2d9709cc 100644 --- a/catalog/dags/common/ingestion_server.py +++ b/catalog/dags/common/ingestion_server.py @@ -8,13 +8,16 @@ from airflow.providers.http.sensors.http import HttpSensor from requests import Response -from common.constants import ES_PROD_HTTP_CONN_ID, XCOM_PULL_TEMPLATE +from common.constants import ( + ES_PROD_HTTP_CONN_ID, + REFRESH_POKE_INTERVAL, + XCOM_PULL_TEMPLATE, +) logger = logging.getLogger(__name__) -POKE_INTERVAL = int(os.getenv("DATA_REFRESH_POKE_INTERVAL", 60 * 15)) # Minimum number of records we expect to get back from ES when querying an index. THRESHOLD_RESULT_COUNT = int(os.getenv("ES_INDEX_READINESS_RECORD_COUNT", 10_000)) @@ -118,7 +121,7 @@ def wait_for_task( action: str, task_trigger: SimpleHttpOperator, timeout: timedelta, - poke_interval: int = POKE_INTERVAL, + poke_interval: int = REFRESH_POKE_INTERVAL, ) -> HttpSensor: return HttpSensor( task_id=f"wait_for_{action.lower()}", @@ -137,7 +140,7 @@ def trigger_and_wait_for_task( model: str, timeout: timedelta, data: dict | None = None, - poke_interval: int = POKE_INTERVAL, + poke_interval: int = REFRESH_POKE_INTERVAL, ) -> tuple[SimpleHttpOperator, HttpSensor]: trigger = trigger_task(action, model, data) waiter = wait_for_task(action, trigger, timeout, poke_interval) @@ -149,7 +152,7 @@ def index_readiness_check( media_type: str, index_suffix: str, timeout: timedelta = timedelta(days=1), - poke_interval: int = POKE_INTERVAL, + poke_interval: int = REFRESH_POKE_INTERVAL, ) -> HttpSensor: """ Poll the Elasticsearch index, returning true only when results greater diff --git a/catalog/dags/common/popularity/sql.py b/catalog/dags/common/popularity/sql.py index 75a8228fab2..5799cf56556 100644 --- a/catalog/dags/common/popularity/sql.py +++ b/catalog/dags/common/popularity/sql.py @@ -439,6 +439,54 @@ def create_media_view( postgres.run(create_audioset_view_query()) +def get_providers_with_popularity_data_for_media_type( + postgres_conn_id: str, + media_type: str = IMAGE, + constants_view: str = IMAGE_POPULARITY_CONSTANTS_VIEW, + pg_timeout: float = timedelta(minutes=10).total_seconds(), +): + """ + Return a list of distinct `provider`s that support popularity data, + for the given media type. 
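+
+    For example, for the image media type this might return a list like
+    `["flickr", "stocksnap"]` (illustrative values only; the actual list is
+    read from the constants view at query time).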
+ """ + if media_type == AUDIO: + constants_view = AUDIO_POPULARITY_CONSTANTS_VIEW + + postgres = PostgresHook( + postgres_conn_id=postgres_conn_id, default_statement_timeout=pg_timeout + ) + providers = postgres.get_records( + f"SELECT DISTINCT provider FROM public.{constants_view};" + ) + + return [x[0] for x in providers] + + +def format_update_standardized_popularity_query( + media_type=IMAGE, + standardized_popularity_func=STANDARDIZED_IMAGE_POPULARITY_FUNCTION, + table_name=TABLE_NAMES[IMAGE], + db_columns=IMAGE_TABLE_COLUMNS, + db_view_name=IMAGE_VIEW_NAME, + db_view_id_idx=IMAGE_VIEW_ID_IDX, + db_view_provider_fid_idx=IMAGE_VIEW_PROVIDER_FID_IDX, + task: AbstractOperator = None, +): + """ + Create a SQL query for updating the standardized popularity for the given + media type. Only the `SET ...` portion of the query is returned, to be used + by a `batched_update` DagRun. + """ + if media_type == AUDIO: + table_name = TABLE_NAMES[AUDIO] + standardized_popularity_func = STANDARDIZED_AUDIO_POPULARITY_FUNCTION + + return ( + f"SET {col.STANDARDIZED_POPULARITY.db_name} = {standardized_popularity_func}" + f"({table_name}.{PARTITION}, {table_name}.{METADATA_COLUMN})" + ) + + def update_db_view( postgres_conn_id, media_type=IMAGE, db_view_name=IMAGE_VIEW_NAME, task=None ): diff --git a/catalog/dags/data_refresh/dag_factory.py b/catalog/dags/data_refresh/dag_factory.py index b00c3495cdb..d03bb70df15 100644 --- a/catalog/dags/data_refresh/dag_factory.py +++ b/catalog/dags/data_refresh/dag_factory.py @@ -42,6 +42,13 @@ from airflow.settings import SASession from airflow.utils.session import provide_session from airflow.utils.state import State +from popularity.refresh_popularity_metrics_task_factory import ( + GROUP_ID as REFRESH_POPULARITY_METRICS_GROUP_ID, +) +from popularity.refresh_popularity_metrics_task_factory import ( + UPDATE_MEDIA_POPULARITY_METRICS_TASK_ID, + create_refresh_popularity_metrics_task_group, +) from common.constants import ( DAG_DEFAULT_ARGS, @@ -56,13 +63,6 @@ GROUP_ID as RECREATE_MATVIEW_GROUP_ID, ) from data_refresh.recreate_view_data_task_factory import create_recreate_view_data_task -from data_refresh.refresh_popularity_metrics_task_factory import ( - GROUP_ID as REFRESH_POPULARITY_METRICS_GROUP_ID, -) -from data_refresh.refresh_popularity_metrics_task_factory import ( - UPDATE_MEDIA_POPULARITY_METRICS_TASK_ID, - create_refresh_popularity_metrics_task_group, -) from data_refresh.reporting import report_record_difference, report_status diff --git a/catalog/dags/data_refresh/data_refresh_task_factory.py b/catalog/dags/data_refresh/data_refresh_task_factory.py index 9ac2fa2bf87..6f6d78d1415 100644 --- a/catalog/dags/data_refresh/data_refresh_task_factory.py +++ b/catalog/dags/data_refresh/data_refresh_task_factory.py @@ -56,7 +56,7 @@ from airflow.utils.task_group import TaskGroup from common import ingestion_server -from common.constants import XCOM_PULL_TEMPLATE +from common.constants import REFRESH_POKE_INTERVAL, XCOM_PULL_TEMPLATE from common.sensors.single_run_external_dags_sensor import SingleRunExternalDAGsSensor from common.sensors.utils import get_most_recent_dag_run from data_refresh.data_refresh_types import DataRefresh @@ -98,7 +98,6 @@ def create_data_refresh_task_group( will not run concurrently with any dependent DAG. 
""" - poke_interval = int(os.getenv("DATA_REFRESH_POKE_INTERVAL", 60 * 15)) target_alias = data_refresh.media_type # TODO: Change when using versioned aliases with TaskGroup(group_id="data_refresh") as data_refresh_group: @@ -108,7 +107,7 @@ def create_data_refresh_task_group( task_id="wait_for_data_refresh", external_dag_ids=external_dag_ids, check_existence=True, - poke_interval=poke_interval, + poke_interval=REFRESH_POKE_INTERVAL, mode="reschedule", pool=DATA_REFRESH_POOL, ) @@ -131,7 +130,7 @@ def create_data_refresh_task_group( # Wait for the whole DAG, not just a part of it external_task_id=None, check_existence=False, - poke_interval=poke_interval, + poke_interval=REFRESH_POKE_INTERVAL, execution_date_fn=lambda _: get_most_recent_dag_run( create_filtered_index_dag_id ), diff --git a/catalog/dags/popularity/__init__.py b/catalog/dags/popularity/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/catalog/dags/popularity/dag_factory.py b/catalog/dags/popularity/dag_factory.py new file mode 100644 index 00000000000..826c0a12335 --- /dev/null +++ b/catalog/dags/popularity/dag_factory.py @@ -0,0 +1,201 @@ +""" +# Popularity Refresh DAG Factory + +This file generates our popularity refresh DAGs using a factory function. + +For the given media type these DAGs will first update the popularity metrics table, +adding any new metrics and updating the percentile that is used in calculating the +popularity constants. It then refreshes the popularity constants view, which +recalculates the popularity constant for each provider. + +Once the constants have been updated, the DAG will trigger a `batched_update` +DagRun for each provider of this media_type that is configured to support popularity +data. The batched update recalculates standardized popularity scores for all +records, using the new constant. When the updates are complete, all records have +up-to-date popularity data. This DAG can be run concurrently with data refreshes +and regular ingestion. + + +You can find more background information on this process in the following +implementation plan: + +- [[Implementation Plan] Decoupling Popularity Calculations from the Data Refresh]( +https://docs.openverse.org/projects/proposals/popularity_optimizations/20230420-implementation_plan_popularity_optimizations.html) +""" +import logging +from datetime import datetime + +from airflow import DAG +from airflow.decorators import task +from airflow.operators.trigger_dagrun import TriggerDagRunOperator +from popularity.popularity_refresh_types import ( + POPULARITY_REFRESH_CONFIGS, + PopularityRefresh, +) +from popularity.refresh_popularity_metrics_task_factory import ( + create_refresh_popularity_metrics_task_group, +) + +from common import slack +from common.constants import DAG_DEFAULT_ARGS, POSTGRES_CONN_ID, REFRESH_POKE_INTERVAL +from common.popularity import sql +from database.batched_update.constants import DAG_ID as BATCHED_UPDATE_DAG_ID + + +logger = logging.getLogger(__name__) + + +@task +def notify_slack(text: str, media_type: str, dag_id: str) -> None: + slack.send_message( + text, + username=f"{media_type} Popularity Refresh", + icon_emoji=":database:", + dag_id=dag_id, + ) + + +@task +def get_last_updated_time(): + """ + Return the current utc datetime, which will be used when building the batched + update queries. Once new popularity constants are calculated, they will + automatically be used to calculate popularity scores at ingestion. 
The popularity + refresh need only update the scores for records which were last updated before + this time. + + Getting this cutoff time is pulled out into a separate task to ensure that it + runs only when the `refresh_popularity_metrics_and_constants` task is complete. + """ + return datetime.utcnow() + + +@task +def get_providers_update_confs( + postgres_conn_id: str, + popularity_refresh: PopularityRefresh, + last_updated_time: datetime, +): + """ + Build a list of DagRun confs for each provider of this media type. The confs will + be used by the `batched_update` DAG to perform a batched update of all existing + records, to recalculate their standardized_popularity with the new popularity + constant. Providers that do not support popularity data are omitted. + """ + # For the media type, get a list of the providers who support popularity data + providers = sql.get_providers_with_popularity_data_for_media_type( + postgres_conn_id, popularity_refresh.media_type + ) + + # For each provider, create a conf that will be used by the batched_update to + # refresh standardized popularity scores. + return [ + { + # Uniquely identify the query + "query_id": ( + f"{provider}_popularity_refresh_{last_updated_time.strftime('%Y%m%d')}" + ), + "table_name": popularity_refresh.media_type, + # Query used to select records that should be refreshed + "select_query": ( + f"WHERE provider='{provider}' AND updated_on <" + f" '{last_updated_time.strftime('%Y-%m-%d %H:%M:%S')}'" + ), + # Query used to update the standardized_popularity + "update_query": sql.format_update_standardized_popularity_query( + popularity_refresh.media_type + ), + "batch_size": 10_000, + "update_timeout": ( + popularity_refresh.refresh_popularity_batch_timeout.total_seconds() + ), + "dry_run": False, + "resume_update": False, + } + for provider in providers + ] + + +def create_popularity_refresh_dag(popularity_refresh: PopularityRefresh): + """ + Instantiate a DAG for a popularity refresh. + + For the given media type, this DAG will recalculate the popularity constants and + then refresh all existing standardized popularity scores with the new constant. + + Required Arguments: + + popularity_refresh: dataclass containing configuration information for the DAG + """ + default_args = { + **DAG_DEFAULT_ARGS, + **popularity_refresh.default_args, + } + + dag = DAG( + dag_id=popularity_refresh.dag_id, + default_args=default_args, + start_date=popularity_refresh.start_date, + schedule=popularity_refresh.schedule, + max_active_runs=1, + catchup=False, + doc_md=__doc__, + tags=["popularity_refresh"], + ) + + with dag: + # Refresh the underlying popularity tables. This step recalculates the + # popularity constants, which will later be used to calculate updated + # standardized popularity scores. + refresh_popularity_metrics = create_refresh_popularity_metrics_task_group( + popularity_refresh + ) + + # Once popularity constants have been calculated, establish the cutoff time + # after which records that have updates do not need to be refreshed. + get_cutoff_time = get_last_updated_time() + + # For each provider that supports popularity data for this media type, trigger a + # batched_update to recalculate all old standardized popularity scores using the + # newly refreshed constant. 
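+        # Note: `.partial()` pins the arguments shared by every mapped task,
+        # while `.expand(conf=...)` uses Airflow dynamic task mapping to create
+        # one trigger task instance per provider conf at runtime. Because
+        # `deferrable=True`, each mapped task defers while it waits, handing
+        # polling off to the triggerer process instead of holding a worker slot.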
+ refresh_popularity_scores = TriggerDagRunOperator.partial( + task_id="refresh_popularity", + trigger_dag_id=BATCHED_UPDATE_DAG_ID, + # Wait for all the dagruns to finish + wait_for_completion=True, + # Release the worker slot while waiting + deferrable=True, + poke_interval=REFRESH_POKE_INTERVAL, + retries=0, + ).expand( + # Build the conf for each provider + conf=get_providers_update_confs( + POSTGRES_CONN_ID, popularity_refresh, get_cutoff_time + ) + ) + + notify_complete = notify_slack( + text=( + f"{popularity_refresh.media_type.capitalize()} Popularity Refresh" + " Complete" + ), + media_type=popularity_refresh.media_type, + dag_id=popularity_refresh.dag_id, + ) + + # Set up task dependencies + ( + refresh_popularity_metrics + >> get_cutoff_time + >> refresh_popularity_scores + >> notify_complete + ) + + return dag + + +# Generate a popularity refresh DAG for each media type. +for popularity_refresh in POPULARITY_REFRESH_CONFIGS: + globals()[popularity_refresh.dag_id] = create_popularity_refresh_dag( + popularity_refresh + ) diff --git a/catalog/dags/popularity/popularity_refresh_types.py b/catalog/dags/popularity/popularity_refresh_types.py new file mode 100644 index 00000000000..a8aefd46559 --- /dev/null +++ b/catalog/dags/popularity/popularity_refresh_types.py @@ -0,0 +1,61 @@ +""" +# Popularity Refresh DAG Configuration +This file defines the type for the `PopularityRefresh`, a dataclass containing +configuration for a Popularity Refresh DAG, and defines the actual +`POPULARITY_REFRESH_CONFIGS` for each of our media types. This configuration info +is used to generate the dynamic Popularity Refresh dags. +""" +from dataclasses import dataclass, field +from datetime import datetime, timedelta + + +@dataclass +class PopularityRefresh: + """ + Configuration object for a popularity refresh DAG. + + Required Constructor Arguments: + + media_type: str describing the media type to be refreshed. + + Optional Constructor Arguments: + + default_args: dictionary which is passed to the + airflow.dag.DAG __init__ method. + start_date: datetime.datetime giving the + first valid logical date of the DAG. + schedule: string giving the schedule on which the DAG + should be run. Passed to the + airflow.dag.DAG __init__ method. + refresh_popularity_batch_timeout: timedelta expressing the amount of time + refreshing popularity scores for an individual + batch of records may take. + refresh_metrics_timeout: timedelta expressing amount of time the + refresh popularity metrics and constants + may take. + """ + + dag_id: str = field(init=False) + media_type: str + default_args: dict | None = field(default_factory=dict) + start_date: datetime = datetime(2023, 1, 1) + # The default schedule is initially set to None while we assess the performance + # of refreshes. 
The schedule will be updated in + # https://github.com/WordPress/openverse/issues/2092 + schedule: str | None = None + # Initial timeouts are generous; they should be updated after assessing the + # performance in https://github.com/WordPress/openverse/issues/2092 + refresh_popularity_batch_timeout: timedelta = timedelta(hours=1) + refresh_metrics_timeout: timedelta = timedelta(hours=1) + + def __post_init__(self): + self.dag_id = f"{self.media_type}_popularity_refresh" + + +POPULARITY_REFRESH_CONFIGS = [ + PopularityRefresh( + media_type="image", + refresh_metrics_timeout=timedelta(hours=24), + ), + PopularityRefresh(media_type="audio"), +] diff --git a/catalog/dags/data_refresh/refresh_popularity_metrics_task_factory.py b/catalog/dags/popularity/refresh_popularity_metrics_task_factory.py similarity index 88% rename from catalog/dags/data_refresh/refresh_popularity_metrics_task_factory.py rename to catalog/dags/popularity/refresh_popularity_metrics_task_factory.py index ea1ca191126..f4e4e217005 100644 --- a/catalog/dags/data_refresh/refresh_popularity_metrics_task_factory.py +++ b/catalog/dags/popularity/refresh_popularity_metrics_task_factory.py @@ -10,6 +10,7 @@ """ from airflow.operators.python import PythonOperator from airflow.utils.task_group import TaskGroup +from popularity.popularity_refresh_types import PopularityRefresh from common.constants import POSTGRES_CONN_ID from common.popularity import sql @@ -22,7 +23,9 @@ UPDATE_MEDIA_POPULARITY_CONSTANTS_TASK_ID = "update_media_popularity_constants_view" -def create_refresh_popularity_metrics_task_group(data_refresh: DataRefresh): +def create_refresh_popularity_metrics_task_group( + refresh_config: DataRefresh | PopularityRefresh, +): """ Create tasks related to refreshing popularity statistics. 
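Because the factory now accepts either config type, it can be driven from both the data refresh and popularity refresh DAGs. A minimal usage sketch (assuming only the module paths shown in this diff; the wrapping DAG is illustrative):

```python
from airflow import DAG
from popularity.popularity_refresh_types import PopularityRefresh
from popularity.refresh_popularity_metrics_task_factory import (
    create_refresh_popularity_metrics_task_group,
)

# Both DataRefresh and PopularityRefresh expose the attributes the factory
# reads: `media_type`, `refresh_metrics_timeout`, and `dag_id`.
with DAG(dag_id="example_popularity_dag", schedule=None, start_date=None):
    refresh_metrics = create_refresh_popularity_metrics_task_group(
        PopularityRefresh(media_type="audio")
    )
```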
@@ -33,10 +36,10 @@ def create_refresh_popularity_metrics_task_group(data_refresh: DataRefresh): Required Arguments: - data_refresh: configuration data for the data refresh + refresh_config: configuration data for the refresh """ - media_type = data_refresh.media_type - execution_timeout = data_refresh.refresh_metrics_timeout + media_type = refresh_config.media_type + execution_timeout = refresh_config.refresh_metrics_timeout with TaskGroup(group_id=GROUP_ID) as refresh_all_popularity_data: update_metrics = PythonOperator( @@ -58,7 +61,7 @@ def create_refresh_popularity_metrics_task_group(data_refresh: DataRefresh): python_callable=reporting.report_status, op_kwargs={ "media_type": media_type, - "dag_id": data_refresh.dag_id, + "dag_id": refresh_config.dag_id, "message": "Popularity metrics update complete | " "_Next: popularity constants view update_", }, @@ -83,7 +86,7 @@ def create_refresh_popularity_metrics_task_group(data_refresh: DataRefresh): python_callable=reporting.report_status, op_kwargs={ "media_type": media_type, - "dag_id": data_refresh.dag_id, + "dag_id": refresh_config.dag_id, "message": "Popularity constants view update complete | " "_Next: refresh matview_", }, diff --git a/catalog/tests/dags/common/popularity/test_dag_factory.py b/catalog/tests/dags/common/popularity/test_dag_factory.py new file mode 100644 index 00000000000..3e1d9befc1f --- /dev/null +++ b/catalog/tests/dags/common/popularity/test_dag_factory.py @@ -0,0 +1,95 @@ +from unittest import mock + +import pytest +from airflow.models import DagRun +from airflow.models.dag import DAG +from airflow.utils.session import create_session +from airflow.utils.timezone import datetime +from airflow.utils.types import DagRunType +from popularity.dag_factory import get_providers_update_confs +from popularity.popularity_refresh_types import PopularityRefresh + +from catalog.tests.test_utils.sql import POSTGRES_CONN_ID + + +TEST_DAG_ID = "popularity_refresh_dag_factory_test_dag" +TEST_DAG = DAG(TEST_DAG_ID, default_args={"owner": "airflow"}) +TEST_DAY = datetime(2023, 1, 1) + + +@pytest.fixture(autouse=True) +def clean_db(): + with create_session() as session: + session.query(DagRun).filter(DagRun.dag_id == TEST_DAG_ID).delete() + + +def _create_dagrun(start_date, dag_state, conf={}): + return TEST_DAG.create_dagrun( + start_date=start_date, + execution_date=start_date, + data_interval=(start_date, start_date), + state=dag_state, + run_type=DagRunType.MANUAL, + conf=conf, + ) + + +@pytest.mark.parametrize( + "providers, media_type, expected_confs", + [ + # No providers for this media type + ([], "image", []), + ( + ["foo_provider"], + "image", + [ + { + "query_id": "foo_provider_popularity_refresh_20230101", + "table_name": "image", + "select_query": "WHERE provider='foo_provider' AND updated_on < '2023-01-01 00:00:00'", + "update_query": "SET standardized_popularity = standardized_image_popularity(image.provider, image.meta_data)", + "batch_size": 10000, + "update_timeout": 3600.0, + "dry_run": False, + "resume_update": False, + }, + ], + ), + ( + ["my_provider", "your_provider"], + "audio", + [ + { + "query_id": "my_provider_popularity_refresh_20230101", + "table_name": "audio", + "select_query": "WHERE provider='my_provider' AND updated_on < '2023-01-01 00:00:00'", + "update_query": "SET standardized_popularity = standardized_audio_popularity(audio.provider, audio.meta_data)", + "batch_size": 10000, + "update_timeout": 3600.0, + "dry_run": False, + "resume_update": False, + }, + { + "query_id": 
"your_provider_popularity_refresh_20230101", + "table_name": "audio", + "select_query": "WHERE provider='your_provider' AND updated_on < '2023-01-01 00:00:00'", + "update_query": "SET standardized_popularity = standardized_audio_popularity(audio.provider, audio.meta_data)", + "batch_size": 10000, + "update_timeout": 3600.0, + "dry_run": False, + "resume_update": False, + }, + ], + ), + ], +) +def test_get_providers_update_confs(providers, media_type, expected_confs): + with mock.patch( + "common.popularity.sql.get_providers_with_popularity_data_for_media_type", + return_value=providers, + ): + actual_confs = get_providers_update_confs.function( + POSTGRES_CONN_ID, PopularityRefresh(media_type=media_type), TEST_DAY + ) + + assert actual_confs == expected_confs diff --git a/catalog/tests/dags/common/popularity/test_sql.py b/catalog/tests/dags/common/popularity/test_sql.py index 1573a6f8019..d54f72a71e4 100644 --- a/catalog/tests/dags/common/popularity/test_sql.py +++ b/catalog/tests/dags/common/popularity/test_sql.py @@ -382,6 +382,47 @@ def test_constants_view_handles_zeros_and_missing( assert expect_row == pytest.approx(sorted_row) +def test_get_providers_with_popularity_data_for_media_type( + postgres_with_image_table, table_info, mock_pg_hook_task +): + data_query = dedent( + f""" + INSERT INTO {table_info.image} ( + created_on, updated_on, provider, foreign_identifier, url, + meta_data, license, removed_from_source + ) + VALUES + ( + NOW(), NOW(), 'my_provider', 'fid_a', 'https://test.com/a.jpg', + '{{"views": 0, "description": "cats"}}', 'cc0', false + ), + ( + NOW(), NOW(), 'diff_provider', 'fid_b', 'https://test.com/b.jpg', + '{{"views": 50, "description": "cats"}}', 'cc0', false + ), + ( + NOW(), NOW(), 'provider_without_popularity', 'fid_b', 'https://test.com/b.jpg', + '{{"views": 50, "description": "cats"}}', 'cc0', false + ) + ; + """ + ) + metrics = { + "my_provider": {"metric": "views", "percentile": 0.8}, + "diff_provider": {"metric": "comments", "percentile": 0.8}, + } + _set_up_popularity_constants( + postgres_with_image_table, data_query, metrics, table_info, mock_pg_hook_task + ) + + expected_providers = ["diff_provider", "my_provider"] + actual_providers = sql.get_providers_with_popularity_data_for_media_type( + POSTGRES_CONN_ID, media_type="image", constants_view=table_info.constants + ) + + assert actual_providers == expected_providers + + def test_standardized_popularity_function_calculates( postgres_with_image_table, table_info, mock_pg_hook_task ): diff --git a/docker-compose.yml b/docker-compose.yml index 6bff3f6e448..22ff17db7e1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -120,6 +120,16 @@ services: _AIRFLOW_WWW_USER_LASTNAME: Flow _AIRFLOW_WWW_USER_EMAIL: airflow@example.com + # Dev changes for the triggerer + triggerer: + <<: *airflow-common + depends_on: + - upstream_db + - s3 + expose: + - "8794" # Used for logs + command: triggerer + # Dev changes for the webserver container webserver: <<: *airflow-common @@ -127,6 +137,7 @@ services: - upstream_db - s3 - scheduler + - triggerer command: webserver ports: - "${AIRFLOW_PORT}:8080"