From a5014aa8fa223991d1d5f81d2218d56500a92c8c Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Tue, 13 Feb 2024 16:43:53 -0800 Subject: [PATCH] Do not perform multiple source reindexes simultaneously --- catalog/dags/common/elasticsearch.py | 4 +++- .../create_proportional_by_source_staging_index_dag.py | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/catalog/dags/common/elasticsearch.py b/catalog/dags/common/elasticsearch.py index c8022facd75..05cd1374671 100644 --- a/catalog/dags/common/elasticsearch.py +++ b/catalog/dags/common/elasticsearch.py @@ -96,8 +96,9 @@ def trigger_and_wait_for_reindex( es_host: str, max_docs: int | None = None, slices: Union[int, Literal["auto"]] = "auto", + max_active_tis_per_dagrun: int | None = None, ): - @task + @task(max_active_tis_per_dagrun=max_active_tis_per_dagrun) def trigger_reindex( es_host: str, destination_index: str, @@ -162,6 +163,7 @@ def _wait_for_reindex(task_id: str, expected_docs: int, es_host: str): "expected_docs": max_docs, "es_host": es_host, }, + max_active_tis_per_dagrun=max_active_tis_per_dagrun, ) trigger_reindex_task >> wait_for_reindex diff --git a/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py b/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py index 0de62252d16..da9d07cd4ba 100644 --- a/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py +++ b/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py @@ -164,6 +164,7 @@ def create_proportional_by_source_staging_index(): # not work reliably and the final proportions may be incorrect. slices=None, es_host=es_host, + max_active_tis_per_dagrun=1, ).expand_kwargs(desired_source_counts) point_alias = es.point_alias(