Make media_type a dag param, refactor with TaskFlow
stacimc committed Nov 22, 2023
1 parent 439bc1d commit e8e3064
Showing 7 changed files with 224 additions and 200 deletions.
2 changes: 1 addition & 1 deletion catalog/dags/data_refresh/create_filtered_index_dag.py
@@ -161,5 +161,5 @@ def prevent_concurrency_with_data_refresh(**context):
return dag


-for data_refresh in DATA_REFRESH_CONFIGS:
+for data_refresh in DATA_REFRESH_CONFIGS.values():
create_filtered_index_dag = create_filtered_index_creation_dag(data_refresh)
4 changes: 2 additions & 2 deletions catalog/dags/data_refresh/dag_factory.py
@@ -135,9 +135,9 @@ def create_data_refresh_dag(data_refresh: DataRefresh, external_dag_ids: Sequenc


# Generate a data refresh DAG for each DATA_REFRESH_CONFIG.
-all_data_refresh_dag_ids = {refresh.dag_id for refresh in DATA_REFRESH_CONFIGS}
+all_data_refresh_dag_ids = {refresh.dag_id for refresh in DATA_REFRESH_CONFIGS.values()}

-for data_refresh in DATA_REFRESH_CONFIGS:
+for data_refresh in DATA_REFRESH_CONFIGS.values():
# Construct a set of all data refresh DAG ids other than the current DAG
other_dag_ids = all_data_refresh_dag_ids - {data_refresh.dag_id}

10 changes: 5 additions & 5 deletions catalog/dags/data_refresh/data_refresh_types.py
@@ -9,7 +9,7 @@
from dataclasses import dataclass, field
from datetime import datetime, timedelta

-from common.constants import REFRESH_POKE_INTERVAL
+from common.constants import AUDIO, IMAGE, REFRESH_POKE_INTERVAL


@dataclass
@@ -71,8 +71,8 @@ def __post_init__(self):
self.dag_id = f"{self.media_type}_data_refresh"


-DATA_REFRESH_CONFIGS = [
-    DataRefresh(
+DATA_REFRESH_CONFIGS = {
+    IMAGE: DataRefresh(
media_type="image",
data_refresh_timeout=timedelta(days=4),
refresh_metrics_timeout=timedelta(hours=24),
@@ -84,11 +84,11 @@ def __post_init__(self):
os.getenv("DATA_REFRESH_POKE_INTERVAL", 60 * 30)
),
),
-    DataRefresh(
+    AUDIO: DataRefresh(
media_type="audio",
data_refresh_poke_interval=int(
os.getenv("DATA_REFRESH_POKE_INTERVAL", 60 * 30)
),
filtered_index_poke_interval=int(os.getenv("DATA_REFRESH_POKE_INTERVAL", 60)),
),
-]
+}
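
With this change, DATA_REFRESH_CONFIGS is a dict keyed by media type rather than a list, so callers can either iterate over `.values()` or look up a single config directly. A minimal sketch of both access patterns as used elsewhere in this commit (the fallback timeout mirrors the one used in `create_index` below):

from datetime import timedelta

from common.constants import IMAGE
from data_refresh.data_refresh_types import DATA_REFRESH_CONFIGS

# Iterate over every configured media type, as the DAG factories now do.
for data_refresh in DATA_REFRESH_CONFIGS.values():
    print(data_refresh.dag_id)

# Look up a single config by media type; fall back to a default timeout
# when no config exists for that media type.
config = DATA_REFRESH_CONFIGS.get(IMAGE)
data_refresh_timeout = config.data_refresh_timeout if config else timedelta(days=1)
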
@@ -0,0 +1,60 @@
from datetime import timedelta

from airflow.decorators import task, task_group

from common import ingestion_server
from data_refresh.data_refresh_types import DATA_REFRESH_CONFIGS


@task
def get_target_alias(media_type: str, target_alias_override: str):
return target_alias_override or f"{media_type}-full"


@task.branch
def should_delete_index(should_delete: bool, old_index: str):
if should_delete and old_index:
# We should try to delete the old index only if the param is enabled,
# and we were able to find an index with the target_alias in the
# preceding task.
return "trigger_delete_index"
# Skip straight to notifying Slack.
return "notify_complete"


@task_group(group_id="create_index")
def create_index(media_type: str, index_suffix: str) -> None:
"""Create the new elasticsearch index on the staging cluster."""

# Get the DataRefresh config associated with this media type, in order to get
# the reindexing timeout information.
config = DATA_REFRESH_CONFIGS.get(media_type)
data_refresh_timeout = config.data_refresh_timeout if config else timedelta(days=1)

ingestion_server.trigger_and_wait_for_task(
action="REINDEX",
model=media_type,
data={"index_suffix": index_suffix},
timeout=data_refresh_timeout,
http_conn_id="staging_data_refresh",
)


@task_group(group_id="point_alias")
def point_alias(media_type: str, target_alias: str, index_suffix: str) -> None:
"""
Alias the index with the given suffix to the target_alias, first removing the
target_alias from any other indices to which it is linked.
"""
point_alias_payload = {
"alias": target_alias,
"index_suffix": index_suffix,
}

ingestion_server.trigger_and_wait_for_task(
action="POINT_ALIAS",
model=media_type,
data=point_alias_payload,
timeout=timedelta(hours=12), # matches the ingestion server's wait time
http_conn_id="staging_data_refresh",
)
@@ -0,0 +1,155 @@
"""
# Recreate Full Staging Index DAG

This DAG is used to fully recreate a new staging Elasticsearch index for a
given `media_type`, using records pulled from the staging API database. It is
used to decouple the steps of creating a new index from the rest of the
data refresh process.

Staging index creation is handled by the _staging_ ingestion server. The DAG
triggers the ingestion server `REINDEX` action to create a new index in the
staging elasticsearch cluster for the given media type, suffixed by the current
timestamp. The DAG awaits the completion of the index creation and then points
the `<media_type>-full` alias to the newly created index.

Optionally, the `target_alias_override` param can be used to override the target
alias that is pointed to the new index. When the alias is assigned, it is removed
from any other index to which it previously pointed. The `delete_old_index` param
can optionally be enabled in order to delete the index previously pointed to by
the alias, if applicable.

## When this DAG runs

This DAG is on a `None` schedule and is run manually.

## Race conditions

Because this DAG runs on the staging ingestion server and staging elasticsearch
cluster, it does _not_ interfere with the `data_refresh` or
`create_filtered_index` DAGs.
"""
from datetime import datetime

from airflow.decorators import dag
from airflow.models.param import Param
from airflow.utils.trigger_rule import TriggerRule
from elasticsearch.recreate_staging_index.recreate_full_staging_index import (
create_index,
get_target_alias,
point_alias,
should_delete_index,
)

from common import ingestion_server, slack
from common.constants import AUDIO, DAG_DEFAULT_ARGS, MEDIA_TYPES, XCOM_PULL_TEMPLATE


DAG_ID = "recreate_full_staging_index"


@dag(
dag_id=DAG_ID,
default_args=DAG_DEFAULT_ARGS,
schedule=None,
start_date=datetime(2023, 4, 1),
tags=["database"],
max_active_runs=1,
catchup=False,
doc_md=__doc__,
params={
"media_type": Param(
default=AUDIO,
enum=MEDIA_TYPES,
description="The media type for which to create the index.",
),
"target_alias_override": Param(
default=None,
type=["string", "null"],
            description=(
                "Optionally, override the target alias for the newly created index."
                " Defaults to `{media_type}-full` for the given media_type."
            ),
),
"delete_old_index": Param(
default=False,
type="boolean",
            description=(
                "Whether to delete the index previously pointed to by the "
                "`target_alias`, if it is replaced. The index will "
                "only be deleted if the alias was successfully linked to the "
                "newly created index."
            ),
),
},
render_template_as_native_obj=True,
)
def recreate_full_staging_index():
target_alias = get_target_alias(
media_type="{{ params.media_type }}",
target_alias_override="{{ params.target_alias_override }}",
)

# Suffix the index with a current timestamp.
new_index_suffix = ingestion_server.generate_index_suffix.override(
trigger_rule=TriggerRule.NONE_FAILED
)("full-{{ ts_nodash.lower() }}")

# Get the index currently aliased by the target_alias, in case it must be
# deleted later.
get_current_index_if_exists = ingestion_server.get_current_index(
target_alias, http_conn_id="staging_data_refresh"
)

# Create the new Elasticsearch index
do_create_index = create_index(
media_type="{{ params.media_type }}", index_suffix=new_index_suffix
)

# Actually point the alias
do_point_alias = point_alias(
media_type="{{ params.media_type }}",
target_alias=target_alias,
index_suffix=new_index_suffix,
)

check_if_should_delete_index = should_delete_index(
should_delete="{{ params.delete_old_index }}",
old_index=XCOM_PULL_TEMPLATE.format(
get_current_index_if_exists.task_id, "return_value"
),
)

# Branch only reached if check_if_should_delete_index is True.
# Deletes the old index pointed to by the target_alias after the alias has been
# unlinked.
delete_old_index = ingestion_server.trigger_task(
action="DELETE_INDEX",
model="{{ params.media_type }}",
data={
"index_suffix": XCOM_PULL_TEMPLATE.format(
get_current_index_if_exists.task_id, "return_value"
),
},
http_conn_id="staging_data_refresh",
)

notify_complete = slack.notify_slack.override(
task_id="notify_complete", trigger_rule=TriggerRule.NONE_FAILED
)(
text=(
"Finished creating full staging index `{{ params.media_type }}-"
f"{new_index_suffix}` aliased to `{target_alias}`."
),
username="Full Staging Index Creation",
dag_id=DAG_ID,
)

# Set up dependencies
target_alias >> get_current_index_if_exists >> new_index_suffix
new_index_suffix >> do_create_index >> do_point_alias
do_point_alias >> check_if_should_delete_index
check_if_should_delete_index >> [delete_old_index, notify_complete]
delete_old_index >> notify_complete


recreate_full_staging_index()
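
For reference, a hedged sketch (not part of this commit) of the run configuration these params expect when the DAG is triggered manually, for example with `airflow dags trigger recreate_full_staging_index --conf '<json>'`; the values shown are illustrative only:

# Illustrative run configuration; param names come from the DAG definition above.
run_conf = {
    "media_type": "image",          # any value in MEDIA_TYPES
    "target_alias_override": None,  # None falls back to "<media_type>-full"
    "delete_old_index": True,       # delete the index the alias previously pointed to
}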