diff --git a/catalog/dags/database/staging_database_restore/staging_database_restore_dag.py b/catalog/dags/database/staging_database_restore/staging_database_restore_dag.py
index c9f57a4a8dc..3d2939a0ce3 100644
--- a/catalog/dags/database/staging_database_restore/staging_database_restore_dag.py
+++ b/catalog/dags/database/staging_database_restore/staging_database_restore_dag.py
@@ -28,13 +28,20 @@
 from airflow.decorators import dag
 from airflow.providers.amazon.aws.operators.rds import RdsDeleteDbInstanceOperator
 from airflow.providers.amazon.aws.sensors.rds import RdsSnapshotExistenceSensor
+from airflow.sensors.external_task import ExternalTaskSensor
+from airflow.utils.state import State
 from airflow.utils.trigger_rule import TriggerRule
+from elasticsearch.recreate_staging_index.recreate_full_staging_index_dag import (
+    DAG_ID as RECREATE_STAGING_INDEX_DAG_ID,
+)
 
 from common.constants import (
     AWS_RDS_CONN_ID,
     DAG_DEFAULT_ARGS,
     POSTGRES_API_STAGING_CONN_ID,
+    REFRESH_POKE_INTERVAL,
 )
+from common.sensors.utils import get_most_recent_dag_run
 from common.sql import PGExecuteQueryOperator
 from database.staging_database_restore import constants
 from database.staging_database_restore.staging_database_restore import (
@@ -70,9 +77,26 @@
     render_template_as_native_obj=True,
 )
 def restore_staging_database():
+    # If the `recreate_full_staging_index` DAG was manually triggered prior
+    # to the database restoration starting, we should wait for it to
+    # finish.
+    wait_for_recreate_full_staging_index = ExternalTaskSensor(
+        task_id="wait_for_recreate_full_staging_index",
+        external_dag_id=RECREATE_STAGING_INDEX_DAG_ID,
+        # Wait for the whole DAG, not just a part of it
+        external_task_id=None,
+        check_existence=False,
+        poke_interval=REFRESH_POKE_INTERVAL,
+        execution_date_fn=lambda _: get_most_recent_dag_run(
+            RECREATE_STAGING_INDEX_DAG_ID
+        ),
+        mode="reschedule",
+        # Any "finished" state is sufficient for us to continue.
+        allowed_states=[State.SUCCESS, State.FAILED],
+    )
     should_skip = skip_restore()
     latest_snapshot = get_latest_prod_snapshot()
-    should_skip >> latest_snapshot
+    wait_for_recreate_full_staging_index >> should_skip >> latest_snapshot
 
     ensure_snapshot_ready = RdsSnapshotExistenceSensor(
         task_id="ensure_snapshot_ready",
diff --git a/catalog/dags/elasticsearch/recreate_staging_index/recreate_full_staging_index.py b/catalog/dags/elasticsearch/recreate_staging_index/recreate_full_staging_index.py
index 74e7d6fe9f5..712e004c927 100644
--- a/catalog/dags/elasticsearch/recreate_staging_index/recreate_full_staging_index.py
+++ b/catalog/dags/elasticsearch/recreate_staging_index/recreate_full_staging_index.py
@@ -1,9 +1,41 @@
 from datetime import timedelta
 
 from airflow.decorators import task, task_group
+from airflow.exceptions import AirflowSensorTimeout
+from airflow.sensors.external_task import ExternalTaskSensor
+from airflow.utils.state import State
 
 from common import ingestion_server
+from common.sensors.utils import get_most_recent_dag_run
 from data_refresh.data_refresh_types import DATA_REFRESH_CONFIGS
+from database.staging_database_restore.constants import (
+    DAG_ID as STAGING_DB_RESTORE_DAG_ID,
+)
+
+
+@task(retries=0)
+def prevent_concurrency_with_staging_database_restore(**context):
+    wait_for_dag = ExternalTaskSensor(
+        task_id="check_for_running_staging_db_restore",
+        external_dag_id=STAGING_DB_RESTORE_DAG_ID,
+        # Set timeout to 0 to prevent retries. If the staging DB restoration is running,
+        # immediately fail the staging index creation DAG.
+        timeout=0,
+        # Wait for the whole DAG, not just a part of it
+        external_task_id=None,
+        check_existence=False,
+        execution_date_fn=lambda _: get_most_recent_dag_run(STAGING_DB_RESTORE_DAG_ID),
+        # Any "finished" state is sufficient for us to continue.
+        allowed_states=[State.SUCCESS, State.FAILED],
+        mode="reschedule",
+    )
+    try:
+        wait_for_dag.execute(context)
+    except AirflowSensorTimeout:
+        raise ValueError(
+            "Concurrency check failed. Staging index creation cannot start"
+            " during staging DB restoration."
+        )
 
 
 @task
diff --git a/catalog/dags/elasticsearch/recreate_staging_index/recreate_full_staging_index_dag.py b/catalog/dags/elasticsearch/recreate_staging_index/recreate_full_staging_index_dag.py
index 319bb40d196..d2857c2b383 100644
--- a/catalog/dags/elasticsearch/recreate_staging_index/recreate_full_staging_index_dag.py
+++ b/catalog/dags/elasticsearch/recreate_staging_index/recreate_full_staging_index_dag.py
@@ -12,11 +12,16 @@
 timestamp. The DAG awaits the completion of the index creation and then points
 the `<media_type>-full` alias to the newly created index.
 
-Optionally, the `target_alias_override` param can be used to override the target
-alias that is pointed to the new index. When the alias is assigned, it is removed
-from any other index to which it previously pointed. The `delete_old_index` param
-can optionally be enabled in order to delete the index previously pointed to by
-the alias, if applicable.
+Required Dagrun Configuration parameters:
+
+* media_type: the media type for which to create a new index.
+
+Optional params:
+
+* target_alias_override: Override the alias that is pointed to the new index. By
+  default this is `<media_type>-full`.
+* delete_old_index: Whether to delete the index previously pointed to by the
+  target alias, if applicable. Defaults to False.
 
 ## When this DAG runs
 
@@ -27,6 +32,9 @@
 Because this DAG runs on the staging ingestion server and staging elasticsearch
 cluster, it does _not_ interfere with the `data_refresh` or
 `create_filtered_index` DAGs.
+
+However, the DAG will exit immediately if the `staging_database_restore` DAG is
+running, as it operates on the staging API database.
 """
 
 from datetime import datetime
@@ -37,6 +45,7 @@
     create_index,
     get_target_alias,
     point_alias,
+    prevent_concurrency_with_staging_database_restore,
     should_delete_index,
 )
 
@@ -84,6 +93,8 @@
     render_template_as_native_obj=True,
 )
 def recreate_full_staging_index():
+    prevent_concurrency = prevent_concurrency_with_staging_database_restore()
+
     target_alias = get_target_alias(
         media_type="{{ params.media_type }}",
         target_alias_override="{{ params.target_alias_override }}",
@@ -145,7 +156,8 @@ def recreate_full_staging_index():
     )
 
     # Set up dependencies
-    target_alias >> get_current_index_if_exists >> new_index_suffix
+    prevent_concurrency >> target_alias >> get_current_index_if_exists
+    get_current_index_if_exists >> new_index_suffix
     new_index_suffix >> do_create_index >> do_point_alias
     do_point_alias >> check_if_should_delete_index
     check_if_should_delete_index >> [delete_old_index, notify_complete]
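Both new sensors target the latest run of the other DAG via `execution_date_fn=lambda _: get_most_recent_dag_run(...)`. That helper is imported from `common.sensors.utils` but its body is not part of this diff; the sketch below is only an illustration of how such a helper might work, assuming it looks up the most recent `DagRun` of the external DAG and returns its execution date, or an empty list when that DAG has never run.

```python
# Illustrative sketch only: not the contents of catalog/dags/common/sensors/utils.py.
# Assumes the helper returns the execution (logical) date of the latest run of
# `dag_id`, which ExternalTaskSensor then checks via `execution_date_fn`.
from __future__ import annotations

from datetime import datetime

from airflow.models import DagRun


def get_most_recent_dag_run(dag_id: str) -> datetime | list[datetime]:
    """Return the execution date of the most recent DagRun of `dag_id`.

    An empty list is returned when the DAG has never run, so the sensor has
    no run to check and can proceed immediately.
    """
    dag_runs = DagRun.find(dag_id=dag_id)
    if dag_runs:
        # Pick the newest run by execution (logical) date.
        return max(dag_runs, key=lambda run: run.execution_date).execution_date
    return []
```

Under that assumption, the `timeout=0` sensor in `prevent_concurrency_with_staging_database_restore` fails on its first poke (raising `AirflowSensorTimeout`, converted to an immediate task failure) whenever the latest `staging_database_restore` run is still in progress, while the `reschedule`-mode sensor in the restore DAG keeps waiting until the latest `recreate_full_staging_index` run reaches a finished state.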