Skip to content

Commit

Permalink
Prevent staging DB restore and index creation from simultaneous runs
Browse files Browse the repository at this point in the history
  • Loading branch information
stacimc committed Nov 22, 2023
1 parent e8e3064 commit e49dca1
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,20 @@
from airflow.decorators import dag
from airflow.providers.amazon.aws.operators.rds import RdsDeleteDbInstanceOperator
from airflow.providers.amazon.aws.sensors.rds import RdsSnapshotExistenceSensor
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.state import State
from airflow.utils.trigger_rule import TriggerRule
from elasticsearch.recreate_staging_index.recreate_full_staging_index_dag import (
DAG_ID as RECREATE_STAGING_INDEX_DAG_ID,
)

from common.constants import (
AWS_RDS_CONN_ID,
DAG_DEFAULT_ARGS,
POSTGRES_API_STAGING_CONN_ID,
REFRESH_POKE_INTERVAL,
)
from common.sensors.utils import get_most_recent_dag_run
from common.sql import PGExecuteQueryOperator
from database.staging_database_restore import constants
from database.staging_database_restore.staging_database_restore import (
Expand Down Expand Up @@ -70,9 +77,26 @@
render_template_as_native_obj=True,
)
def restore_staging_database():
# If the `recreate_full_staging_index` DAG was manually triggered prior
# to the database restoration starting, we should wait for it to
# finish.
wait_for_recreate_full_staging_index = ExternalTaskSensor(
task_id="wait_for_recreate_full_staging_index",
external_dag_id=RECREATE_STAGING_INDEX_DAG_ID,
# Wait for the whole DAG, not just a part of it
external_task_id=None,
check_existence=False,
poke_interval=REFRESH_POKE_INTERVAL,
execution_date_fn=lambda _: get_most_recent_dag_run(
RECREATE_STAGING_INDEX_DAG_ID
),
mode="reschedule",
# Any "finished" state is sufficient for us to continue.
allowed_states=[State.SUCCESS, State.FAILED],
)
should_skip = skip_restore()
latest_snapshot = get_latest_prod_snapshot()
should_skip >> latest_snapshot
wait_for_recreate_full_staging_index >> should_skip >> latest_snapshot

ensure_snapshot_ready = RdsSnapshotExistenceSensor(
task_id="ensure_snapshot_ready",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,41 @@
from datetime import timedelta

from airflow.decorators import task, task_group
from airflow.exceptions import AirflowSensorTimeout
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.state import State

from common import ingestion_server
from common.sensors.utils import get_most_recent_dag_run
from data_refresh.data_refresh_types import DATA_REFRESH_CONFIGS
from database.staging_database_restore.constants import (
DAG_ID as STAGING_DB_RESTORE_DAG_ID,
)


@task(retries=0)
def prevent_concurrency_with_staging_database_restore(**context):
    """
    Fail fast if the `staging_database_restore` DAG is currently running.

    An ExternalTaskSensor with ``timeout=0`` is executed inline: if the most
    recent DagRun of the staging DB restore has not reached a finished state
    (success or failure), the sensor raises ``AirflowSensorTimeout``
    immediately instead of waiting. That timeout is converted into a
    ``ValueError`` so this task — and with it the staging index creation
    DAG — fails right away rather than running concurrently with the restore.

    Raises:
        ValueError: if the staging database restoration is in progress.
    """
    wait_for_dag = ExternalTaskSensor(
        task_id="check_for_running_staging_db_restore",
        external_dag_id=STAGING_DB_RESTORE_DAG_ID,
        # Set timeout to 0 to prevent retries. If the staging DB restoration is
        # running, immediately fail the staging index creation DAG.
        timeout=0,
        # Wait for the whole DAG, not just a part of it
        external_task_id=None,
        check_existence=False,
        execution_date_fn=lambda _: get_most_recent_dag_run(STAGING_DB_RESTORE_DAG_ID),
        # Any "finished" state is sufficient for us to continue.
        allowed_states=[State.SUCCESS, State.FAILED],
        mode="reschedule",
    )
    try:
        wait_for_dag.execute(context)
    except AirflowSensorTimeout as e:
        # Explicitly chain the sensor timeout so task logs show the real cause.
        raise ValueError(
            "Concurrency check failed. Staging index creation cannot start"
            " during staging DB restoration."
        ) from e


@task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,16 @@
timestamp. The DAG awaits the completion of the index creation and then points
the `<media_type>-full` alias to the newly created index.
Optionally, the `target_alias_override` param can be used to override the target
alias that is pointed to the new index. When the alias is assigned, it is removed
from any other index to which it previously pointed. The `delete_old_index` param
can optionally be enabled in order to delete the index previously pointed to by
the alias, if applicable.
Required Dagrun Configuration parameters:
* media_type: the media type for which to create a new index.
Optional params:
* target_alias_override: Override the alias that is pointed to the new index. By
default this is `<media_type>-full`.
* delete_old_index: Whether to delete the index previously pointed to by the
target alias, if applicable. Defaults to False.
## When this DAG runs
Expand All @@ -27,6 +32,9 @@
Because this DAG runs on the staging ingestion server and staging elasticsearch
cluster, it does _not_ interfere with the `data_refresh` or
`create_filtered_index` DAGs.
However, the DAG will exit immediately if the `staging_database_restore` DAG is
running, as it operates on the staging API database.
"""
from datetime import datetime

Expand All @@ -37,6 +45,7 @@
create_index,
get_target_alias,
point_alias,
prevent_concurrency_with_staging_database_restore,
should_delete_index,
)

Expand Down Expand Up @@ -84,6 +93,8 @@
render_template_as_native_obj=True,
)
def recreate_full_staging_index():
prevent_concurrency = prevent_concurrency_with_staging_database_restore()

target_alias = get_target_alias(
media_type="{{ params.media_type }}",
target_alias_override="{{ params.target_alias_override }}",
Expand Down Expand Up @@ -145,7 +156,8 @@ def recreate_full_staging_index():
)

# Set up dependencies
target_alias >> get_current_index_if_exists >> new_index_suffix
prevent_concurrency >> target_alias >> get_current_index_if_exists
get_current_index_if_exists >> new_index_suffix
new_index_suffix >> do_create_index >> do_point_alias
do_point_alias >> check_if_should_delete_index
check_if_should_delete_index >> [delete_old_index, notify_complete]
Expand Down

0 comments on commit e49dca1

Please sign in to comment.