Commit

dbutenhof committed Oct 19, 2021
1 parent c72deff commit 7747b1c
Showing 3 changed files with 83 additions and 73 deletions.
114 changes: 66 additions & 48 deletions lib/pbench/server/api/resources/query_apis/__init__.py
@@ -18,18 +18,33 @@
JSON,
PostprocessError,
Schema,
SchemaError,
UnauthorizedAccess,
UnsupportedAccessMode,
)
from pbench.server.database.models.template import Template
from pbench.server.database.models.datasets import Dataset, DatasetNotFound
from pbench.server.database.models.template import Template
from pbench.server.database.models.users import User

# A type defined to allow the preprocess subclass method to provide shared
# context with the assemble and postprocess methods.
CONTEXT = Dict[str, Any]


class MissingBulkSchemaParameters(SchemaError):
"""
The subclass schema is missing the "controller" or dataset "name"
parameters required to locate a Dataset.
"""

def __init__(self, subclass_name: str):
super().__init__()
self.subclass_name = subclass_name

def __str__(self) -> str:
return f"ElasticBulkBase subclass {self.subclass_name} is missing required schema parameters"


class ElasticBase(ApiBase):
"""
A base class for Elasticsearch queries that allows subclasses to provide
@@ -377,59 +392,57 @@ def __init__(
"""
Base class constructor.
This method assumes and requires that a dataset will be located using
the controller and dataset name, so "controller" and "name" string-type
parameters must be defined in the subclass schema.
Args:
config: server configuration
logger: logger object
schema: API schema: for example,
Schema(
Parameter("user", ParamType.USER, required=True),
Parameter("start", ParamType.DATE)
Parameter("controller", ParamType.STRING, required=True),
Parameter("name", ParamType.STRING, required=True)
)
role: specify the API role, defaulting to READ
role: specify the API role, defaulting to UPDATE
"""
super().__init__(config, logger, schema, role=role)
host = config.get("elasticsearch", "host")
port = config.get("elasticsearch", "port")
self.es_url = f"http://{host}:{port}"
if "controller" not in schema or "name" not in schema:
raise MissingBulkSchemaParameters(self.__class__.__name__)

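To make the schema check above concrete, a conforming subclass constructor might look like this minimal sketch; the class name and the extra "access" parameter are illustrative, not part of this commit:

from logging import Logger

from pbench.server import PbenchServerConfig
from pbench.server.api.resources import API_OPERATION, Parameter, ParamType, Schema


class ExampleBulkApi(ElasticBulkBase):
    def __init__(self, config: PbenchServerConfig, logger: Logger):
        # "controller" and "name" must appear in the schema, or the base
        # class constructor raises MissingBulkSchemaParameters.
        super().__init__(
            config,
            logger,
            Schema(
                Parameter("controller", ParamType.STRING, required=True),
                Parameter("name", ParamType.STRING, required=True),
                Parameter("access", ParamType.STRING, required=True),  # illustrative
            ),
            role=API_OPERATION.UPDATE,
        )
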
def generate_documents(self, json_data: JSON, dataset: Dataset) -> Iterator[dict]:
def generate_actions(self, json_data: JSON, dataset: Dataset) -> Iterator[dict]:
"""
Generate a series of Elasticsearch bulk commands to be fed to the
Generate a series of Elasticsearch bulk API actions for the
streaming_bulk helper.
This is an abstract method that must be implemented by a subclass.
Args:
json_data: JSON dictionary of type-normalized key-value pairs
controller: the controller that generated the dataset
name: name of the dataset to publish
access: The desired access level of the dataset
json_data: The original query JSON parameters based on the subclass
schema.
dataset: The associated Dataset object
"""
raise NotImplementedError()

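For reference, each item yielded by generate_actions is a plain dict in the form the streaming_bulk helper expects; the "update"/"doc" shape matches the DatasetsPublish subclass below, while the index name and document id here are made up:

# One bulk action as consumed by elasticsearch.helpers.streaming_bulk.
action = {
    "_op_type": "update",                 # bulk operation type
    "_index": "example.run-data",         # hypothetical target index
    "_id": "deadbeef",                    # hypothetical document id
    "doc": {"authorization": {"access": "public"}},  # partial-update body
}
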
def complete(self, dataset: Dataset, json_data: JSON, error_count: int) -> None:
"""
Complete a bulk Elasticsearch operation, generally by modifying the
Complete a bulk Elasticsearch operation, perhaps by modifying the
source Dataset resource.
This is an abstract method that must be implemented by a subclass.
This method may be overridden by a subclass to perform some
completion action; the default is to do nothing.
Args:
dataset: The associated Dataset object
json_data: JSON dictionary of type-normalized key-value pairs
controller: the controller that generated the dataset
name: name of the dataset to publish
access: The desired access level of the dataset
dataset: The associated Dataset object.
json_data: The original query JSON parameters based on the subclass
schema.
error_count: The number of errors encountered during the bulk
operation. We want to support idempotency: the Dataset should
not be altered unless the error_count is 0.
operation.
"""
raise NotImplementedError()
pass

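A subclass that needs a completion step can override complete(); the sketch below reflects how the publish operation plausibly uses it, though the attribute assignment and Dataset.update() call are assumptions rather than code shown in this diff:

def complete(self, dataset: Dataset, json_data: JSON, error_count: int) -> None:
    # Keep the operation idempotent: touch the Dataset only when every
    # document update succeeded, so a partial failure can simply be retried.
    if error_count == 0:
        dataset.access = json_data["access"]  # assumed Dataset attribute
        dataset.update()                      # assumed persistence helper
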
def _post(self, json_data: JSON, _) -> Response:
"""
@@ -476,10 +489,36 @@ def _post(self, json_data: JSON, _) -> Response:
# Pass the bulk command generator to the helper
results = elasticsearch.helpers.streaming_bulk(
elastic,
self.generate_documents(json_data, dataset),
self.generate_actions(json_data, dataset),
raise_on_exception=False,
raise_on_error=False,
)

report = {}
count = 0
error_count = 0

# NOTE: because streaming_bulk is given a generator, and also
# returns a generator, we consume the entire sequence within the
# `try` block to catch failures.
for ok, response in results:
count += 1
u = response["update"]
status = "ok"
if "error" in u:
e = u["error"]
status = e["type"]
self.logger.debug(
"{} ({}: {}) for id {} in index {}",
u["status"],
status,
e["reason"],
u["_id"],
u["_index"],
)
error_count += 1
cnt = report.get(status, 0)
report[status] = cnt + 1
except Exception as e:
self.logger.exception(
"{}: exception {} occurred during the Elasticsearch request",
@@ -488,30 +527,9 @@
)
abort(HTTPStatus.INTERNAL_SERVER_ERROR, message="INTERNAL ERROR")

report = {}
count = 0
error_count = 0
for ok, response in results:
count += 1
u = response["update"]
status = "ok"
if "error" in u:
e = u["error"]
status = e["type"]
self.logger.debug(
"{} ({}: {}) for id {} in index {}",
u["status"],
status,
e["reason"],
u["_id"],
u["_index"],
)
error_count += 1
cnt = report.get(status, 0)
report[status] = cnt + 1

self.logger.info(
"Update access for dataset {}: {} successful document updates and {} failures",
"{}:dataset {}: {} successful document updates and {} failures",
klasname,
dataset,
count - error_count,
error_count,
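
As the NOTE in the hunk above says, streaming_bulk does no work until its result generator is consumed, and with raise_on_exception and raise_on_error disabled it reports failures per action instead of raising. A standalone sketch of that contract (not pbench code):

import elasticsearch.helpers


def run_bulk(elastic, actions):
    """Consume streaming_bulk results, counting per-action failures."""
    errors = 0
    for ok, result in elasticsearch.helpers.streaming_bulk(
        elastic, actions, raise_on_exception=False, raise_on_error=False
    ):
        # Each item is an (ok, result) tuple; nothing is sent to
        # Elasticsearch until this loop pulls on the generator.
        if not ok:
            errors += 1
    return errors
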
28 changes: 9 additions & 19 deletions lib/pbench/server/api/resources/query_apis/datasets_publish.py
@@ -18,10 +18,6 @@ class DatasetsPublish(ElasticBulkBase):
Change the "access" authorization of a Pbench dataset by modifying the
"authorization": {"access": value} subdocument of each Elasticsearch
document associated with the specified dataset.
Note that this may amount to hundreds of thousands of documents across a
range of Elasticsearch indices, so we use the Elasticsearch streaming_bulk
helper to break down our request into chunks that Elasticsearch can handle.
"""

def __init__(self, config: PbenchServerConfig, logger: Logger):
@@ -36,7 +32,7 @@ def __init__(self, config: PbenchServerConfig, logger: Logger):
role=API_OPERATION.UPDATE,
)

def generate_documents(self, json_data: JSON, dataset: Dataset) -> Iterator[dict]:
def generate_actions(self, json_data: JSON, dataset: Dataset) -> Iterator[dict]:
"""
Generate a series of Elasticsearch bulk update operation documents
driven by the dataset document map.
@@ -48,27 +44,21 @@ def generate_documents(self, json_data: JSON, dataset: Dataset) -> Iterator[dict
"doc": {"authorization": {"access": new_access}}
}
json_data: JSON dictionary of type-normalized key-value pairs
controller: the controller that generated the dataset
name: name of the dataset to publish
access: The desired access level of the dataset
Args:
json_data: Type-normalized client JSON input
access: The desired access level of the dataset
dataset: the Dataset object
context: A dict containing a "dataset" key with the Dataset
object, which contains the root run-data index document ID.
Returns:
A sequence of Elasticsearch bulk actions
"""
name = json_data["name"]
access = json_data["access"]
user = dataset.owner

self.logger.info(
"Update access for dataset {} for user {} to {}", name, user, access
)

map = Metadata.getvalue(dataset=dataset, key=Metadata.INDEX_MAP)
doc_count = sum(len(i) for i in map.values())

self.logger.info(
"Publish operation will update {} Elasticsearch documents in {} indices: {}",
"Publish operation for dataset {} will update {} Elasticsearch documents in {} indices: {}",
dataset,
doc_count,
len(map),
list(map.keys()),
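
The yield loop itself is not shown in this hunk; given the index map built above and the action format described in the docstring, it plausibly reduces to a nested loop like this sketch (not necessarily the exact committed code):

# Sketch: emit one "update" action per document id recorded in the
# dataset's index map ({index_name: [document_id, ...]}).
for index, doc_ids in map.items():
    for doc_id in doc_ids:
        yield {
            "_op_type": "update",
            "_index": index,
            "_id": doc_id,
            "doc": {"authorization": {"access": access}},
        }
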
14 changes: 8 additions & 6 deletions lib/pbench/test/unit/server/query_apis/test_datasets_publish.py
@@ -1,7 +1,9 @@
from http import HTTPStatus
from typing import Iterator

import elasticsearch
import pytest
from http import HTTPStatus

from pbench.server.api.resources import JSON
from pbench.server.database.models.datasets import Dataset
from pbench.test.unit.server.headertypes import HeaderTypes
@@ -70,8 +72,8 @@ def fake_bulk(
assert cmd["_op_type"] == "update"
assert cmd["_id"] in expected_ids

# For the sake of "authenticity", return our expected results list
# as a generator
# Generate a sequence of result documents more or less as we'd
# expect to see from Elasticsearch
for item in expected_results:
yield item

@@ -122,8 +124,8 @@ def test_partial(
client,
get_document_map,
monkeypatch,
server_config,
pbench_token,
server_config,
):
"""
Check the publish API when some document updates fail. We expect an
@@ -146,7 +148,7 @@
assert dataset.access == Dataset.PRIVATE_ACCESS

def test_no_dataset(
self, client, get_document_map, monkeypatch, server_config, pbench_token,
self, client, get_document_map, monkeypatch, pbench_token, server_config
):
"""
Check the publish API if the dataset doesn't exist.
@@ -165,7 +167,7 @@
assert response.json["message"] == "No dataset node|badwolf"

def test_exception(
self, attach_dataset, client, monkeypatch, server_config, pbench_token,
self, attach_dataset, client, monkeypatch, pbench_token, server_config
):
"""
Check the publish API response if the bulk helper throws an exception.
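
For orientation, these tests replace the streaming_bulk helper via monkeypatch so no Elasticsearch instance is needed; a pared-down sketch of that pattern (the fixture names match the signatures above, the rest is illustrative):

def test_publish_success_sketch(client, monkeypatch, pbench_token, server_config):
    def fake_bulk(elastic, actions, **kwargs):
        # Consume the action generator and fabricate a successful
        # Elasticsearch "update" response for each action.
        for action in actions:
            assert action["_op_type"] == "update"
            yield True, {
                "update": {
                    "_id": action["_id"],
                    "_index": action["_index"],
                    "status": 200,
                }
            }

    monkeypatch.setattr("elasticsearch.helpers.streaming_bulk", fake_bulk)
    # ... POST to the publish endpoint with the client fixture, then assert
    # on the HTTP status and the resulting Dataset access value ...
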
