
Add task for deleting indices (#714)
Co-authored-by: sarayourfriend <[email protected]>
dhruvkb and sarayourfriend authored Jul 27, 2022
1 parent 56657fb · commit 81b7860
Showing 6 changed files with 332 additions and 72 deletions.
54 changes: 51 additions & 3 deletions ingestion_server/ingestion_server/api.py
@@ -15,7 +15,8 @@

from ingestion_server import slack
from ingestion_server.constants.media_types import MEDIA_TYPES, MediaType
-from ingestion_server.indexer import TableIndexer, elasticsearch_connect
+from ingestion_server.es_helpers import elasticsearch_connect, get_stat
+from ingestion_server.indexer import TableIndexer
from ingestion_server.state import clear_state, worker_finished
from ingestion_server.tasks import TaskTracker, TaskTypes, perform_task

@@ -33,6 +34,25 @@ def on_get(_, resp):
resp.media = {"status": "200 OK"}


class StatResource:
@staticmethod
def on_get(_, res, name):
"""
Handles an incoming GET request. Provides information about the given index or
alias.
:param _: the incoming request
:param res: the appropriate response
:param name: the name of the index or alias
:return: the information about the index or alias
"""

elasticsearch = elasticsearch_connect()
stat = get_stat(elasticsearch, name)
res.status = falcon.HTTP_200
res.media = stat._asdict()


class BaseTaskResource:
"""Base class for all resource that need access to a task tracker"""

@@ -61,6 +81,7 @@ def _get_base_url(req):
"since_date": {"type": "string"},
"index_suffix": {"type": "string"},
"alias": {"type": "string"},
"force_delete": {"type": "boolean"},
},
"required": ["model", "action"],
"allOf": [
@@ -81,6 +102,17 @@ def _get_base_url(req):
},
"then": {"required": ["index_suffix", "since_date"]},
},
{
"if": {
"properties": {"action": {"const": TaskTypes.DELETE_INDEX.name}}
},
"then": {
"oneOf": [
{"required": ["alias"]},
{"required": ["index_suffix"]},
]
},
},
],
}
)
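Under this schema, a DELETE_INDEX request must identify its target through exactly one of alias or index_suffix; supplying both, or neither, fails the oneOf clause. A sketch of bodies the validator would accept or reject (media type and suffix values are illustrative):

# Accepted: exactly one identifier present.
{"model": "image", "action": "DELETE_INDEX", "alias": "image-filtered"}
{"model": "image", "action": "DELETE_INDEX", "index_suffix": "20220727"}

# Rejected by oneOf: both identifiers, or neither.
{"model": "image", "action": "DELETE_INDEX", "alias": "a", "index_suffix": "b"}
{"model": "image", "action": "DELETE_INDEX"}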
@@ -107,11 +139,13 @@ def on_post(self, req, res):
since_date = body.get("since_date")
index_suffix = body.get("index_suffix", task_id)
alias = body.get("alias")
force_delete = body.get("force_delete", False)

# Shared memory
progress = Value("d", 0.0)
finish_time = Value("d", 0.0)
active_workers = Value("i", int(False))
is_bad_request = Value("i", 0)

task = Process(
target=perform_task,
@@ -123,10 +157,12 @@ def on_post(self, req, res):
"progress": progress,
"finish_time": finish_time,
"active_workers": active_workers,
"is_bad_request": is_bad_request,
# Task-specific keyword arguments
"since_date": since_date,
"index_suffix": index_suffix,
"alias": alias,
"force_delete": force_delete,
},
)
task.start()
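The Value objects above live in shared memory, which is how the spawned worker process reports progress, and now the is_bad_request flag, back to the API process. A standalone sketch of the pattern, with illustrative names rather than this codebase's:

from multiprocessing import Process, Value

def worker(progress, is_bad_request):
    # Writes land in shared memory and are visible to the parent process.
    progress.value = 100.0
    is_bad_request.value = 1

if __name__ == "__main__":
    progress = Value("d", 0.0)      # "d" = double
    is_bad_request = Value("i", 0)  # "i" = signed int, used as a boolean flag
    p = Process(target=worker, args=(progress, is_bad_request))
    p.start()
    p.join()
    print(progress.value, is_bad_request.value)  # 100.0 1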
@@ -140,6 +176,7 @@ def on_post(self, req, res):
progress=progress,
finish_time=finish_time,
active_workers=active_workers,
is_bad_request=is_bad_request,
)

base_url = self._get_base_url(req)
@@ -163,11 +200,21 @@ def on_post(self, req, res):
"task_id": task_id,
"status_check": status_url,
}
elif is_bad_request.value == 1:
res.status = falcon.HTTP_400
res.media = {
"message": (
"Failed during task execution due to bad request. "
"Check scheduler logs."
)
}
else:
res.status = falcon.HTTP_500
res.media = {
"message": "Failed to schedule task due to an internal server "
"error. Check scheduler logs."
"message": (
"Failed to schedule task due to an internal server error. "
"Check scheduler logs."
)
}
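From a client's perspective, the new is_bad_request flag turns what was previously a blanket 500 into a 400 whenever the task itself rejected the request, for example a DELETE_INDEX target that fails the checks force_delete is meant to bypass (an inference from the flag names, not shown in this excerpt). A hedged sketch of handling the three outcomes; the host and port are assumptions, and the exact success status code is not visible here:

import requests

payload = {"model": "image", "action": "DELETE_INDEX", "index_suffix": "20220727"}
resp = requests.post("http://localhost:8001/task", json=payload)
if resp.ok:  # 2xx: task scheduled; poll the returned status URL
    print(resp.json()["status_check"])
elif resp.status_code == 400:  # the task ran and rejected the request
    print(resp.json()["message"])
else:  # 500: scheduling failed outright
    print(resp.json()["message"])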

def on_get(self, _, res):
@@ -276,6 +323,7 @@ def create_api(log=True):
task_tracker = TaskTracker()

_api.add_route("/", HealthResource())
_api.add_route("/stat/{name}", StatResource())
_api.add_route("/task", TaskResource(task_tracker))
_api.add_route("/task/{task_id}", TaskStatus(task_tracker))
_api.add_route("/worker_finished", WorkerFinishedResource(task_tracker))
103 changes: 103 additions & 0 deletions ingestion_server/ingestion_server/es_helpers.py
@@ -0,0 +1,103 @@
import logging as log
import time
from typing import NamedTuple, Optional, Union

from aws_requests_auth.aws_auth import AWSRequestsAuth
from decouple import config
from elasticsearch import ConnectionError as EsConnectionError
from elasticsearch import Elasticsearch, NotFoundError, RequestsHttpConnection


class Stat(NamedTuple):
"""
Contains information about the index or alias identified by its name.
"""

exists: bool
is_alias: Optional[bool]
alt_names: Optional[Union[str, list[str]]]
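Concretely, the three fields combine in three ways depending on what the given name resolves to; illustrative values (the index and alias names are made up):

Stat(exists=True, is_alias=True, alt_names="image-20220727")   # an alias; alt_names is the real index
Stat(exists=True, is_alias=False, alt_names=["image"])         # a real index; alt_names lists its aliases
Stat(exists=False, is_alias=None, alt_names=None)              # nothing by that name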


def elasticsearch_connect(timeout: int = 300) -> Elasticsearch:
"""
Repeatedly try to connect to Elasticsearch until successful.
:param timeout: the amount of time in seconds to wait for a successful connection
:return: an Elasticsearch client.
"""

while timeout > 0:
try:
return _elasticsearch_connect()
except EsConnectionError as err:
log.exception(err)
log.error("Reconnecting to Elasticsearch in 5 seconds...")
timeout -= 5
time.sleep(5)
continue
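Note that if the timeout lapses without a connection, the loop falls through and the function implicitly returns None instead of raising. Callers that cannot tolerate a missing client may want a guard along these lines (a sketch, not part of this commit):

es = elasticsearch_connect(timeout=60)
if es is None:
    raise RuntimeError("Could not reach Elasticsearch within 60 seconds")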


def _elasticsearch_connect() -> Elasticsearch:
"""
Connect to the Elasticsearch cluster at the configured domain. This method also
handles AWS authentication using the AWS access key ID and the secret access key.
:return: an Elasticsearch client
"""

elasticsearch_url = config("ELASTICSEARCH_URL", default="localhost")
elasticsearch_port = config("ELASTICSEARCH_PORT", default=9200, cast=int)

# For AWS IAM access to Elasticsearch
aws_region = config("AWS_REGION", "us-east-1")
aws_access_key_id = config("AWS_ACCESS_KEY_ID", default="")
aws_secret_access_key = config("AWS_SECRET_ACCESS_KEY", default="")

timeout = 12 # hours

log.info(f"Connecting to {elasticsearch_url}:{elasticsearch_port} with AWS auth")
auth = AWSRequestsAuth(
aws_access_key=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
aws_host=elasticsearch_url,
aws_region=aws_region,
aws_service="es",
)
auth.encode = lambda x: bytes(x.encode("utf-8"))
es = Elasticsearch(
host=elasticsearch_url,
port=elasticsearch_port,
connection_class=RequestsHttpConnection,
http_auth=auth,
timeout=timeout * 3600, # seconds
)
es.info()
return es
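The connection is driven entirely by environment configuration read through decouple. A sketch of the variables involved, with made-up values; note that ELASTICSEARCH_URL is used as a bare host name (it is passed as both aws_host and host above), and the AWS keys matter only when the cluster sits behind IAM:

import os

os.environ.update({
    "ELASTICSEARCH_URL": "search-example.us-east-1.es.amazonaws.com",  # bare host, no scheme
    "ELASTICSEARCH_PORT": "9200",
    "AWS_REGION": "us-east-1",
    "AWS_ACCESS_KEY_ID": "",      # empty for a local, unauthenticated cluster
    "AWS_SECRET_ACCESS_KEY": "",
})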


def get_stat(es: Elasticsearch, name_or_alias: str) -> Stat:
"""
Get more information about the index name or alias given to the function. For
any given input, the function offers three bits of information:
- whether an alias or index of the name exists
- whether the name is an alias
- the index name that the alias points to/the aliases associated with the index
:param es: the Elasticsearch connection
:param name_or_alias: the name of the index or an alias associated with it
:return: a ``Stat`` instance containing the three bits of information
"""

try:
matches = es.indices.get(index=name_or_alias)
real_name = list(matches.keys())[0]
aliases = list(matches[real_name]["aliases"].keys())
is_alias = real_name != name_or_alias
return Stat(
exists=True,
is_alias=is_alias,
alt_names=real_name if is_alias else aliases,
)
except NotFoundError:
return Stat(exists=False, is_alias=None, alt_names=None)
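A sketch of calling the helper directly; the name "image" is illustrative:

es = elasticsearch_connect()
stat = get_stat(es, "image")
if not stat.exists:
    print("no index or alias by that name")
elif stat.is_alias:
    print(f"alias for index {stat.alt_names!r}")
else:
    print(f"index with aliases {stat.alt_names!r}")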
