Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement POST /api/v1/relay #3425

Merged
merged 5 commits into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 37 additions & 36 deletions lib/pbench/server/api/resources/intake_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,12 @@ def _intake(
authorized_user = Auth.token_auth.current_user()
user_id = authorized_user.id
username = authorized_user.username
except Exception:
except Exception as e:
username = None
user_id = None
raise APIAbort(HTTPStatus.UNAUTHORIZED, "Verifying user_id failed")
raise APIAbort(
HTTPStatus.UNAUTHORIZED, "Verifying user_id failed"
) from e

# Ask our helper to determine the name and resource ID of the new
# dataset, along with requested access and metadata.
Expand Down Expand Up @@ -237,11 +239,11 @@ def _intake(
try:
tmp_dir = self.temporary / intake.md5
tmp_dir.mkdir()
except FileExistsError:
except FileExistsError as e:
raise APIAbort(
HTTPStatus.CONFLICT,
"Temporary upload directory already exists",
)
) from e
tar_full_path = tmp_dir / filename
md5_full_path = tmp_dir / f"{filename}.md5"

Expand Down Expand Up @@ -277,18 +279,18 @@ def _intake(
)
try:
Dataset.query(resource_id=intake.md5)
except DatasetNotFound:
except DatasetNotFound as e:
raise APIInternalError(
f"Duplicate dataset {dataset_name} for user {username!r} id {user_id} not found"
)
f"Duplicate dataset {intake.md5!r} ({dataset_name!r} is missing"
webbnh marked this conversation as resolved.
Show resolved Hide resolved
) from e
else:
response = jsonify(dict(message="Dataset already exists"))
response.status_code = HTTPStatus.OK
return response
except APIAbort:
raise # Propagate an APIAbort exception to the outer block
except Exception:
raise APIInternalError("Unable to create dataset")
except Exception as e:
raise APIInternalError("Unable to create dataset") from e

recovery.add(dataset.delete)

Expand All @@ -305,12 +307,12 @@ def _intake(

# Now we're ready to pull the tarball, so ask our helper for the
# length and data stream.
access = self._stream(intake, request)
stream = self._stream(intake, request)

if access.length <= 0:
if stream.length <= 0:
raise APIAbort(
HTTPStatus.BAD_REQUEST,
f"'Content-Length' {access.length} must be greater than 0",
f"'Content-Length' {stream.length} must be greater than 0",
)

# An exception from this point on MAY leave an uploaded tar file
Expand All @@ -327,33 +329,30 @@ def _intake(
hash_md5 = hashlib.md5()

while True:
chunk = access.stream.read(self.CHUNK_SIZE)
chunk = stream.stream.read(self.CHUNK_SIZE)
bytes_received += len(chunk)
if len(chunk) == 0 or bytes_received > access.length:
if len(chunk) == 0 or bytes_received > stream.length:
break
ofp.write(chunk)
hash_md5.update(chunk)
except OSError as exc:
# NOTE: Werkzeug doesn't support status 509, so the abort call
# in _dispatch will fail. Rather than figure out how to fix
# that, just report as an internal error.
if exc.errno == errno.ENOSPC:
# NOTE: Werkzeug doesn't support status 509, so the abort
# call in _dispatch will fail. Rather than figure out how
# to fix that, just report as an internal error.
raise APIInternalError(
f"Out of space on {tar_full_path.root}"
) from exc
msg = f"Out of space on {tar_full_path.root}"
else:
raise APIInternalError(
f"Unexpected error {exc.errno} encountered during file upload"
) from exc
msg = f"Unexpected error {exc.errno} encountered during file upload"
raise APIInternalError(msg) from exc
except Exception as e:
raise APIInternalError(
"Unexpected error encountered during file upload"
) from e

if bytes_received != access.length:
if bytes_received != stream.length:
raise APIAbort(
HTTPStatus.BAD_REQUEST,
f"Expected {access.length} bytes but received {bytes_received} bytes",
f"Expected {stream.length} bytes but received {bytes_received} bytes",
)
elif hash_md5.hexdigest() != intake.md5:
raise APIAbort(
Expand All @@ -365,34 +364,34 @@ def _intake(
recovery.add(md5_full_path.unlink)
try:
md5_full_path.write_text(f"{intake.md5} {filename}\n")
except Exception:
except Exception as e:
raise APIInternalError(
f"Failed to write .md5 file '{md5_full_path}'",
)
) from e

# Create a cache manager object
try:
cache_m = CacheManager(self.config, current_app.logger)
except Exception:
raise APIInternalError("Unable to map the cache manager")
except Exception as e:
raise APIInternalError("Unable to map the cache manager") from e

# Move the files to their final location
try:
tarball = cache_m.create(tar_full_path)
except DuplicateTarball:
except DuplicateTarball as exc:
raise APIAbort(
HTTPStatus.BAD_REQUEST,
f"A tarball with the name {dataset_name!r} already exists",
)
) from exc
except MetadataError as exc:
raise APIAbort(
HTTPStatus.BAD_REQUEST,
f"Tarball {dataset.name!r} is invalid or missing required metadata.log: {exc}",
)
) from exc
except Exception as exc:
raise APIInternalError(
f"Unable to create dataset in file system for {tar_full_path}: {exc}"
)
) from exc

