Skip to content

Commit

Permalink
Update documenation
Browse files Browse the repository at this point in the history
  • Loading branch information
stacimc committed May 2, 2024
1 parent be7b388 commit 9d2c216
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 25 deletions.
8 changes: 6 additions & 2 deletions catalog/dags/data_refresh/copy_data.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
"""
TODO Update
# Copy Data TaskGroup
TaskGroup for doing the copy data step
This module contains the Airflow tasks used for copying upstream (Catalog)
tables into new temporary tables in the downstream (API) database. This
is one of the initial steps of the data refresh. These temporary tables
will later be used to create new Elasticsearch indices, and ultimately
will be promoted to the live media tables in the API.
"""

Expand Down
34 changes: 17 additions & 17 deletions catalog/dags/data_refresh/dag_factory.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
"""
# Data Refresh DAG Factory
TODO update
This file generates our data refresh DAGs using a factory function.
For the given media type these DAGs will initiate a data refresh on the
ingestion server and await the success or failure of that task.
A data refresh occurs on the Ingestion server in the Openverse project. This is a task
which imports data from the upstream Catalog database into the API, copies contents
to a new Elasticsearch index, and finally makes the index "live". This process is
necessary to make new content added to the Catalog by our provider DAGs available
to the API. You can read more in the [README](
https://github.com/WordPress/openverse/blob/main/ingestion_server/README.md
) Importantly, the data refresh TaskGroup is also configured to handle concurrency
requirements of the Ingestion server. Finally, once the origin indexes have been
refreshed, the corresponding filtered index creation DAG is triggered.
This file generates our data refresh DAGs for each media type and environment using a factory function. The data refresh is a process which makes new content added to the Catalog by our provider DAGs available to the API.
The data refresh has the following high level steps:
* Copy Data: An FDW extension is used to connect the API database to the upstream (catalog) database. The entire contents of the upstream media table are copied into a new temp table in the API database. This temp table will later replace the main media table in the API.
* Create Index: Create a new Elasticsearch index, matching the configuration of the existing media index.
* Distributed Reindex: Convert each record from the new temp table to the format required by an Elasticsearch document, and then reindex them into the newly created index.
* Create and Populate Filtered Index: Create a new Elasticsearch index matching the configuration of the existing filtered index, and then reindex documents into it from the new media index, applying appropriate filtering for sensitive terms.
* Reapply Constraints: Recreate indices and constraints from the original API tables on the new temp tables.
* Promote Table: Drop the old media table in the API and rename the temp table and its indices, which has the effect of promoting them/replacing the old table.
* Promote Index: Promote the new Elasticsearch index by unlinking the given alias from the existing index and moving it to the new one. (Used for both the main and filtered indices.)
* Delete Index: Delete the old Elasticsearch index. (Used for both the main and filtered indices.)
Importantly, the data refresh DAGs are also configured to handle concurrency
requirements of the reindexing steps.
You can find more background information on this process in the following
issues and related PRs:
- [[Feature] Data refresh orchestration DAG](
https://github.com/WordPress/openverse-catalog/issues/353)
- [[Implementation Plan] Ingestion Server Removal](
https://docs.openverse.org/projects/proposals/ingestion_server_removal/20240328-implementation_plan_ingestion_server_removal.html)
- [[Feature] Merge popularity calculations and data refresh into a single DAG](
https://github.com/WordPress/openverse-catalog/issues/453)
"""
Expand Down
65 changes: 59 additions & 6 deletions documentation/catalog/reference/DAGs.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,16 @@ The following are DAGs grouped by their primary tag:

### Data Refresh

