Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add point_alias DAG and add alias params to create_new_es_index DAGs #3890

Merged
merged 11 commits into from
Mar 14, 2024
106 changes: 73 additions & 33 deletions catalog/dags/common/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
from typing import Literal, Union

from airflow.decorators import task, task_group
from airflow.exceptions import AirflowSkipException
from airflow.models.connection import Connection
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook
from airflow.sensors.base import PokeReturnValue
from airflow.utils.trigger_rule import TriggerRule
from elasticsearch.exceptions import NotFoundError

from common.constants import REFRESH_POKE_INTERVAL

Expand Down Expand Up @@ -175,51 +176,90 @@ def refresh_index(es_host: str, index_name: str):


@task_group(group_id="point_alias")
def point_alias(index_name: str, alias: str, es_host: str):
def point_alias(
es_host: str,
target_index: str,
target_alias: str,
should_delete_old_index: bool = False,
):
"""
Point the target alias to the given index. If the alias is already being
used by one or more indices, it will first be removed from all of them.
"""
used by another index, it will be removed from this index first. Optionally,
that index may also be automatically deleted.

@task.branch
def check_if_alias_exists(alias: str, es_host: str):
"""Check if the alias already exists."""
es_conn = ElasticsearchPythonHook(hosts=[es_host]).get_conn
return (
"point_alias.remove_existing_alias"
if es_conn.indices.exists_alias(name=alias)
else "point_alias.point_new_alias"
)
Required Arguments:

es_host: Connection string for elasticsearch
target_index: Str identifier for the target index. May be either the index name
or an existing alias.
target_alias: The new alias to be applied to the target index

Optional Arguments:

should_delete_old_index: If True, the index previously pointed to by the target
alias (if one exists) will be deleted.
"""

@task
def remove_existing_alias(alias: str, es_host: str):
"""Remove the given alias from any indices to which it points."""
def get_existing_index(es_host: str, target_alias: str):
"""Get the index to which the target alias currently points, if it exists."""
if not target_alias:
raise AirflowSkipException("No target alias was provided.")

es_conn = ElasticsearchPythonHook(hosts=[es_host]).get_conn
response = es_conn.indices.delete_alias(
name=alias,
# Remove the alias from _all_ indices to which it currently
# applies
index="_all",
)
return response.get("acknowledged")

try:
response = es_conn.indices.get_alias(name=target_alias)
if len(response) > 1:
raise ValueError(
"Expected at most one existing index with target alias"
f"{target_alias}, but {len(response)} were found."
)
return list(response.keys())[0]
except NotFoundError:
logger.info(f"Target alias {target_alias} does not exist.")
return None

@task
def point_new_alias(
es_host: str,
index_name: str,
alias: str,
target_index: str,
existing_index: str,
target_alias: str,
):
"""
Remove the target_alias from the existing index to which it applies, if
applicable, and point it to the target_index in one atomic operation.
"""
if not target_alias:
raise AirflowSkipException("No target alias was provided.")

es_conn = ElasticsearchPythonHook(hosts=[es_host]).get_conn
response = es_conn.indices.put_alias(index=index_name, name=alias)

actions = []
if existing_index:
actions.append({"remove": {"index": existing_index, "alias": target_alias}})
actions.append({"add": {"index": target_index, "alias": target_alias}})
logger.info(f"Applying actions: {actions}")

response = es_conn.indices.update_aliases(body={"actions": actions})
return response.get("acknowledged")

exists_alias = check_if_alias_exists(alias, es_host)
remove_alias = remove_existing_alias(alias, es_host)
@task
def delete_old_index(es_host: str, index_name: str, should_delete_old_index: bool):
if not should_delete_old_index:
raise AirflowSkipException("`should_delete_old_index` is set to `False`.")
if not index_name:
raise AirflowSkipException("No applicable index to delete.")

es_conn = ElasticsearchPythonHook(hosts=[es_host]).get_conn
response = es_conn.indices.delete(index=index_name)
return response.get("acknowledged")

existing_index = get_existing_index(es_host, target_alias)

point_alias = point_new_alias(es_host, target_index, existing_index, target_alias)

point_alias = point_new_alias.override(
# The remove_alias task may be skipped.
trigger_rule=TriggerRule.NONE_FAILED,
)(es_host, index_name, alias)
delete_index = delete_old_index(es_host, existing_index, should_delete_old_index)

exists_alias >> [remove_alias, point_alias]
remove_alias >> point_alias
existing_index >> point_alias >> delete_index
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
* `override_config`: boolean override; when True, the `index_config` will be used
for the new index configuration _without_ merging any values
from the source index config.
* `target_alias` : optional alias to be applied to the new index after reindexing.
If the alias already applies to an existing index, it will be
removed first.
* `should_delete_old_index`: whether to remove the index previously pointed to by
the target_alias, if it exists. Defaults to False.

## Merging policy

Expand Down Expand Up @@ -104,6 +109,7 @@
from airflow.utils.trigger_rule import TriggerRule

from common import elasticsearch as es
from common import slack
from common.constants import AUDIO, DAG_DEFAULT_ARGS, MEDIA_TYPES
from common.sensors.utils import prevent_concurrency_with_dags
from elasticsearch_cluster.create_new_es_index.create_new_es_index import (
Expand Down Expand Up @@ -191,6 +197,23 @@ def create_new_es_index_dag(config: CreateNewIndex):
" configuration."
),
),
"target_alias": Param(
default=None,
type=["string", "null"],
description=(
"Optional alias which will be applied to the newly created index. If"
" the alias already exists, it will first be removed from the"
" index to which it previously pointed."
),
),
"should_delete_old_index": Param(
default=False,
type="boolean",
description=(
"Whether to delete the index previously pointed to by the"
" `target_alias`."
),
),
},
)

Expand Down Expand Up @@ -242,11 +265,31 @@ def create_new_es_index_dag(config: CreateNewIndex):
es_host=es_host,
)

point_alias = es.point_alias(
es_host=es_host,
target_index=index_name,
target_alias="{{ params.target_alias }}",
should_delete_old_index="{{ params.should_delete_old_index }}",
)

notify_completion = slack.notify_slack.override(
trigger_rule=TriggerRule.NONE_FAILED
)(
text=(
f"New index { index_name } was successfully created with alias"
"{{ params.target_alias }}."
),
dag_id=dag.dag_id,
username="Create New ES Index",
icon_emoji=":elasticsearch:",
)

# Set up dependencies
prevent_concurrency >> [es_host, index_name]
index_name >> check_override >> [current_index_config, final_index_config]
current_index_config >> merged_index_config >> final_index_config
final_index_config >> create_new_index >> reindex
final_index_config >> create_new_index >> reindex >> point_alias
point_alias >> notify_completion

return dag

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
* source_index: An existing staging Elasticsearch index to use as the basis for
the new index. If not provided, the index aliased to
`<media_type>-filtered` will be used.
* should_delete_old_index: If True, the index previously pointed to by the target
alias (if one exists) will be deleted.

## When this DAG runs

Expand All @@ -41,6 +43,7 @@

from airflow.decorators import dag
from airflow.models.param import Param
from airflow.utils.trigger_rule import TriggerRule

from common import elasticsearch as es
from common import slack
Expand Down Expand Up @@ -102,6 +105,14 @@
" the index aliased to `<media_type>-filtered` will be used."
),
),
"should_delete_old_index": Param(
default=False,
type="boolean",
description=(
"Whether to delete the index previously pointed to by the"
" `{media_type}-subset-by-source` alias."
),
),
},
render_template_as_native_obj=True,
)
Expand Down Expand Up @@ -171,10 +182,15 @@ def create_proportional_by_source_staging_index():
)

point_alias = es.point_alias(
index_name=destination_index_name, alias=destination_alias, es_host=es_host
es_host=es_host,
target_index=destination_index_name,
target_alias=destination_alias,
stacimc marked this conversation as resolved.
Show resolved Hide resolved
should_delete_old_index="{{ params.should_delete_old_index }}",
)

notify_completion = slack.notify_slack(
notify_completion = slack.notify_slack.override(
trigger_rule=TriggerRule.NONE_FAILED
)(
text=f"Reindexing complete for {destination_index_name}.",
dag_id=DAG_ID,
username="Proportional by Source Staging Index Creation",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
"""
# Point ES Alias DAG