usage = shutil.disk_usage(tar_full_path.parent)
current_app.logger.info(
Expand Down Expand Up @@ -423,12 +422,14 @@ def _intake(
except Exception as exc:
raise APIInternalError(
f"Unable to create metalog for Tarball {dataset.name!r}: {exc}"
)
) from exc

try:
retention_days = self.config.default_retention_period
except Exception as e:
raise APIInternalError(f"Unable to get integer retention days: {e!s}")
raise APIInternalError(
f"Unable to get integer retention days: {e!s}"
) from e

# Calculate a default deletion time for the dataset, based on the
# time it was uploaded rather than the time it was originally
Expand All @@ -450,7 +451,7 @@ def _intake(
if f:
attributes["failures"] = f
except Exception as e:
raise APIInternalError(f"Unable to set metadata: {e!s}")
raise APIInternalError(f"Unable to set metadata: {e!s}") from e

# Finally, update the operational state and Audit success.
try:
Expand Down
16 changes: 9 additions & 7 deletions lib/pbench/server/api/resources/relay.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ def __init__(self, config: PbenchServerConfig):
def _identify(self, args: ApiParams, request: Request) -> Intake:
"""Identify the tarball to be streamed.

We get the Relay inventory file location from the "uri" API
We get the Relay manifest file location from the "uri" API
parameter.

The Relay inventory file is an application/json file which
contains the following required fields:
The Relay manifest is an application/json file which contains the
following required fields:

uri: The Relay URI of the tarball file
md5: The tarball's MD5 hash (Pbench Server resource ID)
Expand All @@ -62,11 +62,11 @@ def _identify(self, args: ApiParams, request: Request) -> Intake:
metadata: An optional list of "key:value" metadata strings

This information will be captured in an Intake instance for use by the
base class and by the _access method.
base class and by the _stream method.

Args:
args: API parameters
URI parameters: the Relay inventory file URI
URI parameters: the Relay manifest URI
request: The original Request object containing query parameters

Returns:
Expand All @@ -88,7 +88,7 @@ def _identify(self, args: ApiParams, request: Request) -> Intake:
except Exception as e:
raise APIAbort(
HTTPStatus.BAD_GATEWAY,
f"Relay URI did not return a JSON document: {str(e)!r}",
f"Relay URI did not return a JSON manifest: {str(e)!r}",
) from e

try:
Expand All @@ -98,7 +98,9 @@ def _identify(self, args: ApiParams, request: Request) -> Intake:
access = information.get("access", Dataset.PRIVATE_ACCESS)
metadata = information.get("metadata", [])
except KeyError as e:
raise APIAbort(HTTPStatus.BAD_GATEWAY, f"Relay info missing {str(e)!r}")
raise APIAbort(
HTTPStatus.BAD_GATEWAY, f"Relay info missing {str(e)!r}"
) from e

return Intake(name, md5, access, metadata, uri)

Expand Down
14 changes: 7 additions & 7 deletions lib/pbench/server/api/resources/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,17 +76,17 @@ def _identify(self, args: ApiParams, request: Request) -> Intake:

try:
md5sum = request.headers["Content-MD5"]
except KeyError:
except KeyError as e:
raise APIAbort(
HTTPStatus.BAD_REQUEST, "Missing required 'Content-MD5' header"
)
) from e
if not md5sum:
raise APIAbort(
HTTPStatus.BAD_REQUEST,
"Missing required 'Content-MD5' header value",
)

return Intake(filename, md5sum, access, metadata, None)
return Intake(filename, md5sum, access, metadata, uri=None)

def _stream(self, intake: Intake, request: Request) -> Access:
"""Determine how to access the tarball byte stream
Expand All @@ -108,23 +108,23 @@ def _stream(self, intake: Intake, request: Request) -> Access:
try:
length_string = request.headers["Content-Length"]
content_length = int(length_string)
except KeyError:
except KeyError as e:
# NOTE: Werkzeug is "smart" about header access, and knows that
# Content-Length is an integer. Therefore, a non-integer value
# will raise KeyError. It's virtually impossible to report the
# actual incorrect value as we'd just get a KeyError again.
raise APIAbort(
HTTPStatus.LENGTH_REQUIRED,
"Missing or invalid 'Content-Length' header",
)
except ValueError:
) from e
except ValueError as e:
# NOTE: Because of the way Werkzeug works, this should not be
# possible: if Content-Length isn't an integer, we'll see the
# KeyError. This however serves as a clarifying backup case.
raise APIAbort(
HTTPStatus.BAD_REQUEST,
f"Invalid 'Content-Length' header, not an integer ({length_string})",
)
) from e
return Access(content_length, request.stream)

def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Response:
Expand Down
2 changes: 1 addition & 1 deletion lib/pbench/test/unit/server/test_relay.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ def test_relay_not_json(self, client, server_config, pbench_drb_token):
assert response.status_code == HTTPStatus.BAD_GATEWAY
assert (
response.json["message"]
== "Relay URI did not return a JSON document: 'Expecting value: line 1 column 1 (char 0)'"
== "Relay URI did not return a JSON manifest: 'Expecting value: line 1 column 1 (char 0)'"
)

@responses.activate
Expand Down