| DAG ID | Schedule Interval |
| ------------------------------------------------------------------ | ----------------- |
| [`create_filtered_audio_index`](#create_filtered_media_type_index) | `None` |
| [`create_filtered_image_index`](#create_filtered_media_type_index) | `None` |
| [`audio_data_refresh`](#media_type_data_refresh) | `0 0 * * 1` |
| [`image_data_refresh`](#media_type_data_refresh) | `0 0 * * 1` |
| DAG ID | Schedule Interval |
| ----------------------------------------------------------------------- | ----------------- |
| [`create_filtered_audio_index`](#create_filtered_media_type_index) | `None` |
| [`create_filtered_image_index`](#create_filtered_media_type_index) | `None` |
| [`production_audio_data_refresh`](#environment_media_type_data_refresh) | `0 0 * * 1` |
| [`production_image_data_refresh`](#environment_media_type_data_refresh) | `0 0 * * 1` |
| [`staging_audio_data_refresh`](#environment_media_type_data_refresh) | `0 0 * * 1` |
| [`staging_image_data_refresh`](#environment_media_type_data_refresh) | `0 0 * * 1` |
| [`audio_data_refresh`](#media_type_data_refresh) | `0 0 * * 1` |
| [`image_data_refresh`](#media_type_data_refresh) | `0 0 * * 1` |

### Database

Expand Down Expand Up @@ -175,6 +179,10 @@ The following is documentation associated with each DAG (where available):
1. [`wordpress_workflow`](#wordpress_workflow)
1. [`production_elasticsearch_cluster_healthcheck`](#environment_elasticsearch_cluster_healthcheck)
1. [`staging_elasticsearch_cluster_healthcheck`](#environment_elasticsearch_cluster_healthcheck)
1. [`production_audio_data_refresh`](#environment_media_type_data_refresh)
1. [`production_image_data_refresh`](#environment_media_type_data_refresh)
1. [`staging_audio_data_refresh`](#environment_media_type_data_refresh)
1. [`staging_image_data_refresh`](#environment_media_type_data_refresh)
1. [`audio_data_refresh`](#media_type_data_refresh)
1. [`image_data_refresh`](#media_type_data_refresh)
1. [`audio_popularity_refresh`](#media_type_popularity_refresh)
Expand Down Expand Up @@ -1206,6 +1214,51 @@ during data refresh or other similar operations.

----

### `{environment}_{media_type}_data_refresh`

#### Data Refresh DAG Factory

This file generates our data refresh DAGs for each media type and environment
using a factory function. The data refresh is a process which makes new content
added to the Catalog by our provider DAGs available to the API.

The data refresh has the following high level steps:

- Copy Data: An FDW extension is used to connect the API database to the
upstream (catalog) database. The entire contents of the upstream media table
are copied into a new temp table in the API database. This temp table will
later replace the main media table in the API.
- Create Index: Create a new Elasticsearch index, matching the configuration of
the existing media index.
- Distributed Reindex: Convert each record from the new temp table to the format
required by an Elasticsearch document, and then reindex them into the newly
created index.
- Create and Populate Filtered Index: Create a new Elasticsearch index matching
the configuration of the existing filtered index, and then reindex documents
into it from the new media index, applying appropriate filtering for sensitive
terms.
- Reapply Constraints: Recreate indices and constraints from the original API
tables on the new temp tables.
- Promote Table: Drop the old media table in the API and rename the temp table
and its indices, which has the effect of promoting them/replacing the old
table.
- Promote Index: Promote the new Elasticsearch index by unlinking the given
alias from the existing index and moving it to the new one. (Used for both the
main and filtered indices.)
- Delete Index: Delete the old Elasticsearch index. (Used for both the main and
filtered indices.)

Importantly, the data refresh DAGs are also configured to handle concurrency
requirements of the reindexing steps.

You can find more background information on this process in the following issues
and related PRs:

- [[Implementation Plan] Ingestion Server Removal](https://docs.openverse.org/projects/proposals/ingestion_server_removal/20240328-implementation_plan_ingestion_server_removal.html)
- [[Feature] Merge popularity calculations and data refresh into a single DAG](https://github.com/WordPress/openverse-catalog/issues/453)

----

### `{media_type}_data_refresh`

#### Data Refresh DAG Factory
Expand Down

0 comments on commit 9d2c216

Please sign in to comment.