More cleanup
Reduced the bulk failure logging to a "report" document showing
counts for each error reason in each index, which might be a bit
more useful than bare error counts but is much less bulky than the
details of each error. The return value to the client is a new
"summary" document with just ok/failure counts.
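As a rough illustration of the new bookkeeping (a simplified sketch of the _post logic, not the exact diff; here `elastic` stands for the Elasticsearch client and `actions` for the stream produced by the subclass's generate_actions()), the per-reason/per-index report is accumulated with a defaultdict of Counters, while the client-facing summary carries only the totals:

    from collections import Counter, defaultdict
    from elasticsearch.helpers import streaming_bulk

    report = defaultdict(Counter)  # {"ok" | <failure reason>: {index_name: count}}
    count = 0
    error_count = 0

    # streaming_bulk yields (ok, item) pairs; with raise_on_error=False,
    # failures arrive as items carrying an "error" object instead of raising.
    for _, item in streaming_bulk(elastic, actions, raise_on_error=False):
        count += 1
        u = item["update"]  # actions use "_op_type": "update"
        status = "ok"
        if "error" in u:
            status = u["error"]["reason"]
            error_count += 1
        report[status][u["_index"]] += 1

    # The detailed report is only logged; the client sees just the totals.
    summary = {"ok": count - error_count, "failure": error_count}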
dbutenhof committed Oct 19, 2021
1 parent 7747b1c commit 2ab5f6c
Showing 3 changed files with 107 additions and 56 deletions.
116 changes: 75 additions & 41 deletions lib/pbench/server/api/resources/query_apis/__init__.py
@@ -1,3 +1,4 @@
from collections import Counter, defaultdict
from datetime import datetime
from http import HTTPStatus
from logging import Logger
@@ -42,7 +43,7 @@ def __init__(self, subclass_name: str):
self.subclass_name = subclass_name

def __str__(self) -> str:
return f"ElasticBulkBase subclass {self.subclass_name} is missing required schema parameters"
return f"API {self.subclass_name} is missing schema parameters controller and/or name"


class ElasticBase(ApiBase):
@@ -402,32 +403,45 @@ def __init__(
schema: API schema: for example,
Schema(
Parameter("controller", ParamType.STRING, required=True),
Parameter("name", ParamType.STRING, required=True)
Parameter("name", ParamType.STRING, required=True),
...
)
role: specify the API role, defaulting to UPDATE
"""
super().__init__(config, logger, schema, role=role)
host = config.get("elasticsearch", "host")
port = config.get("elasticsearch", "port")
self.es_url = f"http://{host}:{port}"
self.node = {
"host": config.get("elasticsearch", "host"),
"port": config.get("elasticsearch", "port"),
}

if "controller" not in schema or "name" not in schema:
raise MissingBulkSchemaParameters(self.__class__.__name__)

def generate_actions(self, json_data: JSON, dataset: Dataset) -> Iterator[dict]:
"""
Generate a series of Elasticsearch bulk API actions for the
streaming_bulk helper.
Generate a series of Elasticsearch bulk operation actions driven by the
dataset document map. For example:
{
"_op_type": "update",
"_index": index_name,
"_id": document_id,
"doc": {"authorization": {"access": new_access}}
}
This is an abstract method that must be implemented by a subclass.
Args:
json_data: The original query JSON parameters based on the subclass
schema.
dataset: The associated Dataset object
Returns:
Sequence of Elasticsearch bulk action dict objects
"""
raise NotImplementedError()

def complete(self, dataset: Dataset, json_data: JSON, error_count: int) -> None:
def complete(self, dataset: Dataset, json_data: JSON, summary: JSON) -> None:
"""
Complete a bulk Elasticsearch operation, perhaps by modifying the
source Dataset resource.
@@ -439,8 +453,9 @@ def complete(self, dataset: Dataset, json_data: JSON, error_count: int) -> None:
dataset: The associated Dataset object.
json_data: The original query JSON parameters based on the subclass
schema.
error_count: The number of errors encountered during the bulk
operation.
summary: The summary document of the operation:
ok Count of successful actions
failure Count of failing actions
"""
pass

@@ -483,7 +498,31 @@ def _post(self, json_data: JSON, _) -> Response:
abort(HTTPStatus.FORBIDDEN, message=str(e))

# Build an Elasticsearch instance to manage the bulk update
elastic = elasticsearch.Elasticsearch([self.es_url])
elastic = elasticsearch.Elasticsearch(self.node)

# Internally report a summary of successes and Elasticsearch failure
# reasons: this will look something like
#
# {
# "ok": {
# "index1": 1,
# "index2": 500,
# ...
# },
# "elasticsearch failure reason 1": {
# "index2": 5,
# "index5": 10
# ...
# },
# "elasticsearch failure reason 2": {
# "index3": 2,
# "index4": 15
# ...
# }
# }
report = defaultdict(Counter)
count = 0
error_count = 0

try:
# Pass the bulk command generator to the helper
@@ -494,10 +533,6 @@ def _post(self, json_data: JSON, _) -> Response:
raise_on_error=False,
)

report = {}
count = 0
error_count = 0

# NOTE: because streaming_bulk is given a generator, and also
# returns a generator, we consume the entire sequence within the
# `try` block to catch failures.
@@ -507,48 +542,47 @@ def _post(self, json_data: JSON, _) -> Response:
status = "ok"
if "error" in u:
e = u["error"]
status = e["type"]
self.logger.debug(
"{} ({}: {}) for id {} in index {}",
u["status"],
status,
e["reason"],
u["_id"],
u["_index"],
)
status = e["reason"]
error_count += 1
cnt = report.get(status, 0)
report[status] = cnt + 1
report[status][u["_index"]] += 1
except Exception as e:
self.logger.exception(
"{}: exception {} occurred during the Elasticsearch request",
"{}: exception {} occurred during the Elasticsearch request: report {}",
klasname,
type(e).__name__,
report,
)
abort(HTTPStatus.INTERNAL_SERVER_ERROR, message="INTERNAL ERROR")

self.logger.info(
"{}:dataset {}: {} successful document updates and {} failures",
klasname,
dataset,
count - error_count,
error_count,
)
summary = {"ok": count - error_count, "failure": error_count}

# Let the subclass complete the operation
self.complete(dataset, json_data, error_count)
self.complete(dataset, json_data, summary)

# Return the report document as the success response, or abort with an
# Return the summary document as the success response, or abort with an
# internal error if we weren't 100% successful. Some elasticsearch
# documents may have been affected, but the client will be able to try
# again. TODO: switching to `pyesbulk` will automatically handle
# retrying on non-terminal errors, but this requires some cleanup
# work on the pyesbulk side.
# again.
#
# TODO: switching to `pyesbulk` will automatically handle retrying on
# non-terminal errors, but this requires some cleanup work on the
# pyesbulk side.
if error_count > 0:
self.logger.error(
"{}:dataset {}: {} successful document updates and {} failures: {}",
klasname,
dataset,
count - error_count,
error_count,
report,
)
abort(
HTTPStatus.INTERNAL_SERVER_ERROR,
message=f"{error_count:d} of {count:d} Elasticsearch document UPDATE operations failed",
data=report,
data=summary,
)

return report
self.logger.info(
"{}:dataset {}: {} successful document updates", klasname, dataset, count
)
return summary
15 changes: 4 additions & 11 deletions lib/pbench/server/api/resources/query_apis/datasets_publish.py
@@ -34,15 +34,8 @@ def __init__(self, config: PbenchServerConfig, logger: Logger):

def generate_actions(self, json_data: JSON, dataset: Dataset) -> Iterator[dict]:
"""
Generate a series of Elasticsearch bulk update operation documents
driven by the dataset document map.
{
"_op_type": "update",
"_index": index_name,
"_id": document_id,
"doc": {"authorization": {"access": new_access}}
}
Generate a series of Elasticsearch bulk update actions driven by the
dataset document map.
Args:
json_data: Type-normalized client JSON input
@@ -76,9 +69,9 @@ def generate_actions(self, json_data: JSON, dataset: Dataset) -> Iterator[dict]:
"doc": {"authorization": {"access": access}},
}

def complete(self, dataset: Dataset, json_data: JSON, error_count: int) -> None:
def complete(self, dataset: Dataset, json_data: JSON, summary: JSON) -> None:
# Only on total success we update the Dataset's registered access
# column; a "partial success" will remain in the previous state.
if error_count == 0:
if summary["failure"] == 0:
dataset.access = json_data["access"]
dataset.update()
32 changes: 28 additions & 4 deletions lib/pbench/test/unit/server/query_apis/test_datasets_publish.py
@@ -22,8 +22,8 @@ class TestDatasetsPublish:

def fake_elastic(self, monkeypatch, map: JSON, partial_fail: bool):
"""
Helper function to mock the Elasticsearch helper streaming_bulk API,
which will validate the input actions and generate expected responses.
Pytest helper to install a mock for the Elasticsearch streaming_bulk
helper API for testing.
Args:
monkeypatch: The monkeypatch fixture from the test case
@@ -67,6 +67,20 @@ def fake_bulk(
raise_on_error: bool = True,
raise_on_exception: bool = True,
):
"""
Helper function to mock the Elasticsearch helper streaming_bulk API,
which will validate the input actions and generate expected responses.
Args:
elastic: An Elasticsearch object
stream: The input stream of bulk action dicts
raise_on_error: indicates whether errors should be raised
raise_on_exception: indicates whether exceptions should propagate
or be trapped
Yields:
Response documents from the mocked streaming_bulk helper
"""
# Consume and validate the command generator
for cmd in stream:
assert cmd["_op_type"] == "update"
@@ -114,13 +128,14 @@ def test_query(
)
assert response.status_code == expected_status
if expected_status == HTTPStatus.OK:
assert response.json == {"ok": 31}
assert response.json == {"ok": 31, "failure": 0}
dataset = Dataset.attach(controller="node", name="drb")
assert dataset.access == Dataset.PUBLIC_ACCESS

def test_partial(
self,
attach_dataset,
caplog,
client,
get_document_map,
monkeypatch,
@@ -141,7 +156,16 @@

# Verify the report and status
assert response.status_code == HTTPStatus.INTERNAL_SERVER_ERROR
assert response.json["data"] == {"ok": 28, "KIDDING": 3}
assert response.json["data"] == {"ok": 28, "failure": 3}
for record in caplog.records:
if (
record.levelname == "ERROR"
and record.name == "pbench.server.api:__init__.py"
):
assert (
record.message
== "DatasetsPublish:dataset drb(1)|node|drb: 28 successful document updates and 3 failures: defaultdict(<class 'collections.Counter'>, {'Just kidding': Counter({'unit-test.v6.run-data.2021-06': 1, 'unit-test.v6.run-toc.2021-06': 1, 'unit-test.v5.result-data-sample.2021-06': 1}), 'ok': Counter({'unit-test.v5.result-data-sample.2021-06': 19, 'unit-test.v6.run-toc.2021-06': 9})})"
)

# Verify that the Dataset access didn't change
dataset = Dataset.attach(controller="node", name="drb")