This file generates our Point ES Alias DAGs using a factory function. A
separate DAG is generated for the staging and production environments.

The DAGs are used to point a `target_alias` to a `target_index` in the
given environment's elasticsearch cluster. When the alias is applied, it
is first removed from any existing index to which it already applies;
optionally, it can also delete that index afterward.

## When this DAG runs

This DAG is on a `None` schedule and is run manually.
"""

from datetime import datetime

from airflow import DAG
from airflow.models.param import Param
from airflow.utils.trigger_rule import TriggerRule

from common import elasticsearch as es
from common import slack
from common.constants import (
DAG_DEFAULT_ARGS,
ENVIRONMENTS,
)


def point_es_alias_dag(environment: str):
dag = DAG(
dag_id=f"point_{environment}_es_alias",
default_args=DAG_DEFAULT_ARGS,
schedule=None,
start_date=datetime(2024, 1, 31),
tags=["database", "elasticsearch"],
max_active_runs=1,
catchup=False,
doc_md=__doc__,
params={
"target_index": Param(
type="string",
description=(
"The existing Elasticsearch index to which the target alias"
" should be applied."
),
),
"target_alias": Param(
type="string",
description=(
"The alias which will be applied to the index. If"
" the alias already exists, it will first be removed from the"
" index to which it previously pointed."
),
),
"should_delete_old_index": Param(
default=False,
type="boolean",
description=(
"Whether to delete the index previously pointed to by the"
" `target_alias`."
),
),
},
render_template_as_native_obj=True,
)

with dag:
es_host = es.get_es_host(environment=environment)

point_alias = es.point_alias(
es_host=es_host,
target_index="{{ params.target_index }}",
target_alias="{{ params.target_alias }}",
should_delete_old_index="{{ params.should_delete_old_index }}",
)

notify_completion = slack.notify_slack.override(
trigger_rule=TriggerRule.NONE_FAILED
)(
text="Alias {{ params.target_alias }} applied to index {{ params.target_index }}.",
dag_id=dag.dag_id,
username="Point Alias",
icon_emoji=":elasticsearch:",
)

es_host >> point_alias >> notify_completion


for environment in ENVIRONMENTS:
point_es_alias_dag(environment)
Loading
Loading