From 81b7860e2662f4b135919da0728eea194f15c260 Mon Sep 17 00:00:00 2001
From: Dhruv Bhanushali
Date: Wed, 27 Jul 2022 23:16:32 +0400
Subject: [PATCH] Add task for deleting indices (#714)

Co-authored-by: sarayourfriend <24264157+sarayourfriend@users.noreply.github.com>
---
 ingestion_server/ingestion_server/api.py      |  54 ++++++-
 .../ingestion_server/es_helpers.py            | 103 +++++++++++++
 ingestion_server/ingestion_server/indexer.py  | 140 +++++++++---------
 .../ingestion_server/indexer_worker.py        |   3 +-
 ingestion_server/ingestion_server/tasks.py    |  11 +-
 ingestion_server/test/integration_test.py     |  93 +++++++++++-
 6 files changed, 332 insertions(+), 72 deletions(-)
 create mode 100644 ingestion_server/ingestion_server/es_helpers.py

diff --git a/ingestion_server/ingestion_server/api.py b/ingestion_server/ingestion_server/api.py
index 73ef9c53b..117c9fcc6 100644
--- a/ingestion_server/ingestion_server/api.py
+++ b/ingestion_server/ingestion_server/api.py
@@ -15,7 +15,8 @@
 
 from ingestion_server import slack
 from ingestion_server.constants.media_types import MEDIA_TYPES, MediaType
-from ingestion_server.indexer import TableIndexer, elasticsearch_connect
+from ingestion_server.es_helpers import elasticsearch_connect, get_stat
+from ingestion_server.indexer import TableIndexer
 from ingestion_server.state import clear_state, worker_finished
 from ingestion_server.tasks import TaskTracker, TaskTypes, perform_task
 
@@ -33,6 +34,25 @@ def on_get(_, resp):
         resp.media = {"status": "200 OK"}
 
 
+class StatResource:
+    @staticmethod
+    def on_get(_, res, name):
+        """
+        Handles an incoming GET request. Provides information about the given index or
+        alias.
+
+        :param _: the incoming request
+        :param res: the appropriate response
+        :param name: the name of the index or alias
+        :return: the information about the index or alias
+        """
+
+        elasticsearch = elasticsearch_connect()
+        stat = get_stat(elasticsearch, name)
+        res.status = falcon.HTTP_200
+        res.media = stat._asdict()
+
+
 class BaseTaskResource:
     """Base class for all resource that need access to a task tracker"""
 
@@ -61,6 +81,7 @@ def _get_base_url(req):
                 "since_date": {"type": "string"},
                 "index_suffix": {"type": "string"},
                 "alias": {"type": "string"},
+                "force_delete": {"type": "boolean"},
             },
             "required": ["model", "action"],
             "allOf": [
@@ -81,6 +102,17 @@ def _get_base_url(req):
                     },
                     "then": {"required": ["index_suffix", "since_date"]},
                 },
+                {
+                    "if": {
+                        "properties": {"action": {"const": TaskTypes.DELETE_INDEX.name}}
+                    },
+                    "then": {
+                        "oneOf": [
+                            {"required": ["alias"]},
+                            {"required": ["index_suffix"]},
+                        ]
+                    },
+                },
             ],
         }
     )
@@ -107,11 +139,13 @@ def on_post(self, req, res):
         since_date = body.get("since_date")
         index_suffix = body.get("index_suffix", task_id)
         alias = body.get("alias")
+        force_delete = body.get("force_delete", False)
 
         # Shared memory
         progress = Value("d", 0.0)
         finish_time = Value("d", 0.0)
         active_workers = Value("i", int(False))
+        is_bad_request = Value("i", 0)
 
         task = Process(
             target=perform_task,
@@ -123,10 +157,12 @@ def on_post(self, req, res):
                 "progress": progress,
                 "finish_time": finish_time,
                 "active_workers": active_workers,
+                "is_bad_request": is_bad_request,
                 # Task-specific keyword arguments
                 "since_date": since_date,
                 "index_suffix": index_suffix,
                 "alias": alias,
+                "force_delete": force_delete,
             },
         )
         task.start()
@@ -140,6 +176,7 @@ def on_post(self, req, res):
             progress=progress,
             finish_time=finish_time,
            active_workers=active_workers,
+            is_bad_request=is_bad_request,
         )
 
         base_url = self._get_base_url(req)
@@ -163,11 +200,21 @@ def on_post(self, req, res):
                 "task_id": task_id,
                 "status_check": status_url,
             }
+        elif is_bad_request.value == 1:
+            res.status = falcon.HTTP_400
+            res.media = {
+                "message": (
+                    "Failed during task execution due to bad request. "
+                    "Check scheduler logs."
+                )
+            }
         else:
             res.status = falcon.HTTP_500
             res.media = {
-                "message": "Failed to schedule task due to an internal server "
-                "error. Check scheduler logs."
+                "message": (
+                    "Failed to schedule task due to an internal server error. "
+                    "Check scheduler logs."
+                )
             }
 
     def on_get(self, _, res):
@@ -276,6 +323,7 @@ def create_api(log=True):
     task_tracker = TaskTracker()
 
     _api.add_route("/", HealthResource())
+    _api.add_route("/stat/{name}", StatResource())
     _api.add_route("/task", TaskResource(task_tracker))
     _api.add_route("/task/{task_id}", TaskStatus(task_tracker))
     _api.add_route("/worker_finished", WorkerFinishedResource(task_tracker))
diff --git a/ingestion_server/ingestion_server/es_helpers.py b/ingestion_server/ingestion_server/es_helpers.py
new file mode 100644
index 000000000..d7463e926
--- /dev/null
+++ b/ingestion_server/ingestion_server/es_helpers.py
@@ -0,0 +1,103 @@
+import logging as log
+import time
+from typing import NamedTuple, Optional, Union
+
+from aws_requests_auth.aws_auth import AWSRequestsAuth
+from decouple import config
+from elasticsearch import ConnectionError as EsConnectionError
+from elasticsearch import Elasticsearch, NotFoundError, RequestsHttpConnection
+
+
+class Stat(NamedTuple):
+    """
+    Contains information about the index or alias identified by its name.
+    """
+
+    exists: bool
+    is_alias: Optional[bool]
+    alt_names: Optional[Union[str, list[str]]]
+
+
+def elasticsearch_connect(timeout: int = 300) -> Elasticsearch:
+    """
+    Repeatedly try to connect to Elasticsearch until successful.
+
+    :param timeout: the amount of time in seconds to wait for a successful connection
+    :return: an Elasticsearch client
+    """
+
+    while timeout > 0:
+        try:
+            return _elasticsearch_connect()
+        except EsConnectionError as err:
+            log.exception(err)
+            log.error("Reconnecting to Elasticsearch in 5 seconds...")
+            timeout -= 5
+            time.sleep(5)
+            continue
+
+
+def _elasticsearch_connect() -> Elasticsearch:
+    """
+    Connect to the Elasticsearch instance at the configured domain. This method also
+    handles AWS authentication using the AWS access key ID and the secret access key.
+
+    :return: an Elasticsearch client
+    """
+
+    elasticsearch_url = config("ELASTICSEARCH_URL", default="localhost")
+    elasticsearch_port = config("ELASTICSEARCH_PORT", default=9200, cast=int)
+
+    # For AWS IAM access to Elasticsearch
+    aws_region = config("AWS_REGION", "us-east-1")
+    aws_access_key_id = config("AWS_ACCESS_KEY_ID", default="")
+    aws_secret_access_key = config("AWS_SECRET_ACCESS_KEY", default="")
+
+    timeout = 12  # hours
+
+    log.info(f"Connecting to {elasticsearch_url}:{elasticsearch_port} with AWS auth")
+    auth = AWSRequestsAuth(
+        aws_access_key=aws_access_key_id,
+        aws_secret_access_key=aws_secret_access_key,
+        aws_host=elasticsearch_url,
+        aws_region=aws_region,
+        aws_service="es",
+    )
+    auth.encode = lambda x: bytes(x.encode("utf-8"))
+    es = Elasticsearch(
+        host=elasticsearch_url,
+        port=elasticsearch_port,
+        connection_class=RequestsHttpConnection,
+        http_auth=auth,
+        timeout=timeout * 3600,  # seconds
+    )
+    es.info()
+    return es
+
+
+def get_stat(es: Elasticsearch, name_or_alias: str) -> Stat:
+    """
+    Get information about the index name or alias given to the function. For
+    any given input, the function offers three bits of information:
+
+    - whether an alias or index of the name exists
+    - whether the name is an alias
+    - the index name that the alias points to/the aliases associated with the index
+
+    :param es: the Elasticsearch connection
+    :param name_or_alias: the name of the index or an alias associated with it
+    :return: a ``Stat`` instance containing the three bits of information
+    """
+
+    try:
+        matches = es.indices.get(index=name_or_alias)
+        real_name = list(matches.keys())[0]
+        aliases = list(matches[real_name]["aliases"].keys())
+        is_alias = real_name != name_or_alias
+        return Stat(
+            exists=True,
+            is_alias=is_alias,
+            alt_names=real_name if is_alias else aliases,
+        )
+    except NotFoundError:
+        return Stat(exists=False, is_alias=None, alt_names=None)
diff --git a/ingestion_server/ingestion_server/indexer.py b/ingestion_server/ingestion_server/indexer.py
index 9fedee6cc..c921bf43d 100644
--- a/ingestion_server/ingestion_server/indexer.py
+++ b/ingestion_server/ingestion_server/indexer.py
@@ -25,10 +25,8 @@
 import elasticsearch
 import psycopg2
 import requests
-from aws_requests_auth.aws_auth import AWSRequestsAuth
 from decouple import config
-from elasticsearch import Elasticsearch, NotFoundError, RequestsHttpConnection, helpers
-from elasticsearch.exceptions import ConnectionError as ESConnectionError
+from elasticsearch import Elasticsearch, helpers
 from elasticsearch_dsl import connections
 from psycopg2.sql import SQL, Identifier, Literal
 from requests import RequestException
@@ -36,20 +34,12 @@
 from ingestion_server import slack
 from ingestion_server.distributed_reindex_scheduler import schedule_distributed_index
 from ingestion_server.elasticsearch_models import media_type_to_elasticsearch_model
+from ingestion_server.es_helpers import get_stat
 from ingestion_server.es_mapping import index_settings
 from ingestion_server.qa import create_search_qa_index
 from ingestion_server.queries import get_existence_queries
 
-# For AWS IAM access to Elasticsearch
-AWS_ACCESS_KEY_ID = config("AWS_ACCESS_KEY_ID", default="")
-AWS_SECRET_ACCESS_KEY = config("AWS_SECRET_ACCESS_KEY", default="")
-
-ELASTICSEARCH_URL = config("ELASTICSEARCH_URL", default="localhost")
-ELASTICSEARCH_PORT = config("ELASTICSEARCH_PORT", default=9200, cast=int)
-
-AWS_REGION = config("AWS_REGION", "us-east-1")
-
 DATABASE_HOST = config("DATABASE_HOST", default="localhost")
 DATABASE_PORT = config("DATABASE_PORT", default=5432, cast=int)
 DATABASE_USER = config("DATABASE_USER", default="deploy")
@@ -67,55 +57,6 @@
     "COPY_TABLES", default="image", cast=lambda var: [s.strip() for s in var.split(",")]
 )
 
-TWELVE_HOURS_SEC = 60 * 60 * 12
-
-
-def elasticsearch_connect(timeout: int = 300) -> Elasticsearch:
-    """
-    Repeatedly try to connect to Elasticsearch until successful.
-
-    :param timeout: the amount of time in seconds to wait for a successful connection
-    :return: an Elasticsearch client.
-    """
-
-    while timeout > 0:
-        try:
-            return _elasticsearch_connect()
-        except ESConnectionError as err:
-            log.exception(err)
-            log.error("Reconnecting to Elasticsearch in 5 seconds...")
-            timeout -= 5
-            time.sleep(5)
-            continue
-
-
-def _elasticsearch_connect() -> Elasticsearch:
-    """
-    Connect to an Elasticsearch indices at the configured domain. This method also
-    handles AWS authentication using the AWS access key ID and the secret access key.
-
-    :return: an Elasticsearch client
-    """
-
-    log.info(f"Connecting to {ELASTICSEARCH_URL}:{ELASTICSEARCH_PORT} with AWS auth")
-    auth = AWSRequestsAuth(
-        aws_access_key=AWS_ACCESS_KEY_ID,
-        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
-        aws_host=ELASTICSEARCH_URL,
-        aws_region=AWS_REGION,
-        aws_service="es",
-    )
-    auth.encode = lambda x: bytes(x.encode("utf-8"))
-    es = Elasticsearch(
-        host=ELASTICSEARCH_URL,
-        port=ELASTICSEARCH_PORT,
-        connection_class=RequestsHttpConnection,
-        http_auth=auth,
-        timeout=TWELVE_HOURS_SEC,
-    )
-    es.info()
-    return es
-
 
 def database_connect(autocommit=False):
     """
@@ -176,6 +117,7 @@ def __init__(
         callback_url: Optional[str] = None,
         progress: Optional[Value] = None,
         active_workers: Optional[Value] = None,
+        is_bad_request: Optional[Value] = None,
    ):
         self.es = es_instance
         connections.connections.add_connection("default", self.es)
@@ -184,6 +126,7 @@ def __init__(
         self.callback_url = callback_url
         self.progress = progress
         self.active_workers = active_workers
+        self.is_bad_request = is_bad_request
 
     # Helpers
     # =======
@@ -430,15 +373,16 @@ def point_alias(self, model_name: str, index_suffix: str, alias: str, **_):
             timeout="12h",
         )
 
-        try:
-            curr_index = list(self.es.indices.get(index=alias).keys())[0]
-            if curr_index == alias:
+        alias_stat = get_stat(self.es, alias)
+        curr_index = alias_stat.alt_names
+        if alias_stat.exists:
+            if not alias_stat.is_alias:
                 # Alias is an index, this is fatal.
                 message = f"There is an index named {alias}, cannot proceed."
                 log.error(message)
                 slack.error(message)
                 return
-            elif curr_index != dest_index:
+            elif alias_stat.is_alias and curr_index != dest_index:
                 # Alias is in use, atomically remap it to the new index.
                 self.es.indices.update_aliases(
                     body={
@@ -459,7 +403,7 @@ def point_alias(self, model_name: str, index_suffix: str, alias: str, **_):
             else:
                 # Alias is already mapped.
                 log.info(f"Alias {alias} already points to index {dest_index}.")
-        except NotFoundError:
+        else:
             # Alias does not exist, create it.
             self.es.indices.put_alias(index=dest_index, name=alias)
             message = f"Created alias {alias} pointing to index {dest_index}."
@@ -469,3 +413,67 @@ def point_alias(self, model_name: str, index_suffix: str, alias: str, **_):
         if self.progress is not None:
             self.progress.value = 100  # mark job as completed
         self.ping_callback()
+
+    def delete_index(
+        self,
+        model_name: str,
+        index_suffix: Optional[str] = None,
+        alias: Optional[str] = None,
+        force_delete: bool = False,
+        **_,
+    ):
+        """
+        Delete the given index, ensuring that it is not in use.
+
+        :param model_name: the name of the media type
+        :param index_suffix: the suffix of the index to delete
+        :param alias: the alias to delete, including the index it points to
+        :param force_delete: whether to delete the index even if it is in use
+        """
+
+        target = alias if alias is not None else f"{model_name}-{index_suffix}"
+
+        target_stat = get_stat(self.es, target)
+        if target_stat.exists:
+            if target_stat.is_alias:
+                if not force_delete:
+                    # Alias cannot be deleted unless forced.
+                    if self.is_bad_request is not None:
+                        self.is_bad_request.value = 1
+                    message = (
+                        f"Alias {target} might be in use, so it cannot be deleted. "
+                        f"Verify that the API does not use this alias and then use the "
+                        f"`force_delete` parameter."
+                    )
+                    log.error(message)
+                    slack.error(message)
+                    return
+                target = target_stat.alt_names
+            else:
+                if target_stat.alt_names:
+                    # Index mapped to an alias cannot be deleted.
+                    if self.is_bad_request is not None:
+                        self.is_bad_request.value = 1
+                    message = (
+                        f"Index {target} is associated with aliases "
+                        f"{target_stat.alt_names}, cannot delete. Delete aliases first."
+                    )
+                    log.error(message)
+                    slack.error(message)
+                    return
+
+            self.es.indices.delete(index=target)
+            message = f"Index {target} was deleted."
+            log.info(message)
+            slack.info(message)
+        else:
+            # Cannot delete as target does not exist.
+            if self.is_bad_request is not None:
+                self.is_bad_request.value = 1
+            message = f"Target {target} does not exist and cannot be deleted."
+            log.info(message)
+            slack.info(message)
+
+        if self.progress is not None:
+            self.progress.value = 100
+        self.ping_callback()
diff --git a/ingestion_server/ingestion_server/indexer_worker.py b/ingestion_server/ingestion_server/indexer_worker.py
index 2eeadc395..08554d3ce 100644
--- a/ingestion_server/ingestion_server/indexer_worker.py
+++ b/ingestion_server/ingestion_server/indexer_worker.py
@@ -17,7 +17,8 @@
 from psycopg2.sql import SQL, Identifier, Literal
 
 from ingestion_server import slack
-from ingestion_server.indexer import TableIndexer, elasticsearch_connect
+from ingestion_server.es_helpers import elasticsearch_connect
+from ingestion_server.indexer import TableIndexer
 from ingestion_server.queries import get_existence_queries
 
diff --git a/ingestion_server/ingestion_server/tasks.py b/ingestion_server/ingestion_server/tasks.py
index 9e5f6b210..76379bc5a 100644
--- a/ingestion_server/ingestion_server/tasks.py
+++ b/ingestion_server/ingestion_server/tasks.py
@@ -10,7 +10,8 @@
 
 from ingestion_server import slack
 from ingestion_server.constants.media_types import MediaType
-from ingestion_server.indexer import TableIndexer, elasticsearch_connect
+from ingestion_server.es_helpers import elasticsearch_connect
+from ingestion_server.indexer import TableIndexer
 from ingestion_server.ingest import promote_api_table, refresh_api_table
 
 
@@ -48,6 +49,9 @@ def _generate_next_value_(name: str, *args, **kwargs) -> str:
     UPDATE_INDEX = auto()  # TODO: delete eventually, rarely used
     """reindex updates to a model from the database since the given date"""
 
+    DELETE_INDEX = auto()
+    """delete the given index after it has been superseded by a new one"""
+
     LOAD_TEST_DATA = auto()
     """create indices in ES for QA tests; this is not intended to run in production
     but can be run without negative consequences"""
@@ -108,6 +112,7 @@ def _time_fmt(timestamp: int) -> Optional[str]:
         finish_time = task_info["finish_time"].value
         progress = task_info["progress"].value
         active_workers = task_info["active_workers"].value
+        is_bad_request = task_info["is_bad_request"].value
         return {
             "active": active,
             "model": task_info["model"],
@@ -119,6 +124,7 @@ def _time_fmt(timestamp: int) -> Optional[str]:
             "finish_time": _time_fmt(finish_time),
             "active_workers": bool(active_workers),
             "error": progress < 100 and not active,
+            "is_bad_request": bool(is_bad_request),
         }
 
     def list_task_statuses(self) -> list:
@@ -152,6 +158,7 @@ def perform_task(
     progress: Value,
     finish_time: Value,
     active_workers: Value,
+    is_bad_request: Value,
     **kwargs,
 ):
     """
@@ -166,6 +173,7 @@ def perform_task(
     :param progress: shared memory for tracking the task's progress
     :param finish_time: shared memory for tracking the finish time of the task
     :param active_workers: shared memory for counting workers assigned to the task
+    :param is_bad_request: shared memory that flags tasks that fail due to bad requests
     """
 
     elasticsearch = elasticsearch_connect()
@@ -175,6 +183,7 @@ def perform_task(
         callback_url,
         progress,
         active_workers,
+        is_bad_request,
     )
 
     # Task functions
diff --git a/ingestion_server/test/integration_test.py b/ingestion_server/test/integration_test.py
index d318fe8aa..338394a56 100644
--- a/ingestion_server/test/integration_test.py
+++ b/ingestion_server/test/integration_test.py
@@ -18,7 +18,7 @@
 
 # Uses Bottle because, unlike Falcon, it can be run from within the test suite.
 from bottle import Bottle
-from elasticsearch import Elasticsearch, RequestsHttpConnection
+from elasticsearch import Elasticsearch, NotFoundError, RequestsHttpConnection
 
 from .gen_integration_compose import gen_integration_compose
 from .test_constants import service_ports
@@ -213,6 +213,10 @@ def _get_constraints(cls, conn, table) -> dict[str, str]:
             cursor.execute(constraint_sql)
             return {constraint: name for name, constraint in cursor}
 
+    def check_index_exists(self, index_name):
+        es = self._get_es()
+        assert es.indices.get(index=index_name) is not None
+
     def _ingest_upstream(self, model, suffix="integration"):
         """
         Check that INGEST_UPSTREAM task completes successfully and responds
@@ -267,6 +271,51 @@ def _promote(self, model, suffix="integration", alias=None):
         es = self._get_es()
         assert list(es.indices.get(index=alias).keys())[0] == f"{model}-{suffix}"
 
+    def _delete_index(self, model, suffix="integration", alias=None):
+        req = {
+            "model": model,
+            "action": "DELETE_INDEX",
+            "callback_url": bottle_url,
+        }
+        if alias is None:
+            req |= {"index_suffix": suffix}
+        else:
+            req |= {
+                "alias": alias,
+                "force_delete": True,
+            }
+        res = requests.post(f"{ingestion_server}/task", json=req)
+        stat_msg = "The job should launch successfully and return 202 ACCEPTED."
+        self.assertEqual(res.status_code, 202, msg=stat_msg)
+
+        # Wait for the task to send us a callback.
+        assert self.__class__.cb_queue.get(timeout=120) == "CALLBACK!"
+
+        es = self._get_es()
+        with pytest.raises(NotFoundError):
+            es.indices.get(index=f"{model}-{suffix}")
+
+    def _soft_delete_index(
+        self, model, alias, suffix="integration", omit_force_delete=False
+    ):
+        """
+        Deleting without the ``force_delete`` flag set to ``True`` is
+        considered a soft-delete because it will be declined if the target is
+        an alias. Not providing the flag is equivalent to setting it to
+        ``False``.
+        """
+        req = {
+            "model": model,
+            "action": "DELETE_INDEX",
+            "alias": alias,
+        }
+        if not omit_force_delete:
+            req |= {"force_delete": False}
+        res = requests.post(f"{ingestion_server}/task", json=req)
+        stat_msg = "The job should fail fast and return 400 BAD REQUEST."
+        self.assertEqual(res.status_code, 400, msg=stat_msg)
+        self.check_index_exists(f"{model}-{suffix}")
+
     @classmethod
     def setUpClass(cls) -> None:
         # Launch a Bottle server to receive and handle callbacks
@@ -457,3 +506,45 @@ def test_update_index_audio(self):
 
         # Wait for the task to send us a callback.
         assert self.__class__.cb_queue.get(timeout=120) == "CALLBACK!"
+
+    @pytest.mark.order(12)
+    def test_index_deletion_succeeds(self):
+        self._ingest_upstream("audio", "temporary")
+        self._delete_index("audio", "temporary")
+
+    @pytest.mark.order(13)
+    def test_alias_force_deletion_succeeds(self):
+        self._ingest_upstream("audio", "temporary")
+        self._promote("audio", "temporary", "audio-temp")
+        self._delete_index("audio", "temporary", "audio-temp")
+
+    @pytest.mark.order(14)
+    def test_alias_soft_deletion_fails(self):
+        self._ingest_upstream("audio", "temporary")
+        self._promote("audio", "temporary", "audio-temp")
+        self._soft_delete_index("audio", "audio-temp", "temporary")
+
+    @pytest.mark.order(15)
+    def test_alias_ambiguous_deletion_fails(self):
+        # No need to ingest or promote; the index and alias already exist.
+        self._soft_delete_index("audio", "audio-temp", "temporary", True)
+
+    @pytest.mark.order(16)
+    def test_stat_endpoint_for_index(self):
+        res = requests.get(f"{ingestion_server}/stat/audio-integration")
+        data = res.json()
+        assert data["exists"]
+        assert data["alt_names"] == ["audio-main"]
+
+    @pytest.mark.order(17)
+    def test_stat_endpoint_for_alias(self):
+        res = requests.get(f"{ingestion_server}/stat/audio-main")
+        data = res.json()
+        assert data["exists"]
+        assert data["alt_names"] == "audio-integration"
+
+    @pytest.mark.order(18)
+    def test_stat_endpoint_for_non_existent(self):
+        res = requests.get(f"{ingestion_server}/stat/non-existent")
+        data = res.json()
+        assert not data["exists"]
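
Notes and usage sketches follow. None of this is part of the committed diff; the code blocks are illustrative Python, with assumptions called out in each lead-in.

The new /stat/{name} route returns the Stat record for any index or alias as JSON. A minimal client-side sketch; the host and port here are assumptions for a local deployment, while the route and response shape come from the patch:

    import requests

    INGESTION_SERVER = "http://localhost:8001"  # hypothetical local address

    def stat(name: str) -> dict:
        """Fetch existence/alias information for an index or alias."""
        res = requests.get(f"{INGESTION_SERVER}/stat/{name}")
        res.raise_for_status()
        # The body mirrors Stat._asdict(), e.g.
        # {"exists": True, "is_alias": True, "alt_names": "audio-init"}
        return res.json()

    print(stat("audio-main"))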
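The schema change makes a DELETE_INDEX request name its target in exactly one way: "alias" or "index_suffix", never both and never neither. The rule can be reproduced in isolation with the jsonschema package (an assumption; the server applies the full schema through its own @json_schema_validate decorator):

    from jsonschema import ValidationError, validate

    rule = {
        "type": "object",
        "oneOf": [{"required": ["alias"]}, {"required": ["index_suffix"]}],
    }

    validate({"alias": "audio-temp"}, rule)        # OK: alias only
    validate({"index_suffix": "temporary"}, rule)  # OK: suffix only
    try:
        validate({"alias": "a", "index_suffix": "b"}, rule)  # matches both branches
    except ValidationError as err:
        print(err.message)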
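get_stat answers three questions at once, and the type of alt_names encodes which way the lookup went: a list of aliases when queried by index name, a single index name when queried by alias. Assuming a cluster where index "audio-init" exists and alias "audio-main" points at it (names illustrative), the three possible results look like this:

    from typing import NamedTuple, Optional, Union

    class Stat(NamedTuple):  # mirrors es_helpers.Stat
        exists: bool
        is_alias: Optional[bool]
        alt_names: Optional[Union[str, list]]

    print(Stat(exists=True, is_alias=False, alt_names=["audio-main"]))  # get_stat(es, "audio-init")
    print(Stat(exists=True, is_alias=True, alt_names="audio-init"))     # get_stat(es, "audio-main")
    print(Stat(exists=False, is_alias=None, alt_names=None))            # get_stat(es, "missing")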
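When point_alias finds the alias already in use, it remaps it with a single update_aliases call rather than a delete followed by an add. The exact body in the patch is elided by the hunk boundary, but the standard Elasticsearch request for such a remap looks like the following (index names illustrative):

    actions = {
        "actions": [
            {"remove": {"index": "audio-old", "alias": "audio-main"}},
            {"add": {"index": "audio-new", "alias": "audio-main"}},
        ]
    }
    # es.indices.update_aliases(body=actions) applies both steps in one
    # atomic request, so searches never see the alias resolve to nothing.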
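The guard rails in delete_index reduce to a small decision function: refuse missing targets, refuse aliases unless forced, and refuse indices that still have aliases. A condensed sketch of that logic (the production method additionally sets is_bad_request and notifies Slack):

    def resolve_delete_target(stat, target, force_delete):
        """Return the name of the index that may safely be deleted, or raise."""
        if not stat.exists:
            raise ValueError(f"Target {target} does not exist.")
        if stat.is_alias:
            if not force_delete:
                # The alias may back live API traffic; require an explicit override.
                raise ValueError(f"Alias {target} might be in use.")
            return stat.alt_names  # the real index behind the alias
        if stat.alt_names:
            # An index that still has aliases must keep serving them.
            raise ValueError(f"Index {target} has aliases {stat.alt_names}.")
        return target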
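is_bad_request is a multiprocessing.Value rather than a plain attribute because the task body runs in a separate Process: a flag set in the child would otherwise never be visible to the API process, which has to pick between a 400 and a 500 response. The mechanism in miniature:

    from multiprocessing import Process, Value

    def work(flag):
        flag.value = 1  # set in the child process

    if __name__ == "__main__":
        flag = Value("i", 0)
        child = Process(target=work, args=(flag,))
        child.start()
        child.join()
        assert flag.value == 1  # the write is visible to the parent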
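The new integration tests boil down to this request sequence (server address hypothetical; the real tests also register a callback URL and wait for the completion callback before asserting on the cluster):

    import requests

    server = "http://localhost:8001"  # hypothetical
    delete = {"model": "audio", "action": "DELETE_INDEX", "alias": "audio-temp"}

    # A soft delete of an alias is declined: 400, and the index survives.
    res = requests.post(f"{server}/task", json={**delete, "force_delete": False})
    assert res.status_code == 400

    # A forced delete is accepted: 202, and the alias plus its index go away.
    res = requests.post(f"{server}/task", json={**delete, "force_delete": True})
    assert res.status_code == 202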