diff --git a/catalog/dags/common/elasticsearch.py b/catalog/dags/common/elasticsearch.py new file mode 100644 index 00000000000..92acc57b6fc --- /dev/null +++ b/catalog/dags/common/elasticsearch.py @@ -0,0 +1,225 @@ +import logging +from datetime import timedelta +from typing import Literal, Union + +from airflow.decorators import task, task_group +from airflow.models.connection import Connection +from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook +from airflow.sensors.base import PokeReturnValue +from airflow.utils.trigger_rule import TriggerRule + +from common.constants import REFRESH_POKE_INTERVAL + + +logger = logging.getLogger(__name__) + + +# Index settings that should not be copied over from the base configuration when +# creating a new index. +EXCLUDED_INDEX_SETTINGS = {"provided_name", "creation_date", "uuid", "version"} + + +@task +def get_es_host(environment: str): + es_conn = Connection.get_connection_from_secrets( + f"elasticsearch_http_{environment}" + ) + return es_conn.get_uri() + + +@task +def get_index_configuration( + source_index: str, + es_host: str, +): + """ + Return the configuration for the index identified by the + `source_index` param. `source_index` may be either an index name + or an alias, but must uniquely identify one existing index or an + error will be raised. + """ + es_conn = ElasticsearchPythonHook(hosts=[es_host]).get_conn + + response = es_conn.indices.get( + index=source_index, + # Return empty dict instead of throwing error if no index can be + # found. We raise our own error instead. + ignore_unavailable=True, + ) + + if len(response) != 1: + raise ValueError(f"Index {source_index} could not be uniquely identified.") + + # The response has the form: + # { index_name: index_configuration } + # However, since `source_index` can be an alias rather than the index name, + # we do not necessarily know the index_name so we cannot access the configuration + # directly by key. We instead get the first value from the dict, knowing that we + # have already ensured in a previous check that there is exactly one value in the + # response. + config = next(iter(response.values())) + return config + + +def remove_excluded_index_settings(index_config): + """ + Remove fields from the given index configuration that should not be included when + using it to create a new index. + """ + # Remove fields from the current_index_config that should not be copied + # over into the new index (such as uuid) + for setting in EXCLUDED_INDEX_SETTINGS: + index_config.get("settings", {}).get("index", {}).pop(setting) + + # Aliases should also not by applied automatically + index_config.pop("aliases") + + return index_config + + +@task +def create_index(index_config, es_host: str): + es_conn = ElasticsearchPythonHook(hosts=[es_host]).get_conn + + new_index = es_conn.indices.create(**index_config) + + return new_index + + +@task_group(group_id="trigger_and_wait_for_reindex") +def trigger_and_wait_for_reindex( + es_host: str, + destination_index: str, + source_index: str, + timeout: timedelta, + requests_per_second: int, + query: dict | None = None, + max_docs: int | None = None, + refresh: bool = True, + slices: Union[int, Literal["auto"]] = "auto", +): + @task + def trigger_reindex( + es_host: str, + destination_index: str, + source_index: str, + query: dict, + requests_per_second: int, + max_docs: int | None, + slices: Union[int, Literal["auto"]], + ): + es_conn = ElasticsearchPythonHook(hosts=[es_host]).get_conn + source = {"index": source_index} + # An empty query is not accepted; only pass it + # if a query was actually supplied + if query: + source["query"] = query + + response = es_conn.reindex( + source=source, + dest={"index": destination_index}, + max_docs=max_docs, + # Parallelize indexing when not None + slices=slices, + # Do not hold the slot while awaiting completion + wait_for_completion=False, + # Whether to immediately refresh the index after completion to make + # the data available for search + refresh=refresh, + # Throttle + requests_per_second=requests_per_second, + ) + return response["task"] + + @task.sensor( + poke_interval=REFRESH_POKE_INTERVAL, timeout=timeout, mode="reschedule" + ) + def wait_for_reindex( + es_host: str, task_id: str, expected_docs: int | None = None + ) -> PokeReturnValue: + es_conn = ElasticsearchPythonHook(hosts=[es_host]).get_conn + + response = es_conn.tasks.get(task_id=task_id) + + count = response.get("task", {}).get("status", {}).get("total") + if expected_docs and count != expected_docs: + logger.info( + f"Reindexed {count} documents, but {expected_docs} were expected." + ) + else: + logger.info(f"Reindexed {count} documents.") + + return PokeReturnValue(is_done=response.get("completed") is True) + + trigger_reindex_task = trigger_reindex( + es_host, + destination_index, + source_index, + query, + requests_per_second, + max_docs, + slices, + ) + + wait_for_reindex_task = wait_for_reindex( + task_id=trigger_reindex_task, expected_docs=max_docs, es_host=es_host + ) + + trigger_reindex_task >> wait_for_reindex_task + + +@task +def refresh_index(es_host: str, index_name: str): + es_conn = ElasticsearchPythonHook(hosts=[es_host]).get_conn + return es_conn.indices.refresh(index=index_name) + + +@task_group(group_id="point_alias") +def point_alias(index_name: str, alias: str, es_host: str): + """ + Point the target alias to the given index. If the alias is already being + used by one or more indices, it will first be removed from all of them. + """ + + @task.branch + def check_if_alias_exists(alias: str, es_host: str): + """Check if the alias already exists.""" + es_conn = ElasticsearchPythonHook(hosts=[es_host]).get_conn + return ( + "point_alias.remove_existing_alias" + if es_conn.indices.exists_alias(name=alias) + else "point_alias.point_new_alias" + ) + + @task + def remove_existing_alias(alias: str, es_host: str): + """Remove the given alias from any indices to which it points.""" + es_conn = ElasticsearchPythonHook(hosts=[es_host]).get_conn + response = es_conn.indices.delete_alias( + name=alias, + # Remove the alias from _all_ indices to which it currently + # applies + index="_all", + ) + return response.get("acknowledged") + + @task + def point_new_alias( + es_host: str, + index_name: str, + alias: str, + ): + es_conn = ElasticsearchPythonHook(hosts=[es_host]).get_conn + response = es_conn.indices.put_alias(index=index_name, name=alias) + return response.get("acknowledged") + + exists_alias = check_if_alias_exists(alias, es_host) + remove_alias = remove_existing_alias(alias, es_host) + + point_alias = point_new_alias.override( + # The remove_alias task may be skipped. + trigger_rule=TriggerRule.NONE_FAILED, + )(es_host, index_name, alias) + + exists_alias >> [remove_alias, point_alias] + remove_alias >> point_alias diff --git a/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index.py b/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index.py index 1cb2b970eda..a5e13a7da47 100644 --- a/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index.py +++ b/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index.py @@ -1,21 +1,14 @@ import logging -from datetime import timedelta -from airflow.decorators import task, task_group -from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook -from airflow.sensors.python import PythonSensor +from airflow.decorators import task -from common.constants import REFRESH_POKE_INTERVAL +from common.elasticsearch import remove_excluded_index_settings from elasticsearch_cluster.create_new_es_index.utils import merge_configurations logger = logging.getLogger(__name__) -# Index settings that should not be copied over from the base configuration when -# creating a new index. -EXCLUDED_INDEX_SETTINGS = ["provided_name", "creation_date", "uuid", "version"] - GET_FINAL_INDEX_CONFIG_TASK_NAME = "get_final_index_configuration" GET_CURRENT_INDEX_CONFIG_TASK_NAME = "get_current_index_configuration" @@ -35,40 +28,6 @@ def check_override_config(override): return GET_CURRENT_INDEX_CONFIG_TASK_NAME -@task -def get_current_index_configuration( - source_index: str, - es_host: str, -): - """ - Return the configuration for the current index, identified by the - `source_index` param. `source_index` may be either an index name - or an alias, but must uniquely identify one existing index or an - error will be raised. - """ - es_conn = ElasticsearchPythonHook(hosts=[es_host]).get_conn - - response = es_conn.indices.get( - index=source_index, - # Return empty dict instead of throwing error if no index can be - # found. We raise our own error instead. - ignore_unavailable=True, - ) - - if len(response) != 1: - raise ValueError(f"Index {source_index} could not be uniquely identified.") - - # The response has the form: - # { index_name: index_configuration } - # However, since `source_index` can be an alias rather than the index name, - # we do not necessarily know the index_name so we cannot access the configuration - # directly by key. We instead get the first value from the dict, knowing that we - # have already ensured in a previous check that there is exactly one value in the - # response. - config = next(iter(response.values())) - return config - - @task def merge_index_configurations(new_index_config, current_index_config): """ @@ -76,13 +35,7 @@ def merge_index_configurations(new_index_config, current_index_config): return an index configuration in the appropriate format for being passed to the `create_index` API. """ - # Do not automatically apply any aliases to the new index - current_index_config.pop("aliases") - - # Remove fields from the current_index_config that should not be copied - # over into the new index (such as uuid) - for setting in EXCLUDED_INDEX_SETTINGS: - current_index_config.get("settings", {}).get("index", {}).pop(setting) + remove_excluded_index_settings(current_index_config) # Merge the new configuration values into the current configuration return merge_configurations(current_index_config, new_index_config) @@ -115,74 +68,3 @@ def get_final_index_configuration( # Apply the desired index name config["index"] = index_name return config - - -@task -def create_index(index_config, es_host: str): - es_conn = ElasticsearchPythonHook(hosts=[es_host]).get_conn - - new_index = es_conn.indices.create(**index_config) - - return new_index - - -@task_group(group_id="trigger_and_wait_for_reindex") -def trigger_and_wait_for_reindex( - index_name: str, - source_index: str, - query: dict, - timeout: timedelta, - requests_per_second: int, - es_host: str, -): - @task - def trigger_reindex( - index_name: str, - source_index: str, - query: dict, - requests_per_second: int, - es_host: str, - ): - es_conn = ElasticsearchPythonHook(hosts=[es_host]).get_conn - - source = {"index": source_index} - # An empty query is not accepted; only pass it - # if a query was actually supplied - if query: - source["query"] = query - - response = es_conn.reindex( - source=source, - dest={"index": index_name}, - # Parallelize indexing - slices="auto", - # Do not hold the slot while awaiting completion - wait_for_completion=False, - # Immediately refresh the index after completion to make - # the data available for search - refresh=True, - # Throttle - requests_per_second=requests_per_second, - ) - - return response["task"] - - def _wait_for_reindex(task_id: str, es_host: str): - es_conn = ElasticsearchPythonHook(hosts=[es_host]).get_conn - - response = es_conn.tasks.get(task_id=task_id) - return response.get("completed") - - trigger_reindex_task = trigger_reindex( - index_name, source_index, query, requests_per_second, es_host - ) - - wait_for_reindex = PythonSensor( - task_id="wait_for_reindex", - python_callable=_wait_for_reindex, - timeout=timeout, - poke_interval=REFRESH_POKE_INTERVAL, - op_kwargs={"task_id": trigger_reindex_task, "es_host": es_host}, - ) - - trigger_reindex_task >> wait_for_reindex diff --git a/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_dag.py b/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_dag.py index 5e2e517a042..3062281ee49 100644 --- a/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_dag.py +++ b/catalog/dags/elasticsearch_cluster/create_new_es_index/create_new_es_index_dag.py @@ -102,14 +102,21 @@ from airflow.models.param import Param from airflow.utils.trigger_rule import TriggerRule +from common import elasticsearch as es from common.constants import AUDIO, DAG_DEFAULT_ARGS, MEDIA_TYPES from common.sensors.utils import prevent_concurrency_with_dags -from elasticsearch_cluster.create_new_es_index import create_new_es_index as es +from elasticsearch_cluster.create_new_es_index.create_new_es_index import ( + GET_CURRENT_INDEX_CONFIG_TASK_NAME, + GET_FINAL_INDEX_CONFIG_TASK_NAME, + check_override_config, + get_final_index_configuration, + get_index_name, + merge_index_configurations, +) from elasticsearch_cluster.create_new_es_index.create_new_es_index_types import ( CREATE_NEW_INDEX_CONFIGS, CreateNewIndex, ) -from elasticsearch_cluster.shared import get_es_host logger = logging.getLogger(__name__) @@ -189,31 +196,29 @@ def create_new_es_index_dag(config: CreateNewIndex): with dag: prevent_concurrency = prevent_concurrency_with_dags(config.blocking_dags) - es_host = get_es_host(environment=config.environment) + es_host = es.get_es_host(environment=config.environment) - index_name = es.get_index_name( + index_name = get_index_name( media_type="{{ params.media_type }}", index_suffix="{{ params.index_suffix or ts_nodash }}", ) - check_override = es.check_override_config( - override="{{ params.override_config }}" - ) + check_override = check_override_config(override="{{ params.override_config }}") - current_index_config = es.get_current_index_configuration.override( - task_id=es.GET_CURRENT_INDEX_CONFIG_TASK_NAME + current_index_config = es.get_index_configuration.override( + task_id=GET_CURRENT_INDEX_CONFIG_TASK_NAME )( source_index="{{ params.source_index or params.media_type }}", es_host=es_host, ) - merged_index_config = es.merge_index_configurations( + merged_index_config = merge_index_configurations( new_index_config="{{ params.index_config }}", current_index_config=current_index_config, ) - final_index_config = es.get_final_index_configuration.override( - task_id=es.GET_FINAL_INDEX_CONFIG_TASK_NAME, + final_index_config = get_final_index_configuration.override( + task_id=GET_FINAL_INDEX_CONFIG_TASK_NAME, trigger_rule=TriggerRule.NONE_FAILED, )( override_config="{{ params.override_config }}", @@ -228,7 +233,7 @@ def create_new_es_index_dag(config: CreateNewIndex): ) reindex = es.trigger_and_wait_for_reindex( - index_name=index_name, + destination_index=index_name, source_index="{{ params.source_index or params.media_type }}", query="{{ params.query }}", timeout=config.reindex_timeout, diff --git a/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index.py b/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index.py new file mode 100644 index 00000000000..14e840c27ac --- /dev/null +++ b/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index.py @@ -0,0 +1,106 @@ +import logging + +from airflow.decorators import task +from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook + +from common.elasticsearch import remove_excluded_index_settings + + +logger = logging.getLogger(__name__) + + +@task +def get_source_index(source_index: str, media_type: str): + """ + Get the desired source index. If a source_index was passed in + params, we use that; else we default to the filtered media index. + """ + return source_index or f"{media_type}-filtered" + + +@task +def get_destination_index_name( + media_type: str, current_datetime_str: str, percentage_of_prod: int +): + """Get the desired name for the destination index.""" + percentage_str = round(percentage_of_prod * 100) + + return ( + f"{media_type}-{percentage_str}-percent-proportional" + f"-{current_datetime_str.lower()}" + ) + + +@task +def get_destination_alias(media_type: str): + return f"{media_type}-subset-by-source" + + +@task +def get_destination_index_config(source_config: dict, destination_index_name: str): + """ + Build the index configuration for the destination index, based on the + source index configuration. + """ + destination_config = remove_excluded_index_settings(source_config) + + # Apply the desired index name + destination_config["index"] = destination_index_name + return destination_config + + +@task +def get_staging_source_counts(source_index: str, es_host: str): + """ + Get the count of records per source for the given media type in the + staging source index. + """ + es_conn = ElasticsearchPythonHook(hosts=[es_host]).get_conn + + # `size` is the max number of buckets to return for the aggregation + # query, and should be set to a number that exceeds the known + # number of sources. Read more at: + # https://github.com/elastic/elasticsearch/issues/18838 + size = 100 + + response = es_conn.search( + index=source_index, + # Return 0 records from the search query; we only care about the + # aggregations + size=0, + aggregations={ + "unique_sources": { + "terms": {"field": "source", "size": size, "order": {"_key": "desc"}} + } + }, + ) + + sources = ( + response.get("aggregations", {}).get("unique_sources", {}).get("buckets", []) + ) + return {source["key"]: source["doc_count"] for source in sources} + + +@task +def get_proportional_source_count_kwargs( + staging_source_counts: dict[str, int], percentage_of_prod: int +): + """ + Return a list of kwargs for each mapped task to reindex the + documents for each source individually. + + For each task we will have: + * `max_docs`: The count of records for this source needed in the new + index in order for the source to make up the same + proportion of the new index as it does in the + source index. + * `query`: An elasticsearch query that will be used to restrict + the reindexing task to records from this source. + """ + return [ + { + "max_docs": round(staging_total * percentage_of_prod), + "query": {"bool": {"filter": [{"term": {"source": source}}]}}, + } + for source, staging_total in staging_source_counts.items() + ] diff --git a/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py b/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py new file mode 100644 index 00000000000..28ed4a02a00 --- /dev/null +++ b/catalog/dags/elasticsearch_cluster/create_proportional_by_source_staging_index/create_proportional_by_source_staging_index_dag.py @@ -0,0 +1,191 @@ +""" +# Create Proportional By Source Staging Index DAG + +This DAG is used to create a new staging Elasticsearch index that is a subset +of a staging source index, such that the proportions of records by source in +the new index is equal to the proportions of records by source in the source +index. + + +Required Dagrun Configuration parameters: + +* media_type: The media type for which to create a new index. +* percentage_of_prod: A float indicating the proportion of items to take from each + source from the total amount existing in the staging + source index + +Optional params: + +* source_index: An existing staging Elasticsearch index to use as the basis for + the new index. If not provided, the index aliased to + `-filtered` will be used. + +## When this DAG runs + +This DAG is on a `None` schedule and is run manually. + +## Race conditions + +Because this DAG runs on the staging ingestion server and staging elasticsearch +cluster, it does _not_ interfere with the `data_refresh` or +`create_filtered_index` DAGs. + +However, as the DAG operates on the staging API database it will exit +immediately if any of the following DAGs are running: +* `staging_database_restore` +* `recreate_full_staging_index` +* `create_new_staging_es_index` +""" +from datetime import datetime, timedelta + +from airflow.decorators import dag +from airflow.models.param import Param + +from common import elasticsearch as es +from common import slack +from common.constants import ( + AUDIO, + DAG_DEFAULT_ARGS, + MEDIA_TYPES, + STAGING, +) +from common.sensors.utils import prevent_concurrency_with_dags +from database.staging_database_restore.constants import ( + DAG_ID as STAGING_DB_RESTORE_DAG_ID, +) +from elasticsearch_cluster.create_new_es_index.create_new_es_index_types import ( + CREATE_NEW_INDEX_CONFIGS, +) +from elasticsearch_cluster.create_proportional_by_source_staging_index import ( + create_proportional_by_source_staging_index as create_index, +) +from elasticsearch_cluster.recreate_staging_index.recreate_full_staging_index import ( + DAG_ID as RECREATE_STAGING_INDEX_DAG_ID, +) + + +DAG_ID = "create_proportional_by_source_staging_index" + + +@dag( + dag_id=DAG_ID, + default_args=DAG_DEFAULT_ARGS, + schedule=None, + start_date=datetime(2024, 1, 31), + tags=["database", "elasticsearch"], + max_active_runs=1, + catchup=False, + doc_md=__doc__, + params={ + "media_type": Param( + default=AUDIO, + enum=MEDIA_TYPES, + description="The media type for which to create the index.", + ), + "percentage_of_prod": Param( + default=0.5, + type="number", + exclusiveMinimum=0, + maximum=1, + description=( + "The proportion of items to take of each provider from" + " the total amount existing in the source index." + ), + ), + "source_index": Param( + default=None, + type=["string", "null"], + description=( + "Optionally, the existing staging Elasticsearch index" + " to use as the basis for the new index. If not provided," + " the index aliased to `-filtered` will be used." + ), + ), + }, + render_template_as_native_obj=True, +) +def create_proportional_by_source_staging_index(): + # Fail early if any conflicting DAGs are running + prevent_concurrency = prevent_concurrency_with_dags( + external_dag_ids=[ + STAGING_DB_RESTORE_DAG_ID, + RECREATE_STAGING_INDEX_DAG_ID, + CREATE_NEW_INDEX_CONFIGS[STAGING].dag_id, + ] + ) + + es_host = es.get_es_host(environment=STAGING) + + source_index_name = create_index.get_source_index( + source_index="{{ params.source_index }}", + media_type="{{ params.media_type }}", + ) + + source_config = es.get_index_configuration( + source_index=source_index_name, es_host=es_host + ) + + destination_index_name = create_index.get_destination_index_name( + media_type="{{ params.media_type }}", + current_datetime_str="{{ ts_nodash }}", + percentage_of_prod="{{ params.percentage_of_prod }}", + ) + + destination_alias = create_index.get_destination_alias( + media_type="{{ params.media_type }}" + ) + + destination_index_config = create_index.get_destination_index_config( + source_config=source_config, destination_index_name=destination_index_name + ) + + new_index = es.create_index(index_config=destination_index_config, es_host=es_host) + + staging_source_counts = create_index.get_staging_source_counts( + source_index=source_index_name, es_host=es_host + ) + + desired_source_counts = create_index.get_proportional_source_count_kwargs.override( + task_id="get_desired_source_counts" + )( + staging_source_counts=staging_source_counts, + percentage_of_prod="{{ params.percentage_of_prod }}", + ) + + reindex = es.trigger_and_wait_for_reindex.partial( + destination_index=destination_index_name, + source_index=source_index_name, + timeout=timedelta(hours=12), + requests_per_second="{{ var.value.get('ES_INDEX_THROTTLING_RATE', 20_000) }}", + # When slices are used to parallelize indexing, max_docs does + # not work reliably and the final proportions may be incorrect. + slices=None, + # Do not refresh the index after each partial reindex + refresh=False, + es_host=es_host, + ).expand_kwargs(desired_source_counts) + + refresh_destination_index = es.refresh_index( + index_name=destination_index_name, es_host=es_host + ) + + point_alias = es.point_alias( + index_name=destination_index_name, alias=destination_alias, es_host=es_host + ) + + notify_completion = slack.notify_slack( + text=f"Reindexing complete for {destination_index_name}.", + dag_id=DAG_ID, + username="Proportional by Source Staging Index Creation", + icon_emoji=":elasticsearch:", + ) + + # Setup additional dependencies + prevent_concurrency >> es_host + es_host >> [source_index_name, destination_index_name, destination_alias] + staging_source_counts >> desired_source_counts + new_index >> staging_source_counts + reindex >> refresh_destination_index >> point_alias >> notify_completion + + +create_proportional_by_source_staging_index() diff --git a/catalog/dags/elasticsearch_cluster/healthcheck_dag.py b/catalog/dags/elasticsearch_cluster/healthcheck_dag.py index 63132d604b1..50807968d4c 100644 --- a/catalog/dags/elasticsearch_cluster/healthcheck_dag.py +++ b/catalog/dags/elasticsearch_cluster/healthcheck_dag.py @@ -25,10 +25,10 @@ from elasticsearch import Elasticsearch from common.constants import ENVIRONMENTS, PRODUCTION, Environment +from common.elasticsearch import get_es_host from common.sensors.utils import is_concurrent_with_any from common.slack import send_alert, send_message from data_refresh.data_refresh_types import DATA_REFRESH_CONFIGS -from elasticsearch_cluster.shared import get_es_host logger = logging.getLogger(__name__) diff --git a/catalog/dags/elasticsearch_cluster/recreate_staging_index/recreate_full_staging_index_dag.py b/catalog/dags/elasticsearch_cluster/recreate_staging_index/recreate_full_staging_index_dag.py index 3b4cb8eb5a0..b91e93edb90 100644 --- a/catalog/dags/elasticsearch_cluster/recreate_staging_index/recreate_full_staging_index_dag.py +++ b/catalog/dags/elasticsearch_cluster/recreate_staging_index/recreate_full_staging_index_dag.py @@ -2,7 +2,8 @@ # Recreate Full Staging Index DAG This DAG is used to fully recreate a new staging Elasticsearch index for a -given `media_type`, using records pulled from the staging API database. It is +given `media_type`, using records pulled from the staging API database rather than +from a source index (like the `create_new_staging_es_index` DAG does). It is used to decouple the steps of creating a new index from the rest of the data refresh process. @@ -33,8 +34,11 @@ cluster, it does _not_ interfere with the `data_refresh` or `create_filtered_index` DAGs. -However, the DAG will exit immediately if the `staging_database_restore` DAG is -running, as it operates on the staging API database. +However, as the DAG operates on the staging API database it will exit +immediately if any of the following DAGs are running: +* `staging_database_restore` +* `create_proportional_by_provider_staging_index` +* `create_new_staging_es_index` """ from datetime import datetime diff --git a/catalog/dags/elasticsearch_cluster/shared.py b/catalog/dags/elasticsearch_cluster/shared.py deleted file mode 100644 index 02fcb315cc6..00000000000 --- a/catalog/dags/elasticsearch_cluster/shared.py +++ /dev/null @@ -1,11 +0,0 @@ -from airflow.decorators import task -from airflow.models.connection import Connection -from airflow.models.xcom_arg import XComArg - -from common.constants import Environment - - -@task -def get_es_host(environment: Environment) -> XComArg: - conn = Connection.get_connection_from_secrets(f"elasticsearch_http_{environment}") - return conn.get_uri() diff --git a/catalog/tests/dags/elasticsearch_cluster/create_new_es_index/test_create_proportional_by_source_staging_index_dag.py b/catalog/tests/dags/elasticsearch_cluster/create_new_es_index/test_create_proportional_by_source_staging_index_dag.py new file mode 100644 index 00000000000..63f5188c0bf --- /dev/null +++ b/catalog/tests/dags/elasticsearch_cluster/create_new_es_index/test_create_proportional_by_source_staging_index_dag.py @@ -0,0 +1,95 @@ +import pytest + +from elasticsearch_cluster.create_proportional_by_source_staging_index.create_proportional_by_source_staging_index import ( + get_proportional_source_count_kwargs, +) + + +@pytest.mark.parametrize( + "staging_source_counts, percentage_of_prod, expected_results", + [ + ( + {"jamendo": 10_000, "freesound": 20_000, "wikimedia_audio": 10_000}, + 0.25, + [ + { + "max_docs": 2_500, + "query": {"bool": {"filter": [{"term": {"source": "jamendo"}}]}}, + }, + { + "max_docs": 5_000, + "query": {"bool": {"filter": [{"term": {"source": "freesound"}}]}}, + }, + { + "max_docs": 2_500, + "query": { + "bool": {"filter": [{"term": {"source": "wikimedia_audio"}}]} + }, + }, + ], + ), + ( + { + "jamendo": 10_000, + "freesound": 20_000, + }, + 0.0, + [ + { + "max_docs": 0, + "query": {"bool": {"filter": [{"term": {"source": "jamendo"}}]}}, + }, + { + "max_docs": 0, + "query": {"bool": {"filter": [{"term": {"source": "freesound"}}]}}, + }, + ], + ), + ( + { + "jamendo": 982, + "freesound": 423, + }, + 1.0, + [ + { + "max_docs": 982, + "query": {"bool": {"filter": [{"term": {"source": "jamendo"}}]}}, + }, + { + "max_docs": 423, + "query": {"bool": {"filter": [{"term": {"source": "freesound"}}]}}, + }, + ], + ), + # Proportions do not divide evenly into the estimated new index total. + ( + # All sources are exactly 1/3 of the index + {"flickr": 3_333, "stocksnap": 3_333, "smk": 3_333}, + 0.5, + [ + { + # Note that each source gets 1_666 records (because it is + # rounded), for a total of 4_998 records in the new index. + "max_docs": 1_666, + "query": {"bool": {"filter": [{"term": {"source": "flickr"}}]}}, + }, + { + "max_docs": 1_666, + "query": {"bool": {"filter": [{"term": {"source": "stocksnap"}}]}}, + }, + { + "max_docs": 1_666, + "query": {"bool": {"filter": [{"term": {"source": "smk"}}]}}, + }, + ], + ), + ], +) +def test_get_proportional_source_count_kwargs( + staging_source_counts, percentage_of_prod, expected_results +): + actual_results = get_proportional_source_count_kwargs.function( + staging_source_counts, percentage_of_prod + ) + assert actual_results == expected_results diff --git a/documentation/catalog/reference/DAGs.md b/documentation/catalog/reference/DAGs.md index 94055eafe25..15a6875e0a8 100644 --- a/documentation/catalog/reference/DAGs.md +++ b/documentation/catalog/reference/DAGs.md @@ -42,15 +42,16 @@ The following are DAGs grouped by their primary tag: ### Database -| DAG ID | Schedule Interval | -| --------------------------------------------------------------------------------- | ----------------- | -| [`batched_update`](#batched_update) | `None` | -| [`delete_records`](#delete_records) | `None` | -| [`recreate_audio_popularity_calculation`](#recreate_audio_popularity_calculation) | `None` | -| [`recreate_full_staging_index`](#recreate_full_staging_index) | `None` | -| [`recreate_image_popularity_calculation`](#recreate_image_popularity_calculation) | `None` | -| [`report_pending_reported_media`](#report_pending_reported_media) | `@weekly` | -| [`staging_database_restore`](#staging_database_restore) | `@monthly` | +| DAG ID | Schedule Interval | +| --------------------------------------------------------------------------------------------- | ----------------- | +| [`batched_update`](#batched_update) | `None` | +| [`create_proportional_by_source_staging_index`](#create_proportional_by_source_staging_index) | `None` | +| [`delete_records`](#delete_records) | `None` | +| [`recreate_audio_popularity_calculation`](#recreate_audio_popularity_calculation) | `None` | +| [`recreate_full_staging_index`](#recreate_full_staging_index) | `None` | +| [`recreate_image_popularity_calculation`](#recreate_image_popularity_calculation) | `None` | +| [`report_pending_reported_media`](#report_pending_reported_media) | `@weekly` | +| [`staging_database_restore`](#staging_database_restore) | `@monthly` | ### Elasticsearch @@ -144,6 +145,7 @@ The following is documentation associated with each DAG (where available): 1. [`create_filtered_image_index`](#create_filtered_image_index) 1. [`create_new_production_es_index`](#create_new_production_es_index) 1. [`create_new_staging_es_index`](#create_new_staging_es_index) +1. [`create_proportional_by_source_staging_index`](#create_proportional_by_source_staging_index) 1. [`delete_records`](#delete_records) 1. [`europeana_workflow`](#europeana_workflow) 1. [`finnish_museums_workflow`](#finnish_museums_workflow) @@ -696,6 +698,43 @@ The resulting, merged configuration will be: } ``` +### `create_proportional_by_source_staging_index` + +#### Create Proportional By Source Staging Index DAG + +This DAG is used to create a new staging Elasticsearch index that is a subset of +a staging source index, such that the proportions of records by source in the +new index is equal to the proportions of records by source in the source index. + +Required Dagrun Configuration parameters: + +- media_type: The media type for which to create a new index. +- percentage_of_prod: A float indicating the proportion of items to take from + each source from the total amount existing in the staging source index + +Optional params: + +- source_index: An existing staging Elasticsearch index to use as the basis for + the new index. If not provided, the index aliased to `-filtered` + will be used. + +##### When this DAG runs + +This DAG is on a `None` schedule and is run manually. + +##### Race conditions + +Because this DAG runs on the staging ingestion server and staging elasticsearch +cluster, it does _not_ interfere with the `data_refresh` or +`create_filtered_index` DAGs. + +However, as the DAG operates on the staging API database it will exit +immediately if any of the following DAGs are running: + +- `staging_database_restore` +- `recreate_full_staging_index` +- `create_new_staging_es_index` + ### `delete_records` #### Delete Records DAG @@ -1086,9 +1125,10 @@ code is deployed for the calculation. #### Recreate Full Staging Index DAG This DAG is used to fully recreate a new staging Elasticsearch index for a given -`media_type`, using records pulled from the staging API database. It is used to -decouple the steps of creating a new index from the rest of the data refresh -process. +`media_type`, using records pulled from the staging API database rather than +from a source index (like the `create_new_staging_es_index` DAG does). It is +used to decouple the steps of creating a new index from the rest of the data +refresh process. Staging index creation is handled by the _staging_ ingestion server. The DAG triggers the ingestion server `REINDEX` action to create a new index in the @@ -1117,8 +1157,12 @@ Because this DAG runs on the staging ingestion server and staging elasticsearch cluster, it does _not_ interfere with the `data_refresh` or `create_filtered_index` DAGs. -However, the DAG will exit immediately if the `staging_database_restore` DAG is -running, as it operates on the staging API database. +However, as the DAG operates on the staging API database it will exit +immediately if any of the following DAGs are running: + +- `staging_database_restore` +- `create_proportional_by_provider_staging_index` +- `create_new_staging_es_index` ### `recreate_image_popularity_calculation`