diff --git a/catalog/dags/common/sensors/constants.py b/catalog/dags/common/sensors/constants.py
new file mode 100644
index 00000000000..8d94c9bf395
--- /dev/null
+++ b/catalog/dags/common/sensors/constants.py
@@ -0,0 +1,19 @@
+from common.constants import PRODUCTION, STAGING
+
+
+# These DagTags are used to identify DAGs which should not be run concurrently
+# with one another.
+
+# Used to identify DAGs for each environment which affect the Elasticsearch cluster
+# and should not be run simultaneously
+PRODUCTION_ES_CONCURRENCY_TAG = "production_elasticsearch_concurrency"
+STAGING_ES_CONCURRENCY_TAG = "staging_elasticsearch_concurrency"
+
+# Used to identify DAGs which affect the staging API database in such a
+# way that they should not be run simultaneously
+STAGING_DB_CONCURRENCY_TAG = "staging_api_database_concurrency"
+
+ES_CONCURRENCY_TAGS = {
+    PRODUCTION: PRODUCTION_ES_CONCURRENCY_TAG,
+    STAGING: STAGING_ES_CONCURRENCY_TAG,
+}
diff --git a/catalog/dags/common/sensors/utils.py b/catalog/dags/common/sensors/utils.py
index 08c3bb9a6ac..f6cb544ccbb 100644
--- a/catalog/dags/common/sensors/utils.py
+++ b/catalog/dags/common/sensors/utils.py
@@ -2,14 +2,18 @@
 
 from airflow.decorators import task, task_group
 from airflow.exceptions import AirflowSensorTimeout
-from airflow.models import DagRun
+from airflow.models import DagModel, DagRun, DagTag
 from airflow.sensors.external_task import ExternalTaskSensor
+from airflow.utils.session import provide_session
 from airflow.utils.state import State
 
 from common.constants import REFRESH_POKE_INTERVAL
 
 
-def get_most_recent_dag_run(dag_id) -> list[datetime] | datetime:
+THREE_DAYS = 60 * 60 * 24 * 3
+
+
+def _get_most_recent_dag_run(dag_id) -> list[datetime] | datetime:
     """
     Retrieve the most recent DAG run's execution date.
 
@@ -35,9 +39,40 @@ def get_most_recent_dag_run(dag_id) -> list[datetime] | datetime:
     return []
 
 
-def wait_for_external_dag(external_dag_id: str, task_id: str | None = None):
+@task
+def get_dags_with_concurrency_tag(
+    tag: str, excluded_dag_ids: list[str], session=None, dag=None
+):
+    """
+    Get a list of DAG ids with the given tag. The id of the running DAG is excluded,
+    as well as any ids in the `excluded_dag_ids` list.
     """
-    Return a Sensor task which will wait if the given external DAG is
+    dags = session.query(DagModel).filter(DagModel.tags.any(DagTag.name == tag)).all()
+    dag_ids = [dag.dag_id for dag in dags]
+
+    running_dag_id = dag.dag_id
+    if running_dag_id not in dag_ids:
+        raise ValueError(
+            f"The `{running_dag_id}` DAG tried preventing concurrency with the `{tag}`"
+            " tag, but does not have the tag itself. To ensure that other DAGs with"
+            f" this tag will also avoid running concurrently with `{running_dag_id}`,"
+            f" it must have the `{tag}` tag applied."
+        )
+
+    # Return just the ids of DAGs to prevent concurrency with. This excludes the
+    # running dag id, and any supplied `excluded_dag_ids`
+    return [id for id in dag_ids if id not in {*excluded_dag_ids, running_dag_id}]
+
+
+@task
+def wait_for_external_dag(
+    external_dag_id: str,
+    task_id: str | None = None,
+    timeout: int | None = THREE_DAYS,
+    **context,
+):
+    """
+    Execute a Sensor task which will wait if the given external DAG is
     running.
 
     To fully ensure that the waiting DAG and the external DAG do not run
@@ -51,28 +86,39 @@ def wait_for_external_dag(external_dag_id: str, task_id: str | None = None):
     if not task_id:
         task_id = f"wait_for_{external_dag_id}"
 
-    return ExternalTaskSensor(
+    sensor = ExternalTaskSensor(
         task_id=task_id,
         poke_interval=REFRESH_POKE_INTERVAL,
         external_dag_id=external_dag_id,
         # Wait for the whole DAG, not just a part of it
         external_task_id=None,
         check_existence=False,
-        execution_date_fn=lambda _: get_most_recent_dag_run(external_dag_id),
+        execution_date_fn=lambda _: _get_most_recent_dag_run(external_dag_id),
         mode="reschedule",
         # Any "finished" state is sufficient for us to continue
         allowed_states=[State.SUCCESS, State.FAILED],
+        # execution_timeout for the task does not include time that the sensor
+        # was up for reschedule but not actually running. `timeout` does
+        timeout=timeout,
     )
 
+    sensor.execute(context)
+
 
 @task_group(group_id="wait_for_external_dags")
-def wait_for_external_dags(external_dag_ids: list[str]):
+@provide_session
+def wait_for_external_dags_with_tag(
+    tag: str, excluded_dag_ids: list[str] | None = None, session=None
+):
     """
-    Wait for all DAGs with the given external DAG ids to no longer be
-    in a running state before continuing.
+    Wait until all DAGs with the given `tag`, excluding those identified by the
+    `excluded_dag_ids`, are no longer in the running state before continuing.
     """
-    for dag_id in external_dag_ids:
-        wait_for_external_dag(dag_id)
+    external_dag_ids = get_dags_with_concurrency_tag.override(
+        task_id=f"get_dags_in_{tag}_group"
+    )(tag=tag, excluded_dag_ids=excluded_dag_ids or [], session=session)
+
+    wait_for_external_dag.expand(external_dag_id=external_dag_ids)
 
 
 @task(retries=0)
@@ -81,18 +127,42 @@ def prevent_concurrency_with_dag(external_dag_id: str, **context):
     Prevent concurrency with the given external DAG, by failing immediately if
     that DAG is running.
     """
-
-    wait_for_dag = wait_for_external_dag(
-        external_dag_id=external_dag_id,
-        task_id=f"check_for_running_{external_dag_id}",
-    )
-    wait_for_dag.timeout = 0
     try:
-        wait_for_dag.execute(context)
+        wait_for_external_dag.function(
+            external_dag_id=external_dag_id,
+            task_id=f"check_for_running_{external_dag_id}",
+            timeout=0,
+            **context,
+        )
    except AirflowSensorTimeout:
         raise ValueError(f"Concurrency check with {external_dag_id} failed.")
 
 
+@task_group(group_id="prevent_concurrency_with_dags")
+@provide_session
+def prevent_concurrency_with_dags_with_tag(
+    tag: str, excluded_dag_ids: list[str] | None = None, session=None
+):
+    """
+    Prevent concurrency with any DAGs that have the given `tag`, excluding
+    those identified by the `excluded_dag_ids`. Concurrency is prevented by
+    failing the task immediately if any of the tagged DAGs are in the running
+    state.
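+
+    A typical call, as used by the DAG factories in this change (illustrative
+    only; the tag constants live in common.sensors.constants):
+
+        prevent_concurrency_with_dags_with_tag(
+            tag=STAGING_ES_CONCURRENCY_TAG,
+        )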
+ """ + external_dag_ids = get_dags_with_concurrency_tag.override( + task_id=f"get_dags_in_{tag}_group" + )(tag=tag, excluded_dag_ids=excluded_dag_ids or [], session=session) + + prevent_concurrency_with_dag.expand(external_dag_id=external_dag_ids) + + @task(retries=0) def is_concurrent_with_any(external_dag_ids: list[str], **context): """ @@ -109,12 +172,3 @@ def is_concurrent_with_any(external_dag_ids: list[str], **context): # Explicit return None to clarify expectations return None - - -@task_group(group_id="prevent_concurrency") -def prevent_concurrency_with_dags(external_dag_ids: list[str]): - """Fail immediately if any of the given external dags are in progress.""" - for dag_id in external_dag_ids: - prevent_concurrency_with_dag.override( - task_id=f"prevent_concurrency_with_{dag_id}" - )(dag_id) diff --git a/catalog/dags/data_refresh/create_filtered_index_dag.py b/catalog/dags/data_refresh/create_filtered_index_dag.py index 20f365f982a..05af2c15050 100644 --- a/catalog/dags/data_refresh/create_filtered_index_dag.py +++ b/catalog/dags/data_refresh/create_filtered_index_dag.py @@ -42,8 +42,9 @@ There are two mechanisms that prevent this from happening: -1. The filtered index creation DAGs are not allowed to run if a data refresh -for the media type is already running. +1. The filtered index creation DAGs fail immediately if any of the DAGs that are +tagged as part of the `production-es-concurrency` group (including the data +refreshes) are currently running. 2. The data refresh DAGs will wait for any pre-existing filtered index creation DAG runs for the media type to finish before continuing. @@ -56,15 +57,13 @@ from airflow import DAG from airflow.models.param import Param -from common.constants import DAG_DEFAULT_ARGS, PRODUCTION -from common.sensors.utils import prevent_concurrency_with_dags +from common.constants import DAG_DEFAULT_ARGS +from common.sensors.constants import PRODUCTION_ES_CONCURRENCY_TAG +from common.sensors.utils import prevent_concurrency_with_dags_with_tag from data_refresh.create_filtered_index import ( create_filtered_index_creation_task_groups, ) from data_refresh.data_refresh_types import DATA_REFRESH_CONFIGS, DataRefresh -from elasticsearch_cluster.create_new_es_index.create_new_es_index_types import ( - CREATE_NEW_INDEX_CONFIGS, -) # Note: We can't use the TaskFlow `@dag` DAG factory decorator @@ -88,7 +87,7 @@ def create_filtered_index_creation_dag(data_refresh: DataRefresh): default_args=DAG_DEFAULT_ARGS, schedule=None, start_date=datetime(2023, 4, 1), - tags=["data_refresh"], + tags=["data_refresh", PRODUCTION_ES_CONCURRENCY_TAG], max_active_runs=1, catchup=False, doc_md=__doc__, @@ -117,14 +116,11 @@ def create_filtered_index_creation_dag(data_refresh: DataRefresh): }, render_template_as_native_obj=True, ) as dag: - # Immediately fail if the associated data refresh is running, or the - # create_new_production_es_index DAG is running. This prevents multiple - # DAGs from reindexing from a single production index simultaneously. - prevent_concurrency = prevent_concurrency_with_dags( - external_dag_ids=[ - data_refresh.dag_id, - CREATE_NEW_INDEX_CONFIGS[PRODUCTION].dag_id, - ] + # Immediately fail if any DAG that operates on the production elasticsearch + # cluster is running. This prevents multiple DAGs from reindexing from a + # single production index simultaneously. 
+        prevent_concurrency = prevent_concurrency_with_dags_with_tag(
+            tag=PRODUCTION_ES_CONCURRENCY_TAG,
         )
 
         # Once the concurrency check has passed, actually create the filtered
diff --git a/catalog/dags/data_refresh/dag_factory.py b/catalog/dags/data_refresh/dag_factory.py
index 358a0b3e212..c9d7026377d 100644
--- a/catalog/dags/data_refresh/dag_factory.py
+++ b/catalog/dags/data_refresh/dag_factory.py
@@ -34,6 +34,7 @@
     OPENLEDGER_API_CONN_ID,
     XCOM_PULL_TEMPLATE,
 )
+from common.sensors.constants import PRODUCTION_ES_CONCURRENCY_TAG
 from common.sql import PGExecuteQueryOperator, single_value
 from data_refresh.data_refresh_task_factory import create_data_refresh_task_group
 from data_refresh.data_refresh_types import DATA_REFRESH_CONFIGS, DataRefresh
@@ -70,7 +71,7 @@ def create_data_refresh_dag(data_refresh: DataRefresh, external_dag_ids: Sequenc
         max_active_runs=1,
         catchup=False,
         doc_md=__doc__,
-        tags=["data_refresh"],
+        tags=["data_refresh", PRODUCTION_ES_CONCURRENCY_TAG],
     )
 
     with dag:
diff --git a/catalog/dags/data_refresh/data_refresh_task_factory.py b/catalog/dags/data_refresh/data_refresh_task_factory.py
index 3ad20a806d3..c1e09066be8 100644
--- a/catalog/dags/data_refresh/data_refresh_task_factory.py
+++ b/catalog/dags/data_refresh/data_refresh_task_factory.py
@@ -55,16 +55,14 @@
 from airflow.utils.trigger_rule import TriggerRule
 
 from common import cloudwatch, ingestion_server
-from common.constants import PRODUCTION, XCOM_PULL_TEMPLATE
+from common.constants import XCOM_PULL_TEMPLATE
+from common.sensors.constants import PRODUCTION_ES_CONCURRENCY_TAG
 from common.sensors.single_run_external_dags_sensor import SingleRunExternalDAGsSensor
-from common.sensors.utils import wait_for_external_dags
+from common.sensors.utils import wait_for_external_dags_with_tag
 from data_refresh.create_filtered_index import (
     create_filtered_index_creation_task_groups,
 )
 from data_refresh.data_refresh_types import DataRefresh
-from elasticsearch_cluster.create_new_es_index.create_new_es_index_types import (
-    CREATE_NEW_INDEX_CONFIGS,
-)
 
 
 logger = logging.getLogger(__name__)
@@ -123,11 +121,13 @@ def create_data_refresh_task_group(
     # Realistically the data refresh is too slow to beat the index creation process,
     # even if it was triggered immediately after one of these DAGs; however, it is
     # always safer to avoid the possibility of the race condition altogether.
-    wait_for_es_dags = wait_for_external_dags.override(group_id="wait_for_es_dags")(
-        external_dag_ids=[
-            data_refresh.filtered_index_dag_id,
-            CREATE_NEW_INDEX_CONFIGS[PRODUCTION].dag_id,
-        ]
+    wait_for_es_dags = wait_for_external_dags_with_tag.override(
+        group_id="wait_for_es_dags"
+    )(
+        tag=PRODUCTION_ES_CONCURRENCY_TAG,
+        # Exclude the other data refresh DAG ids, as waiting on these was handled in
+        # the previous task.
+        excluded_dag_ids=external_dag_ids,
     )
     tasks.append([wait_for_data_refresh, wait_for_es_dags])
 
diff --git a/catalog/dags/database/staging_database_restore/staging_database_restore_dag.py b/catalog/dags/database/staging_database_restore/staging_database_restore_dag.py
index 85bed3561b8..ef40fe78277 100644
--- a/catalog/dags/database/staging_database_restore/staging_database_restore_dag.py
+++ b/catalog/dags/database/staging_database_restore/staging_database_restore_dag.py
@@ -1,5 +1,5 @@
 """
-# Update the staging database
+# Staging Database Restore DAG
 
 This DAG is responsible for updating the staging database using the most recent
 snapshot of the production database.
@@ -35,7 +35,8 @@
     DAG_DEFAULT_ARGS,
     POSTGRES_API_STAGING_CONN_ID,
 )
-from common.sensors.utils import wait_for_external_dag
+from common.sensors.constants import STAGING_DB_CONCURRENCY_TAG
+from common.sensors.utils import wait_for_external_dags_with_tag
 from common.sql import PGExecuteQueryOperator
 from database.staging_database_restore import constants
 from database.staging_database_restore.staging_database_restore import (
@@ -48,9 +49,6 @@
     restore_staging_from_snapshot,
     skip_restore,
 )
-from elasticsearch_cluster.recreate_staging_index.recreate_full_staging_index import (
-    DAG_ID as RECREATE_STAGING_INDEX_DAG_ID,
-)
 
 log = logging.getLogger(__name__)
 
@@ -60,7 +58,7 @@
     dag_id=constants.DAG_ID,
     schedule="@monthly",
     start_date=datetime(2023, 5, 1),
-    tags=["database"],
+    tags=["database", STAGING_DB_CONCURRENCY_TAG],
     max_active_runs=1,
     dagrun_timeout=timedelta(days=1),
     catchup=False,
@@ -76,9 +74,10 @@ def restore_staging_database():
 
     # If the `recreate_full_staging_index` DAG was manually triggered prior
     # to the database restoration starting, we should wait for it to
-    # finish.
-    wait_for_recreate_full_staging_index = wait_for_external_dag(
-        external_dag_id=RECREATE_STAGING_INDEX_DAG_ID,
+    # finish. It is not necessary to wait on any of the other ES DAGs as
+    # they do not directly affect the database.
+    wait_for_recreate_full_staging_index = wait_for_external_dags_with_tag(
+        tag=STAGING_DB_CONCURRENCY_TAG
     )
     should_skip = skip_restore()
     latest_snapshot = get_latest_prod_snapshot()
diff --git a/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_dag.py b/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_dag.py
index 54191bb162e..2c1ee6117f5 100644
--- a/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_dag.py
+++ b/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_dag.py
@@ -100,6 +100,13 @@
 }
 }
 ```
+
+## Race conditions
+
+Each DAG will fail immediately if any of the DAGs tagged as part of the
+es-concurrency group for the DAG's environment are running. (E.g., the
+`create_new_staging_es_index` DAG fails immediately if any DAGs tagged with
+`staging_elasticsearch_concurrency` are running.)
""" import logging @@ -111,7 +118,7 @@ from common import elasticsearch as es from common import slack from common.constants import AUDIO, DAG_DEFAULT_ARGS, MEDIA_TYPES -from common.sensors.utils import prevent_concurrency_with_dags +from common.sensors.utils import prevent_concurrency_with_dags_with_tag from elasticsearch_cluster.create_new_es_index.create_new_es_index import ( GET_CURRENT_INDEX_CONFIG_TASK_NAME, GET_FINAL_INDEX_CONFIG_TASK_NAME, @@ -129,15 +136,15 @@ logger = logging.getLogger(__name__) -def create_new_es_index_dag(config: CreateNewIndex): +def create_new_es_index_dag(dag_config: CreateNewIndex): dag = DAG( - dag_id=config.dag_id, + dag_id=dag_config.dag_id, default_args=DAG_DEFAULT_ARGS, schedule=None, max_active_runs=1, catchup=False, doc_md=__doc__, - tags=["elasticsearch"], + tags=["elasticsearch", dag_config.prevent_concurrency_tag], render_template_as_native_obj=True, params={ "media_type": Param( @@ -218,9 +225,13 @@ def create_new_es_index_dag(config: CreateNewIndex): ) with dag: - prevent_concurrency = prevent_concurrency_with_dags(config.blocking_dags) + # Fail early if any other DAG that operates on the relevant elasticsearch cluster + # is running + prevent_concurrency = prevent_concurrency_with_dags_with_tag( + tag=dag_config.prevent_concurrency_tag, + ) - es_host = es.get_es_host(environment=config.environment) + es_host = es.get_es_host(environment=dag_config.environment) index_name = get_index_name( media_type="{{ params.media_type }}", @@ -260,8 +271,8 @@ def create_new_es_index_dag(config: CreateNewIndex): destination_index=index_name, source_index="{{ params.source_index or params.media_type }}", query="{{ params.query }}", - timeout=config.reindex_timeout, - requests_per_second=config.requests_per_second, + timeout=dag_config.reindex_timeout, + requests_per_second=dag_config.requests_per_second, es_host=es_host, ) @@ -294,6 +305,6 @@ def create_new_es_index_dag(config: CreateNewIndex): return dag -for config in CREATE_NEW_INDEX_CONFIGS.values(): +for dag_config in CREATE_NEW_INDEX_CONFIGS.values(): # Generate the DAG for this environment - globals()[config.dag_id] = create_new_es_index_dag(config) + globals()[dag_config.dag_id] = create_new_es_index_dag(dag_config) diff --git a/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_types.py b/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_types.py index cf631e4cb0f..578e7d92048 100644 --- a/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_types.py +++ b/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_types.py @@ -4,13 +4,7 @@ from airflow.models import Variable from common.constants import PRODUCTION, STAGING -from data_refresh.data_refresh_types import DATA_REFRESH_CONFIGS -from database.staging_database_restore.constants import ( - DAG_ID as STAGING_DB_RESTORE_DAG_ID, -) -from elasticsearch_cluster.recreate_staging_index.recreate_full_staging_index import ( - DAG_ID as RECREATE_STAGING_INDEX_DAG_ID, -) +from common.sensors.constants import ES_CONCURRENCY_TAGS @dataclass @@ -20,26 +14,27 @@ class CreateNewIndex: Required Constructor Arguments: - environment: str representation of the environment in which to - create the new index - blocking_dags: list of dags with which to prevent concurrency; the - generated create_new_es_index dag will fail - immediately if any of these dags are running. 
-    reindex_timeout:     timedelta expressing maximum amount of time the
-                         reindexing step may take
-    requests_per_second: number of requests to send per second during ES
-                         reindexing, used to throttle the reindex step
+    environment:             str representation of the environment in which to
+                             create the new index
+    prevent_concurrency_tag: tag used to identify DAGs with which to prevent
+                             concurrency; the generated DAG will fail
+                             immediately if any of these DAGs are running
+    reindex_timeout:         timedelta expressing maximum amount of time the
+                             reindexing step may take
+    requests_per_second:     number of requests to send per second during ES
+                             reindexing, used to throttle the reindex step
     """
 
     dag_id: str = field(init=False)
     es_host: str = field(init=False)
+    prevent_concurrency_tag: str = field(init=False)
 
     environment: str
-    blocking_dags: list
     requests_per_second: int | None = None
     reindex_timeout: timedelta = timedelta(hours=12)
 
     def __post_init__(self):
         self.dag_id = f"create_new_{self.environment}_es_index"
+        self.prevent_concurrency_tag = ES_CONCURRENCY_TAGS[self.environment]
 
         if not self.requests_per_second:
             self.requests_per_second = Variable.get(
@@ -50,17 +45,8 @@ def __post_init__(self):
 
 CREATE_NEW_INDEX_CONFIGS = {
     STAGING: CreateNewIndex(
         environment=STAGING,
-        blocking_dags=[RECREATE_STAGING_INDEX_DAG_ID, STAGING_DB_RESTORE_DAG_ID],
     ),
     PRODUCTION: CreateNewIndex(
         environment=PRODUCTION,
-        blocking_dags=(
-            # Block on all the data refreshes
-            [data_refresh.dag_id for data_refresh in DATA_REFRESH_CONFIGS.values()]
-            + [  # Block on the filtered index creation DAGs
-                data_refresh.filtered_index_dag_id
-                for data_refresh in DATA_REFRESH_CONFIGS.values()
-            ]
-        ),
     ),
 }
diff --git a/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py b/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py
index 9d4ccd087b5..aeecd9381ab 100644
--- a/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py
+++ b/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py
@@ -28,15 +28,10 @@
 
 ## Race conditions
 
-Because this DAG runs on the staging ingestion server and staging elasticsearch
-cluster, it does _not_ interfere with the `data_refresh` or
-`create_filtered_index` DAGs.
-
-However, as the DAG operates on the staging API database it will exit
-immediately if any of the following DAGs are running:
-* `staging_database_restore`
-* `recreate_full_staging_index`
-* `create_new_staging_es_index`
+Because this DAG runs on the staging elasticsearch cluster, it does _not_
+interfere with the production `data_refresh` or `create_filtered_index` DAGs.
+However, it will fail immediately if any of the DAGs tagged as part of the
+`staging_elasticsearch_concurrency` group are running.
""" from datetime import datetime, timedelta @@ -53,19 +48,11 @@ MEDIA_TYPES, STAGING, ) -from common.sensors.utils import prevent_concurrency_with_dags -from database.staging_database_restore.constants import ( - DAG_ID as STAGING_DB_RESTORE_DAG_ID, -) -from elasticsearch_cluster.create_new_es_index.create_new_es_index_types import ( - CREATE_NEW_INDEX_CONFIGS, -) +from common.sensors.constants import STAGING_ES_CONCURRENCY_TAG +from common.sensors.utils import prevent_concurrency_with_dags_with_tag from elasticsearch_cluster.create_proportional_by_source_staging_index import ( create_proportional_by_source_staging_index as create_index, ) -from elasticsearch_cluster.recreate_staging_index.recreate_full_staging_index import ( - DAG_ID as RECREATE_STAGING_INDEX_DAG_ID, -) DAG_ID = "create_proportional_by_source_staging_index" @@ -76,7 +63,7 @@ default_args=DAG_DEFAULT_ARGS, schedule=None, start_date=datetime(2024, 1, 31), - tags=["database", "elasticsearch"], + tags=["elasticsearch", STAGING_ES_CONCURRENCY_TAG], max_active_runs=1, catchup=False, doc_md=__doc__, @@ -118,12 +105,8 @@ ) def create_proportional_by_source_staging_index(): # Fail early if any conflicting DAGs are running - prevent_concurrency = prevent_concurrency_with_dags( - external_dag_ids=[ - STAGING_DB_RESTORE_DAG_ID, - RECREATE_STAGING_INDEX_DAG_ID, - CREATE_NEW_INDEX_CONFIGS[STAGING].dag_id, - ] + prevent_concurrency = prevent_concurrency_with_dags_with_tag( + tag=STAGING_ES_CONCURRENCY_TAG, ) es_host = es.get_es_host(environment=STAGING) diff --git a/catalog/dags/elasticsearch_cluster/point_es_alias/point_es_alias_dag.py b/catalog/dags/elasticsearch_cluster/point_es_alias/point_es_alias_dag.py index b2f441297bd..cccbf890a16 100644 --- a/catalog/dags/elasticsearch_cluster/point_es_alias/point_es_alias_dag.py +++ b/catalog/dags/elasticsearch_cluster/point_es_alias/point_es_alias_dag.py @@ -12,6 +12,13 @@ ## When this DAG runs This DAG is on a `None` schedule and is run manually. + +## Race conditions + +Each DAG will fail immediately if any of the DAGs tagged as part of the +es-concurrency group for the DAG's environment is running. (E.g., the +`point_staging_alias` DAG fails immediately if any DAGs tagged with +`staging-es-concurrency` are running.) 
""" from datetime import datetime @@ -26,6 +33,8 @@ DAG_DEFAULT_ARGS, ENVIRONMENTS, ) +from common.sensors.constants import ES_CONCURRENCY_TAGS +from common.sensors.utils import prevent_concurrency_with_dags_with_tag def point_es_alias_dag(environment: str): @@ -34,7 +43,7 @@ def point_es_alias_dag(environment: str): default_args=DAG_DEFAULT_ARGS, schedule=None, start_date=datetime(2024, 1, 31), - tags=["database", "elasticsearch"], + tags=["elasticsearch", ES_CONCURRENCY_TAGS[environment]], max_active_runs=1, catchup=False, doc_md=__doc__, @@ -67,6 +76,12 @@ def point_es_alias_dag(environment: str): ) with dag: + # Fail early if any other DAG that operates on the elasticsearch cluster for + # this environment is running + prevent_concurrency = prevent_concurrency_with_dags_with_tag( + tag=ES_CONCURRENCY_TAGS[environment], + ) + es_host = es.get_es_host(environment=environment) point_alias = es.point_alias( @@ -85,7 +100,7 @@ def point_es_alias_dag(environment: str): icon_emoji=":elasticsearch:", ) - es_host >> point_alias >> notify_completion + prevent_concurrency >> es_host >> point_alias >> notify_completion for environment in ENVIRONMENTS: diff --git a/catalog/dags/elasticsearch_cluster/recreate_staging_index/recreate_full_staging_index_dag.py b/catalog/dags/elasticsearch_cluster/recreate_staging_index/recreate_full_staging_index_dag.py index 0be1bc848dc..3b16056941e 100644 --- a/catalog/dags/elasticsearch_cluster/recreate_staging_index/recreate_full_staging_index_dag.py +++ b/catalog/dags/elasticsearch_cluster/recreate_staging_index/recreate_full_staging_index_dag.py @@ -34,11 +34,9 @@ cluster, it does _not_ interfere with the `data_refresh` or `create_filtered_index` DAGs. -However, as the DAG operates on the staging API database it will exit -immediately if any of the following DAGs are running: -* `staging_database_restore` -* `create_proportional_by_provider_staging_index` -* `create_new_staging_es_index` +However, as the DAG operates on the staging API database and ES cluster it will exit +immediately if any of the DAGs tagged as part of the `staging_es_concurrency` group +are already running. 
""" from datetime import datetime @@ -52,16 +50,13 @@ AUDIO, DAG_DEFAULT_ARGS, MEDIA_TYPES, - STAGING, XCOM_PULL_TEMPLATE, ) -from common.sensors.utils import prevent_concurrency_with_dags -from database.staging_database_restore.constants import ( - DAG_ID as STAGING_DB_RESTORE_DAG_ID, -) -from elasticsearch_cluster.create_new_es_index.create_new_es_index_types import ( - CREATE_NEW_INDEX_CONFIGS, +from common.sensors.constants import ( + STAGING_DB_CONCURRENCY_TAG, + STAGING_ES_CONCURRENCY_TAG, ) +from common.sensors.utils import prevent_concurrency_with_dags_with_tag from elasticsearch_cluster.recreate_staging_index.recreate_full_staging_index import ( DAG_ID, create_index, @@ -76,7 +71,12 @@ default_args=DAG_DEFAULT_ARGS, schedule=None, start_date=datetime(2023, 4, 1), - tags=["database", "elasticsearch"], + tags=[ + "database", + "elasticsearch", + STAGING_DB_CONCURRENCY_TAG, + STAGING_ES_CONCURRENCY_TAG, + ], max_active_runs=1, catchup=False, doc_md=__doc__, @@ -108,13 +108,21 @@ render_template_as_native_obj=True, ) def recreate_full_staging_index(): - # Fail early if the staging_db_restore DAG or the create_new_staging_es_index DAG + # Fail early if any other DAG that operates on the staging elasticsearch cluster # is running - prevent_concurrency = prevent_concurrency_with_dags( - external_dag_ids=[ - STAGING_DB_RESTORE_DAG_ID, - CREATE_NEW_INDEX_CONFIGS[STAGING].dag_id, - ] + prevent_concurrency_es = prevent_concurrency_with_dags_with_tag.override( + group_id="prevent_concurrency_with_elasticsearch_dags" + )( + tag=STAGING_ES_CONCURRENCY_TAG, + ) + + # Because this DAG pulls records from the staging API database during reindexing + # rather than reindexing from another ES index, it must also prevent concurrency + # with DAGs that affect the staging DB. 
+    prevent_concurrency_db = prevent_concurrency_with_dags_with_tag.override(
+        group_id="prevent_concurrency_with_api_db_dags"
+    )(
+        tag=STAGING_DB_CONCURRENCY_TAG,
     )
 
     target_alias = get_target_alias(
@@ -178,8 +186,10 @@ def recreate_full_staging_index():
     )
 
     # Set up dependencies
-    prevent_concurrency >> target_alias >> get_current_index_if_exists
-    get_current_index_if_exists >> new_index_suffix
+    prevent_concurrency_es >> target_alias
+    prevent_concurrency_db >> target_alias
+
+    target_alias >> get_current_index_if_exists >> new_index_suffix
     new_index_suffix >> do_create_index >> do_point_alias
     do_point_alias >> check_if_should_delete_index
     check_if_should_delete_index >> [delete_old_index, notify_complete]
diff --git a/catalog/tests/dags/common/sensors/test_utils.py b/catalog/tests/dags/common/sensors/test_utils.py
index 67e599ddf27..be28e65d92d 100644
--- a/catalog/tests/dags/common/sensors/test_utils.py
+++ b/catalog/tests/dags/common/sensors/test_utils.py
@@ -5,7 +5,7 @@
 from airflow.utils.timezone import datetime
 from airflow.utils.types import DagRunType
 
-from common.sensors.utils import get_most_recent_dag_run
+from common.sensors.utils import _get_most_recent_dag_run
 
 
 TEST_DAG_ID = "data_refresh_dag_factory_test_dag"
@@ -29,11 +29,11 @@ def test_get_most_recent_dag_run_returns_most_recent_execution_date(
     most_recent = datetime(2023, 5, 10)
     for i in range(3):
         _create_dagrun(most_recent - timedelta(days=i), sample_dag_id_fixture)
-    assert get_most_recent_dag_run(sample_dag_id_fixture) == most_recent
+    assert _get_most_recent_dag_run(sample_dag_id_fixture) == most_recent
 
 
 def test_get_most_recent_dag_run_returns_empty_list_when_no_runs(
     sample_dag_id_fixture, clean_db
 ):
     # Relies on ``clean_db`` cleaning up DagRuns from other tests
-    assert get_most_recent_dag_run(sample_dag_id_fixture) == []
+    assert _get_most_recent_dag_run(sample_dag_id_fixture) == []
diff --git a/documentation/catalog/reference/DAGs.md b/documentation/catalog/reference/DAGs.md
index ef44e6c1f50..05c05534b73 100644
--- a/documentation/catalog/reference/DAGs.md
+++ b/documentation/catalog/reference/DAGs.md
@@ -41,18 +41,15 @@ The following are DAGs grouped by their primary tag:
 
 ### Database
 
-| DAG ID                                                                                        | Schedule Interval |
-| --------------------------------------------------------------------------------------------- | ----------------- |
-| [`batched_update`](#batched_update)                                                            | `None`            |
-| [`create_proportional_by_source_staging_index`](#create_proportional_by_source_staging_index)  | `None`            |
-| [`delete_records`](#delete_records)                                                            | `None`            |
-| [`point_production_es_alias`](#point_production_es_alias)                                      | `None`            |
-| [`point_staging_es_alias`](#point_staging_es_alias)                                            | `None`            |
-| [`recreate_audio_popularity_calculation`](#recreate_audio_popularity_calculation)              | `None`            |
-| [`recreate_full_staging_index`](#recreate_full_staging_index)                                  | `None`            |
-| [`recreate_image_popularity_calculation`](#recreate_image_popularity_calculation)              | `None`            |
-| [`report_pending_reported_media`](#report_pending_reported_media)                              | `@weekly`         |
-| [`staging_database_restore`](#staging_database_restore)                                        | `@monthly`        |
+| DAG ID                                                                             | Schedule Interval |
+| ---------------------------------------------------------------------------------- | ----------------- |
+| [`batched_update`](#batched_update)                                                | `None`            |
+| [`delete_records`](#delete_records)                                                | `None`            |
+| [`recreate_audio_popularity_calculation`](#recreate_audio_popularity_calculation)  | `None`            |
+| [`recreate_full_staging_index`](#recreate_full_staging_index)                      | `None`            |
+| [`recreate_image_popularity_calculation`](#recreate_image_popularity_calculation)  | `None`            |
+| [`report_pending_reported_media`](#report_pending_reported_media)                  | `@weekly`         |
+| [`staging_database_restore`](#staging_database_restore)                            | `@monthly`        |
 
 ### Elasticsearch
 
@@ -60,6 +57,9 @@ The following are DAGs grouped by their primary tag:
 | DAG ID                                                                                           | Schedule Interval |
 | ------------------------------------------------------------------------------------------------ | ----------------- |
 | [`create_new_production_es_index`](#create_new_production_es_index)                               | `None`            |
 | [`create_new_staging_es_index`](#create_new_staging_es_index)                                     | `None`            |
+| [`create_proportional_by_source_staging_index`](#create_proportional_by_source_staging_index)     | `None`            |
+| [`point_production_es_alias`](#point_production_es_alias)                                         | `None`            |
+| [`point_staging_es_alias`](#point_staging_es_alias)                                               | `None`            |
 | [`production_elasticsearch_cluster_healthcheck`](#production_elasticsearch_cluster_healthcheck)   | `*/15 * * * *`    |
 | [`staging_elasticsearch_cluster_healthcheck`](#staging_elasticsearch_cluster_healthcheck)         | `*/15 * * * *`    |
 
@@ -433,8 +433,9 @@ source from which to pull documents.
 
 There are two mechanisms that prevent this from happening:
 
-1. The filtered index creation DAGs are not allowed to run if a data refresh for
-   the media type is already running.
+1. The filtered index creation DAGs fail immediately if any of the DAGs that are
+   tagged as part of the `production_elasticsearch_concurrency` group (including
+   the data refreshes) are currently running.
 2. The data refresh DAGs will wait for any pre-existing filtered index creation
    DAG runs for the media type to finish before continuing.
 
@@ -488,8 +489,9 @@ source from which to pull documents.
 
 There are two mechanisms that prevent this from happening:
 
-1. The filtered index creation DAGs are not allowed to run if a data refresh for
-   the media type is already running.
+1. The filtered index creation DAGs fail immediately if any of the DAGs that are
+   tagged as part of the `production_elasticsearch_concurrency` group (including
+   the data refreshes) are currently running.
 2. The data refresh DAGs will wait for any pre-existing filtered index creation
    DAG runs for the media type to finish before continuing.
 
@@ -600,6 +602,13 @@ The resulting, merged configuration will be:
 }
 ```
 
+##### Race conditions
+
+Each DAG will fail immediately if any of the DAGs tagged as part of the
+es-concurrency group for the DAG's environment are running. (E.g., the
+`create_new_staging_es_index` DAG fails immediately if any DAGs tagged with
+`staging_elasticsearch_concurrency` are running.)
+
 ### `create_new_staging_es_index`
 
 #### Create New ES Index DAG
@@ -704,6 +713,13 @@ The resulting, merged configuration will be:
 }
 ```
 
+##### Race conditions
+
+Each DAG will fail immediately if any of the DAGs tagged as part of the
+es-concurrency group for the DAG's environment are running. (E.g., the
+`create_new_staging_es_index` DAG fails immediately if any DAGs tagged with
+`staging_elasticsearch_concurrency` are running.)
+
 ### `create_proportional_by_source_staging_index`
 
 #### Create Proportional By Source Staging Index DAG
@@ -732,16 +748,10 @@ This DAG is on a `None` schedule and is run manually.
 
 ##### Race conditions
 
-Because this DAG runs on the staging ingestion server and staging elasticsearch
-cluster, it does _not_ interfere with the `data_refresh` or
-`create_filtered_index` DAGs.
-
-However, as the DAG operates on the staging API database it will exit
-immediately if any of the following DAGs are running:
-
-- `staging_database_restore`
-- `recreate_full_staging_index`
-- `create_new_staging_es_index`
+Because this DAG runs on the staging elasticsearch cluster, it does _not_
+interfere with the production `data_refresh` or `create_filtered_index` DAGs.
+However, it will fail immediately if any of the DAGs tagged as part of the
+`staging_elasticsearch_concurrency` group are running.
 
 ### `delete_records`
 
@@ -1071,6 +1081,13 @@ also delete that index afterward.
 
 This DAG is on a `None` schedule and is run manually.
 
+##### Race conditions
+
+Each DAG will fail immediately if any of the DAGs tagged as part of the
+es-concurrency group for the DAG's environment are running. (E.g., the
+`point_staging_es_alias` DAG fails immediately if any DAGs tagged with
+`staging_elasticsearch_concurrency` are running.)
+
 ### `point_staging_es_alias`
 
 #### Point ES Alias DAG
@@ -1087,6 +1104,13 @@ also delete that index afterward.
 
 This DAG is on a `None` schedule and is run manually.
 
+##### Race conditions
+
+Each DAG will fail immediately if any of the DAGs tagged as part of the
+es-concurrency group for the DAG's environment are running. (E.g., the
+`point_staging_es_alias` DAG fails immediately if any DAGs tagged with
+`staging_elasticsearch_concurrency` are running.)
+
 ### `pr_review_reminders`
 
 #### PR Review Reminders
@@ -1192,12 +1216,10 @@ Because this DAG runs on the staging ingestion server and staging elasticsearch
 cluster, it does _not_ interfere with the `data_refresh` or
 `create_filtered_index` DAGs.
 
-However, as the DAG operates on the staging API database it will exit
-immediately if any of the following DAGs are running:
-
-- `staging_database_restore`
-- `create_proportional_by_provider_staging_index`
-- `create_new_staging_es_index`
+However, as the DAG operates on the staging API database and ES cluster it will
+exit immediately if any of the DAGs tagged as part of the
+`staging_elasticsearch_concurrency` or `staging_api_database_concurrency`
+groups are already running.
 
 ### `recreate_image_popularity_calculation`
 
@@ -1277,7 +1298,7 @@ Notes:
 https://www.smk.dk/en/article/smk-api/
 
 ### `staging_database_restore`
 
-#### Update the staging database
+#### Staging Database Restore DAG
 
 This DAG is responsible for updating the staging database using the most recent
 snapshot of the production database.
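Taken together, the pattern this change introduces has two steps: a DAG opts into a concurrency group by applying the group's tag to itself, and then runs one of the tag-based sensor task groups at the start of its task graph. The following is a minimal sketch of a hypothetical opted-in DAG; the DAG id and schedule are invented for illustration, while the imports are the modules added in this diff:

```python
from datetime import datetime

from airflow import DAG

from common.constants import DAG_DEFAULT_ARGS
from common.sensors.constants import STAGING_ES_CONCURRENCY_TAG
from common.sensors.utils import prevent_concurrency_with_dags_with_tag

# Hypothetical DAG opting into the staging Elasticsearch concurrency group.
with DAG(
    dag_id="example_staging_es_dag",  # hypothetical id
    default_args=DAG_DEFAULT_ARGS,
    schedule=None,
    start_date=datetime(2024, 1, 1),
    # Step 1: apply the tag, so other tagged DAGs will also refuse to run
    # concurrently with this one. Note that get_dags_with_concurrency_tag
    # raises a ValueError if the calling DAG does not carry its own tag.
    tags=["elasticsearch", STAGING_ES_CONCURRENCY_TAG],
    catchup=False,
) as dag:
    # Step 2: fail immediately if any other DAG tagged with
    # STAGING_ES_CONCURRENCY_TAG is currently running. Use
    # wait_for_external_dags_with_tag instead to wait rather than fail.
    prevent_concurrency = prevent_concurrency_with_dags_with_tag(
        tag=STAGING_ES_CONCURRENCY_TAG,
    )

    # Downstream tasks chain off the concurrency check, e.g.:
    # prevent_concurrency >> do_reindex
```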