From 55be0943b3b15674e06403a91001d9fdde1cf710 Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Fri, 8 Mar 2024 14:44:29 -0800 Subject: [PATCH 01/11] Update taskgroup to optionally delete old index, add params to create_new_es_index dag --- catalog/dags/common/elasticsearch.py | 101 +++++++++++++----- .../create_new_es_index.py | 10 ++ .../create_new_es_index_dag.py | 46 +++++++- 3 files changed, 128 insertions(+), 29 deletions(-) diff --git a/catalog/dags/common/elasticsearch.py b/catalog/dags/common/elasticsearch.py index 92acc57b6fc..56f96af83c6 100644 --- a/catalog/dags/common/elasticsearch.py +++ b/catalog/dags/common/elasticsearch.py @@ -3,10 +3,11 @@ from typing import Literal, Union from airflow.decorators import task, task_group +from airflow.exceptions import AirflowSkipException from airflow.models.connection import Connection from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook from airflow.sensors.base import PokeReturnValue -from airflow.utils.trigger_rule import TriggerRule +from elasticsearch.exceptions import NotFoundError from common.constants import REFRESH_POKE_INTERVAL @@ -175,51 +176,95 @@ def refresh_index(es_host: str, index_name: str): @task_group(group_id="point_alias") -def point_alias(index_name: str, alias: str, es_host: str): +def point_alias( + es_host: str, + target_index: str, + target_alias: str, + should_delete_old_index: bool = False, +): """ Point the target alias to the given index. If the alias is already being - used by one or more indices, it will first be removed from all of them. + used by another index, it will be removed from this index first. Optionally, + that index may also be automatically deleted. + + Required Arguments: + + es_host: Connection string for elasticsearch + target_index: Str identifier for the target index. May be either the index name + or an existing alias. + target_alias: The new alias to be applied to the target index + + Optional Arguments: + + should_delete_old_index: If True, any indices with the target alias _other_ + than the target index will be deleted. """ - @task.branch - def check_if_alias_exists(alias: str, es_host: str): - """Check if the alias already exists.""" + @task + def get_existing_index(es_host: str, target_alias: str): + """Get the index to which the target alias currently points, if it exists.""" es_conn = ElasticsearchPythonHook(hosts=[es_host]).get_conn - return ( - "point_alias.remove_existing_alias" - if es_conn.indices.exists_alias(name=alias) - else "point_alias.point_new_alias" - ) + + try: + response = es_conn.indices.get_alias(name=target_alias) + if len(response) > 1: + raise ValueError( + "Expected at most one existing index with target alias" + f"{target_alias}, but {len(response)} were found." + ) + return list(response.keys())[0] + except NotFoundError: + logger.info(f"Target alias {target_alias} does not exist.") + return None @task - def remove_existing_alias(alias: str, es_host: str): - """Remove the given alias from any indices to which it points.""" + def remove_alias_from_existing_index( + es_host: str, + index_name: str, + target_alias: str, + ): + """Remove the given alias from the given index.""" + if not index_name: + raise AirflowSkipException( + "No existing index from which to remove the target alias." + ) + es_conn = ElasticsearchPythonHook(hosts=[es_host]).get_conn response = es_conn.indices.delete_alias( - name=alias, - # Remove the alias from _all_ indices to which it currently - # applies - index="_all", + name=target_alias, + index=index_name, ) return response.get("acknowledged") @task def point_new_alias( es_host: str, - index_name: str, - alias: str, + target_index: str, + target_alias: str, ): es_conn = ElasticsearchPythonHook(hosts=[es_host]).get_conn - response = es_conn.indices.put_alias(index=index_name, name=alias) + response = es_conn.indices.put_alias(index=target_index, name=target_alias) return response.get("acknowledged") - exists_alias = check_if_alias_exists(alias, es_host) - remove_alias = remove_existing_alias(alias, es_host) + @task + def delete_old_index(es_host: str, index_name: str, should_delete_old_index: bool): + if not should_delete_old_index: + raise AirflowSkipException("`should_delete_old_index` is set to `False`.") + + es_conn = ElasticsearchPythonHook(hosts=[es_host]).get_conn + response = es_conn.indices.delete( + index=index_name, + ) + return response.get("acknowledged") + + existing_index = get_existing_index(es_host, target_alias) + remove_alias = remove_alias_from_existing_index( + es_host, existing_index, target_alias + ) + + point_alias = point_new_alias(es_host, target_index, target_alias) - point_alias = point_new_alias.override( - # The remove_alias task may be skipped. - trigger_rule=TriggerRule.NONE_FAILED, - )(es_host, index_name, alias) + delete_index = delete_old_index(es_host, existing_index, should_delete_old_index) - exists_alias >> [remove_alias, point_alias] - remove_alias >> point_alias + existing_index >> remove_alias >> point_alias + point_alias >> delete_index diff --git a/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index.py b/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index.py index a5e13a7da47..02ceb0940b0 100644 --- a/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index.py +++ b/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index.py @@ -68,3 +68,13 @@ def get_final_index_configuration( # Apply the desired index name config["index"] = index_name return config + + +@task.branch +def should_point_alias(target_alias): + if not target_alias: + # No target alias supplied, skip point_alias steps + return "notify_slack" + + # Proceed to the first step of the `point_alias` taskgroup + return "point_alias.get_existing_index" diff --git a/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_dag.py b/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_dag.py index 407af9a5fb9..1b940825b97 100644 --- a/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_dag.py +++ b/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_dag.py @@ -22,6 +22,8 @@ * `override_config`: boolean override; when True, the `index_config` will be used for the new index configuration _without_ merging any values from the source index config. +* `target_alias` : optional alias to be applied to the new index after reindexing. If the alias already applies to an existing index, it will be removed first. +* `should_delete_old_index`: whether to remove the index previously pointed to by the target_alias, if it exists. Defaults to False. ## Merging policy @@ -104,6 +106,7 @@ from airflow.utils.trigger_rule import TriggerRule from common import elasticsearch as es +from common import slack from common.constants import AUDIO, DAG_DEFAULT_ARGS, MEDIA_TYPES from common.sensors.utils import prevent_concurrency_with_dags from elasticsearch_cluster.create_new_es_index.create_new_es_index import ( @@ -113,6 +116,7 @@ get_final_index_configuration, get_index_name, merge_index_configurations, + should_point_alias, ) from elasticsearch_cluster.create_new_es_index.create_new_es_index_types import ( CREATE_NEW_INDEX_CONFIGS, @@ -191,6 +195,23 @@ def create_new_es_index_dag(config: CreateNewIndex): " configuration." ), ), + "target_alias": Param( + default=None, + type=["string", "null"], + description=( + "Optional alias which will be applied to the newly created index. If" + " the alias already exists, it will first be removed from the" + " index to which it previously pointed." + ), + ), + "should_delete_old_index": Param( + default=False, + type="boolean", + description=( + "Whether to delete the index previously pointed to by the" + " `target_alias`." + ), + ), }, ) @@ -242,11 +263,34 @@ def create_new_es_index_dag(config: CreateNewIndex): es_host=es_host, ) + check_alias = should_point_alias(target_alias="{{ params.target_alias }}") + + point_alias = es.point_alias( + es_host=es_host, + target_index=index_name, + target_alias="{{ params.target_alias }}", + should_delete_old_index="{{ params.should_delete_old_index }}", + ) + + notify_completion = slack.notify_slack.override( + trigger_rule=TriggerRule.NONE_FAILED + )( + text=( + f"New index { index_name } was successfully created with alias" + "{{ params.target_alias }}." + ), + dag_id=dag.dag_id, + username="Create New ES Index", + icon_emoji=":elasticsearch:", + ) + # Set up dependencies prevent_concurrency >> [es_host, index_name] index_name >> check_override >> [current_index_config, final_index_config] current_index_config >> merged_index_config >> final_index_config - final_index_config >> create_new_index >> reindex + final_index_config >> create_new_index >> reindex >> check_alias + check_alias >> [point_alias, notify_completion] + point_alias >> notify_completion return dag From a4a79cc1f7b8e48cdd05e6c4e9bbbdf1ac2821ba Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Fri, 8 Mar 2024 14:54:54 -0800 Subject: [PATCH 02/11] Add separate point alias DAG --- ...roportional_by_source_staging_index_dag.py | 4 +- .../point_alias/point_alias_dag.py | 92 +++++++++++++++++++ 2 files changed, 95 insertions(+), 1 deletion(-) create mode 100644 catalog/dags/elasticsearch_cluster/point_alias/point_alias_dag.py diff --git a/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py b/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py index 1903d5f0ce8..d64c673062e 100644 --- a/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py +++ b/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py @@ -171,7 +171,9 @@ def create_proportional_by_source_staging_index(): ) point_alias = es.point_alias( - index_name=destination_index_name, alias=destination_alias, es_host=es_host + es_host=es_host, + target_index=destination_index_name, + target_alias=destination_alias, ) notify_completion = slack.notify_slack( diff --git a/catalog/dags/elasticsearch_cluster/point_alias/point_alias_dag.py b/catalog/dags/elasticsearch_cluster/point_alias/point_alias_dag.py new file mode 100644 index 00000000000..50d0ef7b57d --- /dev/null +++ b/catalog/dags/elasticsearch_cluster/point_alias/point_alias_dag.py @@ -0,0 +1,92 @@ +""" +# Point Alias DAG + +This file generates our Point Alias DAGs using a factory function. A +separate DAG is generated for the staging and production environments. + +The DAGs are used to point a `target_alias` to a `target_index` in the +given environment. When the alias is applied, it is first removed from +any existing index to which it already applies; optionally, it can also +delete that index afterward. + +## When this DAG runs + +This DAG is on a `None` schedule and is run manually. +""" + +from datetime import datetime + +from airflow import DAG +from airflow.models.param import Param +from airflow.utils.trigger_rule import TriggerRule + +from common import elasticsearch as es +from common import slack +from common.constants import ( + DAG_DEFAULT_ARGS, + ENVIRONMENTS, +) + + +def point_alias_dag(environment: str): + dag = DAG( + dag_id=f"point_{environment}_alias", + default_args=DAG_DEFAULT_ARGS, + schedule=None, + start_date=datetime(2024, 1, 31), + tags=["database", "elasticsearch"], + max_active_runs=1, + catchup=False, + doc_md=__doc__, + params={ + "target_index": Param( + type="string", + description=( + "The existing Elasticsearch index to which the target alias" + " should be applied." + ), + ), + "target_alias": Param( + type="string", + description=( + "The alias which will be applied to the index. If" + " the alias already exists, it will first be removed from the" + " index to which it previously pointed." + ), + ), + "should_delete_old_index": Param( + default=False, + type="boolean", + description=( + "Whether to delete the index previously pointed to by the" + " `target_alias`." + ), + ), + }, + render_template_as_native_obj=True, + ) + + with dag: + es_host = es.get_es_host(environment=environment) + + point_alias = es.point_alias( + es_host=es_host, + target_index="{{ params.target_index }}", + target_alias="{{ params.target_alias }}", + should_delete_old_index="{{ params.should_delete_old_index }}", + ) + + notify_completion = slack.notify_slack.override( + trigger_rule=TriggerRule.NONE_FAILED + )( + text="Alias {{ params.target_alias }} applied to index {{ params.target_index }}.", + dag_id=dag.dag_id, + username="Point Alias", + icon_emoji=":elasticsearch:", + ) + + es_host >> point_alias >> notify_completion + + +for environment in ENVIRONMENTS: + point_alias_dag(environment) From c5410c51b64ac5ff02e9301707d7b07a8ee713ce Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Fri, 8 Mar 2024 15:44:02 -0800 Subject: [PATCH 03/11] Skip all point_alias tasks if target_alias not supplied * `point_alias` needs to have trigger_rule=TriggerRule.NONE_FAILED, because the previous step to remove existing alias my be skipped * however, this is a problem for DAGs that import the entire point_alias taskGroup and try to skip it using a branching operator. Because `point_alias` runs when NONE_FAILED, it will try to run even though the entire taskgroup has been skipped. * easiest solution is to have all the individual tasks in the point_alias group individually handle skipping if the appropriate params haven't been passed in --- catalog/dags/common/elasticsearch.py | 13 ++++++++++++- .../create_new_es_index/create_new_es_index.py | 10 ---------- .../create_new_es_index/create_new_es_index_dag.py | 6 +----- 3 files changed, 13 insertions(+), 16 deletions(-) diff --git a/catalog/dags/common/elasticsearch.py b/catalog/dags/common/elasticsearch.py index 56f96af83c6..1b8eca3cb53 100644 --- a/catalog/dags/common/elasticsearch.py +++ b/catalog/dags/common/elasticsearch.py @@ -7,6 +7,7 @@ from airflow.models.connection import Connection from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook from airflow.sensors.base import PokeReturnValue +from airflow.utils.trigger_rule import TriggerRule from elasticsearch.exceptions import NotFoundError from common.constants import REFRESH_POKE_INTERVAL @@ -203,6 +204,9 @@ def point_alias( @task def get_existing_index(es_host: str, target_alias: str): """Get the index to which the target alias currently points, if it exists.""" + if not target_alias: + raise AirflowSkipException("No target alias was provided.") + es_conn = ElasticsearchPythonHook(hosts=[es_host]).get_conn try: @@ -242,6 +246,9 @@ def point_new_alias( target_index: str, target_alias: str, ): + if not target_alias: + raise AirflowSkipException("No target alias was provided.") + es_conn = ElasticsearchPythonHook(hosts=[es_host]).get_conn response = es_conn.indices.put_alias(index=target_index, name=target_alias) return response.get("acknowledged") @@ -262,7 +269,11 @@ def delete_old_index(es_host: str, index_name: str, should_delete_old_index: boo es_host, existing_index, target_alias ) - point_alias = point_new_alias(es_host, target_index, target_alias) + point_alias = point_new_alias.override( + # `remove_alias_from_existing_index` is skipped if there is no + # existing index + trigger_rule=TriggerRule.NONE_FAILED + )(es_host, target_index, target_alias) delete_index = delete_old_index(es_host, existing_index, should_delete_old_index) diff --git a/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index.py b/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index.py index 02ceb0940b0..a5e13a7da47 100644 --- a/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index.py +++ b/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index.py @@ -68,13 +68,3 @@ def get_final_index_configuration( # Apply the desired index name config["index"] = index_name return config - - -@task.branch -def should_point_alias(target_alias): - if not target_alias: - # No target alias supplied, skip point_alias steps - return "notify_slack" - - # Proceed to the first step of the `point_alias` taskgroup - return "point_alias.get_existing_index" diff --git a/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_dag.py b/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_dag.py index 1b940825b97..14febf9e546 100644 --- a/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_dag.py +++ b/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_dag.py @@ -116,7 +116,6 @@ get_final_index_configuration, get_index_name, merge_index_configurations, - should_point_alias, ) from elasticsearch_cluster.create_new_es_index.create_new_es_index_types import ( CREATE_NEW_INDEX_CONFIGS, @@ -263,8 +262,6 @@ def create_new_es_index_dag(config: CreateNewIndex): es_host=es_host, ) - check_alias = should_point_alias(target_alias="{{ params.target_alias }}") - point_alias = es.point_alias( es_host=es_host, target_index=index_name, @@ -288,8 +285,7 @@ def create_new_es_index_dag(config: CreateNewIndex): prevent_concurrency >> [es_host, index_name] index_name >> check_override >> [current_index_config, final_index_config] current_index_config >> merged_index_config >> final_index_config - final_index_config >> create_new_index >> reindex >> check_alias - check_alias >> [point_alias, notify_completion] + final_index_config >> create_new_index >> reindex >> point_alias point_alias >> notify_completion return dag From e0c3acc42eee5f25e109f662c20cd79efa3791c2 Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Fri, 8 Mar 2024 15:47:10 -0800 Subject: [PATCH 04/11] Update DAGs.md --- documentation/catalog/reference/DAGs.md | 46 +++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/documentation/catalog/reference/DAGs.md b/documentation/catalog/reference/DAGs.md index 15a6875e0a8..c5e923891ff 100644 --- a/documentation/catalog/reference/DAGs.md +++ b/documentation/catalog/reference/DAGs.md @@ -47,6 +47,8 @@ The following are DAGs grouped by their primary tag: | [`batched_update`](#batched_update) | `None` | | [`create_proportional_by_source_staging_index`](#create_proportional_by_source_staging_index) | `None` | | [`delete_records`](#delete_records) | `None` | +| [`point_production_alias`](#point_production_alias) | `None` | +| [`point_staging_alias`](#point_staging_alias) | `None` | | [`recreate_audio_popularity_calculation`](#recreate_audio_popularity_calculation) | `None` | | [`recreate_full_staging_index`](#recreate_full_staging_index) | `None` | | [`recreate_image_popularity_calculation`](#recreate_image_popularity_calculation) | `None` | @@ -166,6 +168,8 @@ The following is documentation associated with each DAG (where available): 1. [`oauth2_token_refresh`](#oauth2_token_refresh) 1. [`phylopic_reingestion_workflow`](#phylopic_reingestion_workflow) 1. [`phylopic_workflow`](#phylopic_workflow) +1. [`point_production_alias`](#point_production_alias) +1. [`point_staging_alias`](#point_staging_alias) 1. [`pr_review_reminders`](#pr_review_reminders) 1. [`production_elasticsearch_cluster_healthcheck`](#production_elasticsearch_cluster_healthcheck) 1. [`rawpixel_workflow`](#rawpixel_workflow) @@ -525,6 +529,11 @@ available: - `override_config`: boolean override; when True, the `index_config` will be used for the new index configuration _without_ merging any values from the source index config. +- `target_alias` : optional alias to be applied to the new index after + reindexing. If the alias already applies to an existing index, it will be + removed first. +- `should_delete_old_index`: whether to remove the index previously pointed to + by the target_alias, if it exists. Defaults to False. ##### Merging policy @@ -624,6 +633,11 @@ available: - `override_config`: boolean override; when True, the `index_config` will be used for the new index configuration _without_ merging any values from the source index config. +- `target_alias` : optional alias to be applied to the new index after + reindexing. If the alias already applies to an existing index, it will be + removed first. +- `should_delete_old_index`: whether to remove the index previously pointed to + by the target_alias, if it exists. Defaults to False. ##### Merging policy @@ -1052,6 +1066,38 @@ Output: TSV file containing the image, their respective meta-data. Notes: http://api-docs.phylopic.org/v2/ No rate limit specified. +### `point_production_alias` + +#### Point Alias DAG + +This file generates our Point Alias DAGs using a factory function. A separate +DAG is generated for the staging and production environments. + +The DAGs are used to point a `target_alias` to a `target_index` in the given +environment. When the alias is applied, it is first removed from any existing +index to which it already applies; optionally, it can also delete that index +afterward. + +##### When this DAG runs + +This DAG is on a `None` schedule and is run manually. + +### `point_staging_alias` + +#### Point Alias DAG + +This file generates our Point Alias DAGs using a factory function. A separate +DAG is generated for the staging and production environments. + +The DAGs are used to point a `target_alias` to a `target_index` in the given +environment. When the alias is applied, it is first removed from any existing +index to which it already applies; optionally, it can also delete that index +afterward. + +##### When this DAG runs + +This DAG is on a `None` schedule and is run manually. + ### `pr_review_reminders` #### PR Review Reminders From 50f082f318f682cdd8ba7f479df47797b507e1a1 Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Fri, 8 Mar 2024 15:51:02 -0800 Subject: [PATCH 05/11] lint --- .../create_new_es_index/create_new_es_index_dag.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_dag.py b/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_dag.py index 14febf9e546..54191bb162e 100644 --- a/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_dag.py +++ b/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_dag.py @@ -22,8 +22,11 @@ * `override_config`: boolean override; when True, the `index_config` will be used for the new index configuration _without_ merging any values from the source index config. -* `target_alias` : optional alias to be applied to the new index after reindexing. If the alias already applies to an existing index, it will be removed first. -* `should_delete_old_index`: whether to remove the index previously pointed to by the target_alias, if it exists. Defaults to False. +* `target_alias` : optional alias to be applied to the new index after reindexing. + If the alias already applies to an existing index, it will be + removed first. +* `should_delete_old_index`: whether to remove the index previously pointed to by + the target_alias, if it exists. Defaults to False. ## Merging policy From 9f90f63ea484135aa9f7c67f8bb30a7721db5ba3 Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Fri, 8 Mar 2024 15:58:26 -0800 Subject: [PATCH 06/11] Notify slack if earlier tasks skipped --- .../create_proportional_by_source_staging_index_dag.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py b/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py index d64c673062e..28f5fa237d6 100644 --- a/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py +++ b/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py @@ -41,6 +41,7 @@ from airflow.decorators import dag from airflow.models.param import Param +from airflow.utils.trigger_rule import TriggerRule from common import elasticsearch as es from common import slack @@ -176,7 +177,9 @@ def create_proportional_by_source_staging_index(): target_alias=destination_alias, ) - notify_completion = slack.notify_slack( + notify_completion = slack.notify_slack.override( + trigger_rule=TriggerRule.NONE_FAILED + )( text=f"Reindexing complete for {destination_index_name}.", dag_id=DAG_ID, username="Proportional by Source Staging Index Creation", From 373045bf7c4c514e5cad86f6b47fc1e4337139ec Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Tue, 12 Mar 2024 09:19:30 -0700 Subject: [PATCH 07/11] Make remove/point alias atomic operation, skip delete when index not provided --- catalog/dags/common/elasticsearch.py | 56 ++++++++++------------------ 1 file changed, 20 insertions(+), 36 deletions(-) diff --git a/catalog/dags/common/elasticsearch.py b/catalog/dags/common/elasticsearch.py index 1b8eca3cb53..16ef8332d8c 100644 --- a/catalog/dags/common/elasticsearch.py +++ b/catalog/dags/common/elasticsearch.py @@ -7,7 +7,6 @@ from airflow.models.connection import Connection from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook from airflow.sensors.base import PokeReturnValue -from airflow.utils.trigger_rule import TriggerRule from elasticsearch.exceptions import NotFoundError from common.constants import REFRESH_POKE_INTERVAL @@ -197,8 +196,8 @@ def point_alias( Optional Arguments: - should_delete_old_index: If True, any indices with the target alias _other_ - than the target index will be deleted. + should_delete_old_index: If True, the index previously pointed to by the target + alias (if one exists) will be deleted. """ @task @@ -221,61 +220,46 @@ def get_existing_index(es_host: str, target_alias: str): logger.info(f"Target alias {target_alias} does not exist.") return None - @task - def remove_alias_from_existing_index( - es_host: str, - index_name: str, - target_alias: str, - ): - """Remove the given alias from the given index.""" - if not index_name: - raise AirflowSkipException( - "No existing index from which to remove the target alias." - ) - - es_conn = ElasticsearchPythonHook(hosts=[es_host]).get_conn - response = es_conn.indices.delete_alias( - name=target_alias, - index=index_name, - ) - return response.get("acknowledged") - @task def point_new_alias( es_host: str, target_index: str, + existing_index: str, target_alias: str, ): + """ + Remove the target_alias from the existing index to which it applies, if + applicable, and point it to the target_index in one atomic operation. + """ if not target_alias: raise AirflowSkipException("No target alias was provided.") es_conn = ElasticsearchPythonHook(hosts=[es_host]).get_conn - response = es_conn.indices.put_alias(index=target_index, name=target_alias) + + actions = [] + if existing_index: + actions.append({"remove": {"index": existing_index, "alias": target_alias}}) + actions.append({"add": {"index": target_index, "alias": target_alias}}) + logger.info(f"Applying actions: {actions}") + + response = es_conn.indices.update_aliases(body={"actions": actions}) return response.get("acknowledged") @task def delete_old_index(es_host: str, index_name: str, should_delete_old_index: bool): if not should_delete_old_index: raise AirflowSkipException("`should_delete_old_index` is set to `False`.") + if not index_name: + raise AirflowSkipException("No applicable index to delete.") es_conn = ElasticsearchPythonHook(hosts=[es_host]).get_conn - response = es_conn.indices.delete( - index=index_name, - ) + response = es_conn.indices.delete(index=index_name) return response.get("acknowledged") existing_index = get_existing_index(es_host, target_alias) - remove_alias = remove_alias_from_existing_index( - es_host, existing_index, target_alias - ) - point_alias = point_new_alias.override( - # `remove_alias_from_existing_index` is skipped if there is no - # existing index - trigger_rule=TriggerRule.NONE_FAILED - )(es_host, target_index, target_alias) + point_alias = point_new_alias(es_host, target_index, existing_index, target_alias) delete_index = delete_old_index(es_host, existing_index, should_delete_old_index) - existing_index >> remove_alias >> point_alias - point_alias >> delete_index + existing_index >> point_alias >> delete_index From 9fdd9d3dd72fd363e6c81ee83c377cd8a688b164 Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Tue, 12 Mar 2024 09:31:30 -0700 Subject: [PATCH 08/11] Include 'es' in dag ids --- .../point_es_alias_dag.py} | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) rename catalog/dags/elasticsearch_cluster/{point_alias/point_alias_dag.py => point_es_alias/point_es_alias_dag.py} (92%) diff --git a/catalog/dags/elasticsearch_cluster/point_alias/point_alias_dag.py b/catalog/dags/elasticsearch_cluster/point_es_alias/point_es_alias_dag.py similarity index 92% rename from catalog/dags/elasticsearch_cluster/point_alias/point_alias_dag.py rename to catalog/dags/elasticsearch_cluster/point_es_alias/point_es_alias_dag.py index 50d0ef7b57d..816cef8e5ed 100644 --- a/catalog/dags/elasticsearch_cluster/point_alias/point_alias_dag.py +++ b/catalog/dags/elasticsearch_cluster/point_es_alias/point_es_alias_dag.py @@ -1,7 +1,7 @@ """ -# Point Alias DAG +# Point Elasticsearch Alias DAG -This file generates our Point Alias DAGs using a factory function. A +This file generates our Point ES Alias DAGs using a factory function. A separate DAG is generated for the staging and production environments. The DAGs are used to point a `target_alias` to a `target_index` in the @@ -28,9 +28,9 @@ ) -def point_alias_dag(environment: str): +def point_es_alias_dag(environment: str): dag = DAG( - dag_id=f"point_{environment}_alias", + dag_id=f"point_{environment}_es_alias", default_args=DAG_DEFAULT_ARGS, schedule=None, start_date=datetime(2024, 1, 31), @@ -89,4 +89,4 @@ def point_alias_dag(environment: str): for environment in ENVIRONMENTS: - point_alias_dag(environment) + point_es_alias_dag(environment) From a6acc31e032bc303a7b5c0da1eda27a95bb19157 Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Tue, 12 Mar 2024 09:40:16 -0700 Subject: [PATCH 09/11] Add option to delete old index to the proportional index dag --- .../create_proportional_by_source_staging_index_dag.py | 9 +++++++++ .../point_es_alias/point_es_alias_dag.py | 8 ++++---- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py b/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py index 28f5fa237d6..6f3331104d6 100644 --- a/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py +++ b/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py @@ -103,6 +103,14 @@ " the index aliased to `-filtered` will be used." ), ), + "should_delete_old_index": Param( + default=False, + type="boolean", + description=( + "Whether to delete the index previously pointed to by the" + " `{media_type}-subset-by-source` alias." + ), + ), }, render_template_as_native_obj=True, ) @@ -175,6 +183,7 @@ def create_proportional_by_source_staging_index(): es_host=es_host, target_index=destination_index_name, target_alias=destination_alias, + should_delete_old_index="{{ params.should_delete_old_index }}", ) notify_completion = slack.notify_slack.override( diff --git a/catalog/dags/elasticsearch_cluster/point_es_alias/point_es_alias_dag.py b/catalog/dags/elasticsearch_cluster/point_es_alias/point_es_alias_dag.py index 816cef8e5ed..b2f441297bd 100644 --- a/catalog/dags/elasticsearch_cluster/point_es_alias/point_es_alias_dag.py +++ b/catalog/dags/elasticsearch_cluster/point_es_alias/point_es_alias_dag.py @@ -1,13 +1,13 @@ """ -# Point Elasticsearch Alias DAG +# Point ES Alias DAG This file generates our Point ES Alias DAGs using a factory function. A separate DAG is generated for the staging and production environments. The DAGs are used to point a `target_alias` to a `target_index` in the -given environment. When the alias is applied, it is first removed from -any existing index to which it already applies; optionally, it can also -delete that index afterward. +given environment's elasticsearch cluster. When the alias is applied, it +is first removed from any existing index to which it already applies; +optionally, it can also delete that index afterward. ## When this DAG runs From 40982a1063eecd66884c213f7905939326de366a Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Tue, 12 Mar 2024 09:46:00 -0700 Subject: [PATCH 10/11] Update DAGs.md --- documentation/catalog/reference/DAGs.md | 32 ++++++++++++------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/documentation/catalog/reference/DAGs.md b/documentation/catalog/reference/DAGs.md index c5e923891ff..df31389354e 100644 --- a/documentation/catalog/reference/DAGs.md +++ b/documentation/catalog/reference/DAGs.md @@ -47,8 +47,8 @@ The following are DAGs grouped by their primary tag: | [`batched_update`](#batched_update) | `None` | | [`create_proportional_by_source_staging_index`](#create_proportional_by_source_staging_index) | `None` | | [`delete_records`](#delete_records) | `None` | -| [`point_production_alias`](#point_production_alias) | `None` | -| [`point_staging_alias`](#point_staging_alias) | `None` | +| [`point_production_es_alias`](#point_production_es_alias) | `None` | +| [`point_staging_es_alias`](#point_staging_es_alias) | `None` | | [`recreate_audio_popularity_calculation`](#recreate_audio_popularity_calculation) | `None` | | [`recreate_full_staging_index`](#recreate_full_staging_index) | `None` | | [`recreate_image_popularity_calculation`](#recreate_image_popularity_calculation) | `None` | @@ -168,8 +168,8 @@ The following is documentation associated with each DAG (where available): 1. [`oauth2_token_refresh`](#oauth2_token_refresh) 1. [`phylopic_reingestion_workflow`](#phylopic_reingestion_workflow) 1. [`phylopic_workflow`](#phylopic_workflow) -1. [`point_production_alias`](#point_production_alias) -1. [`point_staging_alias`](#point_staging_alias) +1. [`point_production_es_alias`](#point_production_es_alias) +1. [`point_staging_es_alias`](#point_staging_es_alias) 1. [`pr_review_reminders`](#pr_review_reminders) 1. [`production_elasticsearch_cluster_healthcheck`](#production_elasticsearch_cluster_healthcheck) 1. [`rawpixel_workflow`](#rawpixel_workflow) @@ -1066,33 +1066,33 @@ Output: TSV file containing the image, their respective meta-data. Notes: http://api-docs.phylopic.org/v2/ No rate limit specified. -### `point_production_alias` +### `point_production_es_alias` -#### Point Alias DAG +#### Point ES Alias DAG -This file generates our Point Alias DAGs using a factory function. A separate +This file generates our Point ES Alias DAGs using a factory function. A separate DAG is generated for the staging and production environments. The DAGs are used to point a `target_alias` to a `target_index` in the given -environment. When the alias is applied, it is first removed from any existing -index to which it already applies; optionally, it can also delete that index -afterward. +environment's elasticsearch cluster. When the alias is applied, it is first +removed from any existing index to which it already applies; optionally, it can +also delete that index afterward. ##### When this DAG runs This DAG is on a `None` schedule and is run manually. -### `point_staging_alias` +### `point_staging_es_alias` -#### Point Alias DAG +#### Point ES Alias DAG -This file generates our Point Alias DAGs using a factory function. A separate +This file generates our Point ES Alias DAGs using a factory function. A separate DAG is generated for the staging and production environments. The DAGs are used to point a `target_alias` to a `target_index` in the given -environment. When the alias is applied, it is first removed from any existing -index to which it already applies; optionally, it can also delete that index -afterward. +environment's elasticsearch cluster. When the alias is applied, it is first +removed from any existing index to which it already applies; optionally, it can +also delete that index afterward. ##### When this DAG runs From d609c78d0191747d5d59018a37b6fbe74bae203b Mon Sep 17 00:00:00 2001 From: Staci Cooper Date: Tue, 12 Mar 2024 09:50:43 -0700 Subject: [PATCH 11/11] Update param list in proportional dag docs --- .../create_proportional_by_source_staging_index_dag.py | 2 ++ documentation/catalog/reference/DAGs.md | 2 ++ 2 files changed, 4 insertions(+) diff --git a/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py b/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py index 6f3331104d6..9d4ccd087b5 100644 --- a/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py +++ b/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py @@ -19,6 +19,8 @@ * source_index: An existing staging Elasticsearch index to use as the basis for the new index. If not provided, the index aliased to `-filtered` will be used. +* should_delete_old_index: If True, the index previously pointed to by the target + alias (if one exists) will be deleted. ## When this DAG runs diff --git a/documentation/catalog/reference/DAGs.md b/documentation/catalog/reference/DAGs.md index df31389354e..2dd9b88a559 100644 --- a/documentation/catalog/reference/DAGs.md +++ b/documentation/catalog/reference/DAGs.md @@ -731,6 +731,8 @@ Optional params: - source_index: An existing staging Elasticsearch index to use as the basis for the new index. If not provided, the index aliased to `-filtered` will be used. +- should_delete_old_index: If True, the index previously pointed to by the + target alias (if one exists) will be deleted. ##### When this DAG runs