Update data refresh to use media tables (#2818)
* Remove popularity steps from data refresh dags

* Update ingestion server to copy from media tables

* Fix swapped table names

* Remove tests that are no longer applicable

* Update DAG docs

* Fix ingestion server tests

* Correct method signature in test

* Capitalize Openverse
stacimc authored Aug 25, 2023
1 parent 29a65ee commit cf5778a
Showing 7 changed files with 78 additions and 355 deletions.
52 changes: 16 additions & 36 deletions catalog/DAGs.md
@@ -202,24 +202,14 @@ airflow dags trigger --conf
 ### Data Refresh DAG Factory
 
 This file generates our data refresh DAGs using a factory function. For the
-given media type these DAGs will first refresh the popularity data, then
-initiate a data refresh on the data refresh server and await the success or
-failure of that task.
-
-Popularity data for each media type is collated in a materialized view. Before
-initiating a data refresh, the DAG will first refresh the view in order to
-update popularity data for records that have been ingested since the last
-refresh. On the first run of the month, the DAG will also refresh the underlying
-tables, including the percentile values and any new popularity metrics. The DAG
-can also be run with the `force_refresh_metrics` option to run this refresh
-after the first of the month.
-
-Once this step is complete, the data refresh can be initiated. A data refresh
-occurs on the Ingestion server in the openverse project. This is a task which
-imports data from the upstream Catalog database into the API, copies contents to
-a new Elasticsearch index, and finally makes the index "live". This process is
-necessary to make new content added to the Catalog by our provider DAGs
-available to the API. You can read more in the
+given media type these DAGs will initiate a data refresh on the ingestion server
+and await the success or failure of that task.
+
+A data refresh occurs on the Ingestion server in the Openverse project. This is
+a task which imports data from the upstream Catalog database into the API,
+copies contents to a new Elasticsearch index, and finally makes the index
+"live". This process is necessary to make new content added to the Catalog by
+our provider DAGs available to the API. You can read more in the
 [README](https://github.com/WordPress/openverse/blob/main/ingestion_server/README.md)
 Importantly, the data refresh TaskGroup is also configured to handle concurrency
 requirements of the Ingestion server. Finally, once the origin indexes have been
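
For readers unfamiliar with the pattern, a minimal sketch of such a DAG factory follows. The names here (`MEDIA_TYPES`, `make_data_refresh_dag`, the `dag_id` format) are assumptions for illustration; the real implementation lives in `catalog/dags/data_refresh/dag_factory.py` and handles far more configuration.

```python
# A minimal DAG factory sketch; illustrative only, not the Openverse code.
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

MEDIA_TYPES = ["image", "audio"]  # assumed set of media types


def make_data_refresh_dag(media_type: str) -> DAG:
    with DAG(
        dag_id=f"{media_type}_data_refresh",  # assumed dag_id format
        start_date=datetime(2023, 1, 1),
        schedule=None,  # triggered manually or by other DAGs
        catchup=False,
    ) as dag:
        # Placeholder for the real tasks: record counts, the data refresh
        # TaskGroup that calls the ingestion server, and reporting.
        EmptyOperator(task_id="data_refresh")
    return dag


# A module-level loop registers one DAG per media type with the scheduler.
for media_type in MEDIA_TYPES:
    globals()[f"{media_type}_data_refresh"] = make_data_refresh_dag(media_type)
```

A generated DAG can then be run on demand with the standard CLI, e.g. `airflow dags trigger image_data_refresh` (again assuming that `dag_id`).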
@@ -535,24 +525,14 @@ This script can be run either to ingest the full dataset or as a dated DAG.
 ### Data Refresh DAG Factory
 
 This file generates our data refresh DAGs using a factory function. For the
-given media type these DAGs will first refresh the popularity data, then
-initiate a data refresh on the data refresh server and await the success or
-failure of that task.
-
-Popularity data for each media type is collated in a materialized view. Before
-initiating a data refresh, the DAG will first refresh the view in order to
-update popularity data for records that have been ingested since the last
-refresh. On the first run of the month, the DAG will also refresh the underlying
-tables, including the percentile values and any new popularity metrics. The DAG
-can also be run with the `force_refresh_metrics` option to run this refresh
-after the first of the month.
-
-Once this step is complete, the data refresh can be initiated. A data refresh
-occurs on the Ingestion server in the openverse project. This is a task which
-imports data from the upstream Catalog database into the API, copies contents to
-a new Elasticsearch index, and finally makes the index "live". This process is
-necessary to make new content added to the Catalog by our provider DAGs
-available to the API. You can read more in the
+given media type these DAGs will initiate a data refresh on the ingestion server
+and await the success or failure of that task.
+
+A data refresh occurs on the Ingestion server in the Openverse project. This is
+a task which imports data from the upstream Catalog database into the API,
+copies contents to a new Elasticsearch index, and finally makes the index
+"live". This process is necessary to make new content added to the Catalog by
+our provider DAGs available to the API. You can read more in the
 [README](https://github.com/WordPress/openverse/blob/main/ingestion_server/README.md)
 Importantly, the data refresh TaskGroup is also configured to handle concurrency
 requirements of the Ingestion server. Finally, once the origin indexes have been
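
The "makes the index live" step is typically an atomic alias swap in Elasticsearch. Below is a hedged sketch of that idea using the official Python client (8.x style), with assumed index and alias names; the ingestion server's actual reindexing logic lives in `ingestion_server/`.

```python
# Alias promotion sketch; index/alias names are assumed, and this is not
# the actual ingestion server code.
from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")  # assumed local cluster


def promote_index(alias: str, new_index: str, old_index: str | None) -> None:
    """Point `alias` at `new_index`, detaching `old_index` if present."""
    actions = [{"add": {"index": new_index, "alias": alias}}]
    if old_index:
        actions.insert(0, {"remove": {"index": old_index, "alias": alias}})
    # All actions in one update_aliases call apply atomically, so queries
    # against the alias never observe a half-swapped state.
    es.indices.update_aliases(actions=actions)


promote_index(alias="image", new_index="image-20230825", old_index="image-20230725")
```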
148 changes: 7 additions & 141 deletions catalog/dags/data_refresh/dag_factory.py
@@ -1,20 +1,10 @@
"""
# Data Refresh DAG Factory
This file generates our data refresh DAGs using a factory function.
For the given media type these DAGs will first refresh the popularity data,
then initiate a data refresh on the data refresh server and await the
success or failure of that task.
For the given media type these DAGs will initiate a data refresh on the
ingestion server and await the success or failure of that task.
Popularity data for each media type is collated in a materialized view. Before
initiating a data refresh, the DAG will first refresh the view in order to
update popularity data for records that have been ingested since the last refresh.
On the first run of the month, the DAG will also refresh the underlying tables,
including the percentile values and any new popularity metrics. The DAG can also
be run with the `force_refresh_metrics` option to run this refresh after the first
of the month.
Once this step is complete, the data refresh can be initiated. A data refresh
occurs on the Ingestion server in the openverse project. This is a task
A data refresh occurs on the Ingestion server in the Openverse project. This is a task
which imports data from the upstream Catalog database into the API, copies contents
to a new Elasticsearch index, and finally makes the index "live". This process is
necessary to make new content added to the Catalog by our provider DAGs available
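
Per the commit title, the ingestion server now copies from the media tables directly rather than from the popularity materialized views. A hedged sketch of what such a copy step can look like; the DSN, schema, and table names are assumptions, and the real logic lives in `ingestion_server/`.

```python
# Illustrative copy step only; table names, FDW schema, and connection
# parameters are all assumed, not taken from the Openverse codebase.
import psycopg2


def copy_upstream_table(api_dsn: str, upstream_schema: str, media_type: str) -> int:
    """Copy all rows for `media_type` from the upstream media table."""
    with psycopg2.connect(api_dsn) as conn, conn.cursor() as cur:
        # Assumes a foreign data wrapper exposing the catalog tables under
        # `upstream_schema` on the API database.
        cur.execute(
            f"INSERT INTO {media_type} SELECT * FROM {upstream_schema}.{media_type};"
        )
        return cur.rowcount  # number of rows copied
```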
@@ -36,19 +26,8 @@
 from collections.abc import Sequence
 
 from airflow import DAG
-from airflow.models.dagrun import DagRun
-from airflow.operators.python import BranchPythonOperator, PythonOperator
+from airflow.operators.python import PythonOperator
 from airflow.operators.trigger_dagrun import TriggerDagRunOperator
-from airflow.settings import SASession
-from airflow.utils.session import provide_session
-from airflow.utils.state import State
-from popularity.refresh_popularity_metrics_task_factory import (
-    GROUP_ID as REFRESH_POPULARITY_METRICS_GROUP_ID,
-)
-from popularity.refresh_popularity_metrics_task_factory import (
-    UPDATE_MEDIA_POPULARITY_METRICS_TASK_ID,
-    create_refresh_popularity_metrics_task_group,
-)
 
 from common.constants import (
     DAG_DEFAULT_ARGS,
@@ -58,26 +37,11 @@
 from common.sql import PGExecuteQueryOperator
 from data_refresh.data_refresh_task_factory import create_data_refresh_task_group
 from data_refresh.data_refresh_types import DATA_REFRESH_CONFIGS, DataRefresh
-from data_refresh.recreate_view_data_task_factory import DROP_DB_VIEW_TASK_ID
-from data_refresh.recreate_view_data_task_factory import (
-    GROUP_ID as RECREATE_MATVIEW_GROUP_ID,
-)
-from data_refresh.recreate_view_data_task_factory import create_recreate_view_data_task
-from data_refresh.reporting import report_record_difference, report_status
+from data_refresh.reporting import report_record_difference
 
 
 logger = logging.getLogger(__name__)
 
-# The first task in the recreate_matview TaskGroup
-RECREATE_MATERIALIZED_VIEW_TASK_ID = (
-    f"{RECREATE_MATVIEW_GROUP_ID}.{DROP_DB_VIEW_TASK_ID}"
-)
-# The first task in the refresh_popularity_metrics TaskGroup
-REFRESH_POPULARITY_METRICS_TASK_ID = (
-    f"{REFRESH_POPULARITY_METRICS_GROUP_ID}"
-    f".{UPDATE_MEDIA_POPULARITY_METRICS_TASK_ID}"
-)
-
 
 def _single_value(cursor):
     try:
@@ -87,85 +51,11 @@ def _single_value(cursor):
         raise ValueError("Unable to extract expected row data from cursor") from e
 
 
-@provide_session
-def _month_check(dag_id: str, session: SASession = None) -> str:
-    """
-    Check whether there has been a previous DagRun this month.
-    If so, return the task_id for the matview refresh task; else, return the
-    task_id for refresh popularity metrics task.
-    Required Arguments:
-    dag_id: id of the currently running Dag
-    """
-    # Get the current DagRun
-    DR = DagRun
-    current_dagrun = (
-        session.query(DR).filter(DR.dag_id == dag_id, DR.state == State.RUNNING)
-    ).first()
-
-    # If `force_refresh_metrics` has been passed in the dagrun config, then
-    # immediately return the task_id to refresh popularity metrics without
-    # doing the month check.
-    force_refresh_metrics = current_dagrun.conf.get("force_refresh_metrics")
-    if force_refresh_metrics is not None:
-        logger.info(f"`force_refresh_metrics` is set to {force_refresh_metrics}.")
-        return (
-            REFRESH_POPULARITY_METRICS_TASK_ID
-            if force_refresh_metrics
-            else RECREATE_MATERIALIZED_VIEW_TASK_ID
-        )
-
-    # Get the most recent successful dagrun for this Dag
-    latest_dagrun = (
-        session.query(DR)
-        .filter(DR.dag_id == dag_id, DR.state == State.SUCCESS)
-        .order_by(DR.start_date.desc())
-    ).first()
-
-    # No previous successful dagrun, refresh all popularity data.
-    if latest_dagrun is None:
-        return REFRESH_POPULARITY_METRICS_TASK_ID
-
-    # Check if the last dagrun was in the same month as the current run
-    current_date = current_dagrun.start_date
-    last_dagrun_date = latest_dagrun.start_date
-    is_last_dagrun_in_current_month = (
-        current_date.month == last_dagrun_date.month
-        and current_date.year == last_dagrun_date.year
-    )
-
-    return (
-        REFRESH_POPULARITY_METRICS_TASK_ID
-        if not is_last_dagrun_in_current_month
-        else RECREATE_MATERIALIZED_VIEW_TASK_ID
-    )
-
-
-def _month_check_with_reporting(dag_id: str, media_type: str) -> str:
-    """
-    Wrap the monthly check function.
-    This reports which step is starting and which step is next to slack.
-    """
-    next_task_id = _month_check(dag_id)
-    next_step = {
-        REFRESH_POPULARITY_METRICS_TASK_ID: "update popularity metrics",
-        RECREATE_MATERIALIZED_VIEW_TASK_ID: "recreate matview",
-    }.get(next_task_id, "unable to determine next step")
-    message = f":horse_racing: Starting data refresh | _Next: {next_step}_"
-    report_status(media_type, message, dag_id)
-
-    return next_task_id
-
-
 def create_data_refresh_dag(data_refresh: DataRefresh, external_dag_ids: Sequence[str]):
     """
     Instantiate a DAG for a data refresh.
-    This DAG will run the popularity calculation and subsequent data refresh for the
-    given `media_type`.
+    This DAG will run the data refresh for the given `media_type`.
     Required Arguments:
@@ -206,16 +96,6 @@ def create_data_refresh_dag(data_refresh: DataRefresh, external_dag_ids: Sequence[str]):
         AND n.nspname = 'public';
     """
 
-    # Check if this is the first DagRun of the month for this DAG.
-    month_check = BranchPythonOperator(
-        task_id="month_check",
-        python_callable=_month_check_with_reporting,
-        op_kwargs={
-            "dag_id": data_refresh.dag_id,
-            "media_type": data_refresh.media_type,
-        },
-    )
-
     # Get the current number of records in the target API table
     before_record_count = PGExecuteQueryOperator(
         task_id="get_before_record_count",
@@ -225,19 +105,7 @@ def create_data_refresh_dag(data_refresh: DataRefresh, external_dag_ids: Sequence[str]):
         return_last=True,
     )
 
-    # Refresh underlying popularity tables. This is required infrequently in order
-    # to update new popularity metrics and constants, so this branch is only taken
-    # if it is the first run of the month (or when forced).
-    refresh_popularity_metrics = create_refresh_popularity_metrics_task_group(
-        data_refresh
-    )
-
-    # Drop and recreate the materialized view. This occurs on all DagRuns and
-    # updates popularity data for newly ingested records. Formerly, we would refresh
-    # the materialized view, but that process would consistently time out.
-    recreate_matview = create_recreate_view_data_task(data_refresh)
-
-    # Trigger the actual data refresh on the remote data refresh server, and wait
+    # Trigger the data refresh on the remote ingestion server, and wait
     # for it to complete.
     data_refresh_group = create_data_refresh_task_group(
         data_refresh, external_dag_ids
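
The `external_dag_ids` argument feeds the concurrency handling mentioned in the DAG docs above. One generic way to serialize work across Airflow DAGs is a shared single-slot pool; this sketch shows the idea under that assumption and is not necessarily the mechanism the Openverse TaskGroup uses.

```python
# Serializing tasks across DAGs with a single-slot pool (sketch).
# The pool name is assumed; it could be created with:
#   airflow pools set data_refresh 1 "Serialize data refreshes"
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG("pool_example", start_date=datetime(2023, 1, 1), schedule=None) as dag:
    refresh = PythonOperator(
        task_id="trigger_data_refresh",
        python_callable=lambda: None,  # placeholder for the real trigger logic
        pool="data_refresh",  # one slot shared across every DAG that uses it
    )
```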
@@ -288,9 +156,7 @@ def create_data_refresh_dag(data_refresh: DataRefresh, external_dag_ids: Sequence[str]):
     )
 
     # Set up task dependencies
-    month_check >> [refresh_popularity_metrics, recreate_matview]
     before_record_count >> data_refresh_group
-    refresh_popularity_metrics >> recreate_matview >> data_refresh_group
     data_refresh_group >> after_record_count >> report_counts
     data_refresh_group >> trigger_filtered_index_creation

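The `trigger_filtered_index_creation` task presumably uses the `TriggerDagRunOperator` imported earlier in this file; a minimal sketch of that operator, with an assumed target `dag_id`:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

with DAG("example", start_date=datetime(2023, 1, 1), schedule=None) as dag:
    # Fire the filtered index creation DAG once the data refresh succeeds.
    trigger_filtered_index_creation = TriggerDagRunOperator(
        task_id="trigger_filtered_index_creation",
        trigger_dag_id="create_filtered_image_index",  # assumed dag_id
    )
```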
