Commit 4199007: Review comments
dbutenhof committed May 26, 2023
1 parent aa702b2 commit 4199007
Showing 5 changed files with 127 additions and 145 deletions.
109 changes: 41 additions & 68 deletions lib/pbench/server/api/resources/intake_base.py
@@ -59,17 +59,30 @@ class Access:


class IntakeBase(ApiBase):
"""Framework to assimilate a dataset into the Pbench Server"""
"""Framework to assimilate a dataset into the Pbench Server.
This relies on subclasses to provide specific hook methods to identify and
stream the tarball data:
_identify: decodes the URI and query parameters to determine the target
dataset name, the appropriate MD5, the initial access type, and
optional metadata to be set.
_stream: decodes the intake data and provides the length and byte IO
stream to be read into a temporary file.
"""

CHUNK_SIZE = 65536

def __init__(self, config: PbenchServerConfig, schema: ApiSchema):
super().__init__(config, schema)
self.temporary = config.ARCHIVE / CacheManager.TEMPORARY
self.temporary.mkdir(mode=0o755, parents=True, exist_ok=True)
method = list(self.schemas.schemas.keys())[0]
self.name = self.schemas[method].audit_name
current_app.logger.info("INTAKE temporary directory is {}", self.temporary)

def process_metadata(self, metas: list[str]) -> JSONOBJECT:
@staticmethod
def process_metadata(metas: list[str]) -> JSONOBJECT:
"""Process 'metadata' query parameter
We allow the client to set metadata on the new dataset. We won't do
@@ -113,8 +126,8 @@ def process_metadata(self, metas: list[str]) -> JSONOBJECT:
)
return metadata

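For context, here is a rough sketch (not part of this commit) of what this helper consumes and produces. The "key:value" string form and the flat result shape are assumptions based on the Pbench upload API, since most of the method body is elided in this view.

# Illustrative only: each metadata query parameter is assumed to be a
# "namespace.key:value" string, folded into a JSON object keyed by name.
metas = ["global.project:demo", "server.origin:relay"]
metadata = {}
for meta in metas:
    key, value = meta.split(":", 1)
    metadata[key] = value
# metadata == {"global.project": "demo", "server.origin": "relay"}
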
def _prepare(self, args: ApiParams, request: Request) -> Intake:
"""Prepare to begin the intake operation
def _identify(self, args: ApiParams, request: Request) -> Intake:
"""Identify the tarball to be streamed.
Must be implemented by each subclass of this base class.
@@ -127,13 +140,13 @@ def _prepare(self, args: ApiParams, request: Request) -> Intake:
"""
raise NotImplementedError()

def _access(self, intake: Intake, request: Request) -> Access:
def _stream(self, intake: Intake, request: Request) -> Access:
"""Determine how to access the tarball byte stream
Must be implemented by each subclass of this base class.
Args:
intake: The Intake parameters produced by _intake
intake: The Intake parameters produced by _identify
request: The Flask request object
Returns:
@@ -154,15 +167,6 @@ def _intake(
1) PUT /api/v1/upload/<filename>
2) POST /api/v1/relay/<uri>
The operational differences are encapsulated by two helper methods
provided by the subclasses:
_prepare: decodes the URI and query parameters to determine the target
dataset name, the appropriate MD5, the initial access type, and
optional metadata to be set.
_access: decodes the intake data and provides the length and byte IO
stream to be read into a temporary file.
If the new dataset is created successfully, return 201 (CREATED).
The tarball name must be unique on the Pbench Server. If the name
@@ -206,12 +210,7 @@ def _intake(

# Ask our helper to determine the name and resource ID of the new
# dataset, along with requested access and metadata.
try:
intake = self._prepare(args, request)
except APIAbort:
raise
except Exception as e:
raise APIInternalError(str(e)) from e
intake = self._identify(args, request)

filename = intake.name
metadata = self.process_metadata(intake.metadata)
@@ -249,14 +248,15 @@ def _intake(
bytes_received = 0
usage = shutil.disk_usage(tar_full_path.parent)
current_app.logger.info(
"{} UPLOAD (pre): {:.3}% full, {} remaining",
"{} {} (pre): {:.3}% full, {} remaining",
self.name,
tar_full_path.name,
float(usage.used) / float(usage.total) * 100.0,
humanize.naturalsize(usage.free),
)

current_app.logger.info(
"PUT uploading {} for {} to {}", filename, username, tar_full_path
"{} {} for {} to {}", self.name, filename, username, tar_full_path
)

# Create a tracking dataset object; it'll begin in UPLOADING state
@@ -278,31 +278,24 @@ def _intake(
try:
Dataset.query(resource_id=intake.md5)
except DatasetNotFound:
current_app.logger.error(
"Duplicate dataset {} for user = (user_id: {}, username: {}) not found",
dataset_name,
user_id,
username,
raise APIInternalError(
f"Duplicate dataset {dataset_name} for user {username!r} id {user_id} not found"
)
raise APIAbort(HTTPStatus.INTERNAL_SERVER_ERROR, "INTERNAL ERROR")
else:
response = jsonify(dict(message="Dataset already exists"))
response.status_code = HTTPStatus.OK
return response
except APIAbort:
raise # Propagate an APIAbort exception to the outer block
except Exception:
raise APIAbort(
HTTPStatus.INTERNAL_SERVER_ERROR,
message="Unable to create dataset",
)
raise APIInternalError("Unable to create dataset")

recovery.add(dataset.delete)

# AUDIT the operation start before we get any further
audit = Audit.create(
operation=OperationCode.CREATE,
name="upload",
name=self.name,
user_id=user_id,
user_name=username,
dataset=dataset,
Expand All @@ -312,12 +305,7 @@ def _intake(

# Now we're ready to pull the tarball, so ask our helper for the
# length and data stream.
try:
access = self._access(intake, request)
except APIAbort:
raise
except Exception as e:
raise APIInternalError(str(e)) from e
access = self._stream(intake, request)

if access.length <= 0:
raise APIAbort(
@@ -373,28 +361,20 @@ def _intake(
f"MD5 checksum {hash_md5.hexdigest()} does not match expected {intake.md5}",
)

# First write the .md5
current_app.logger.info(
"Creating MD5 file {}: {}", md5_full_path, intake.md5
)

# From this point attempt to remove the MD5 file on error exit
recovery.add(md5_full_path.unlink)
try:
md5_full_path.write_text(f"{intake.md5} {filename}\n")
except Exception:
raise APIAbort(
HTTPStatus.INTERNAL_SERVER_ERROR,
raise APIInternalError(
f"Failed to write .md5 file '{md5_full_path}'",
)

# Create a cache manager object
try:
cache_m = CacheManager(self.config, current_app.logger)
except Exception:
raise APIAbort(
HTTPStatus.INTERNAL_SERVER_ERROR, "Unable to map the cache manager"
)
raise APIInternalError("Unable to map the cache manager")

# Move the files to their final location
try:
@@ -410,14 +390,14 @@ def _intake(
f"Tarball {dataset.name!r} is invalid or missing required metadata.log: {exc}",
)
except Exception as exc:
raise APIAbort(
HTTPStatus.INTERNAL_SERVER_ERROR,
f"Unable to create dataset in file system for {tar_full_path}: {exc}",
raise APIInternalError(
f"Unable to create dataset in file system for {tar_full_path}: {exc}"
)

usage = shutil.disk_usage(tar_full_path.parent)
current_app.logger.info(
"{} UPLOAD (post): {:.3}% full, {} remaining",
"{} {} (post): {:.3}% full, {} remaining",
self.name,
tar_full_path.name,
float(usage.used) / float(usage.total) * 100.0,
humanize.naturalsize(usage.free),
@@ -441,18 +421,14 @@ def _intake(
attributes["missing_metadata"] = True
Metadata.create(dataset=dataset, key=Metadata.METALOG, value=metalog)
except Exception as exc:
raise APIAbort(
HTTPStatus.INTERNAL_SERVER_ERROR,
f"Unable tintakeo create metalog for Tarball {dataset.name!r}: {exc}",
raise APIInternalError(
f"Unable to create metalog for Tarball {dataset.name!r}: {exc}"
)

try:
retention_days = self.config.default_retention_period
except Exception as e:
raise APIAbort(
HTTPStatus.INTERNAL_SERVER_ERROR,
f"Unable to get integer retention days: {e!s}",
)
raise APIInternalError(f"Unable to get integer retention days: {e!s}")

# Calculate a default deletion time for the dataset, based on the
# time it was uploaded rather than the time it was originally
@@ -474,9 +450,7 @@ def _intake(
if f:
attributes["failures"] = f
except Exception as e:
raise APIAbort(
HTTPStatus.INTERNAL_SERVER_ERROR, f"Unable to set metadata: {e!s}"
)
raise APIInternalError(f"Unable to set metadata: {e!s}")

# Finally, update the operational state and Audit success.
try:
@@ -490,9 +464,8 @@ def _intake(
root=audit, status=AuditStatus.SUCCESS, attributes=attributes
)
except Exception as exc:
raise APIAbort(
HTTPStatus.INTERNAL_SERVER_ERROR,
f"Unable to finalize dataset {dataset}: {exc!s}",
raise APIInternalError(
f"Unable to finalize dataset {dataset}: {exc!s}"
) from exc
except Exception as e:
if isinstance(e, APIAbort):
@@ -518,7 +491,7 @@ def _intake(
attributes={"message": audit_msg},
)
recovery.cleanup()
raise exception
raise exception from e
finally:
if tmp_dir:
try:
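For orientation, here is a minimal sketch (not part of this commit) of how a subclass might implement the two hooks described in the IntakeBase docstring above. The Intake and Access field order follows the calls visible in relay.py below; the import paths, parameter names, and header names are assumptions for illustration.

from flask import Request

from pbench.server.api.resources import ApiParams
from pbench.server.api.resources.intake_base import Access, Intake, IntakeBase


class ExampleUpload(IntakeBase):
    """Hypothetical PUT-style intake: the tarball bytes arrive directly in
    the HTTP request body rather than being pulled from a relay."""

    def _identify(self, args: ApiParams, request: Request) -> Intake:
        # Decode URI and query parameters into the dataset name, expected
        # MD5, initial access type, and optional metadata strings.
        filename = args.uri["filename"]                # assumed URI parameter
        md5 = request.headers.get("Content-MD5", "")   # assumed header
        return Intake(filename, md5, "private", [], None)

    def _stream(self, intake: Intake, request: Request) -> Access:
        # Report the declared length and hand back a readable byte stream;
        # IntakeBase reads it in CHUNK_SIZE pieces into a temporary file and
        # verifies the accumulated MD5 against intake.md5.
        length = int(request.headers.get("Content-Length", 0))
        return Access(length, request.stream)
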
38 changes: 22 additions & 16 deletions lib/pbench/server/api/resources/relay.py
@@ -22,9 +22,7 @@


class Relay(IntakeBase):
"""Download a dataset from a relay server"""

CHUNK_SIZE = 65536
"""Retrieve a dataset from a relay server"""

def __init__(self, config: PbenchServerConfig):
super().__init__(
@@ -43,18 +41,18 @@ def __init__(self, config: PbenchServerConfig):
),
),
audit_type=AuditType.NONE,
audit_name="upload",
audit_name="relay",
authorization=ApiAuthorizationType.NONE,
),
)

def _prepare(self, args: ApiParams, request: Request) -> Intake:
"""Prepare to begin the intake operation
def _identify(self, args: ApiParams, request: Request) -> Intake:
"""Identify the tarball to be streamed.
We get the Relay configuration file location from the "uri" API
We get the Relay inventory file location from the "uri" API
parameter.
The Relay configuration file is an application/json file which
The Relay inventory file is an application/json file which
contains the following required fields:
uri: The Relay URI of the tarball file
@@ -68,17 +66,22 @@ def _prepare(self, args: ApiParams, request: Request) -> Intake:
Args:
args: API parameters
URI parameters
uri: The full Relay configuration file URI
URI parameters: the Relay inventory file URI
request: The original Request object containing query parameters
Returns:
An Intake object capturing the critical information
Raises:
APIAbort on failure
"""
uri = args.uri["uri"]
response = requests.get(uri, headers={"accept": "application/json"})
response = requests.get(uri, headers={"Accept": "application/json"})
if not response.ok:
raise APIAbort(HTTPStatus.BAD_GATEWAY, "Relay URI does not respond")
raise APIAbort(
HTTPStatus.BAD_GATEWAY,
f"Relay manifest URI problem: {response.reason!r}",
)

try:
information = response.json()
@@ -99,30 +102,33 @@ def _prepare(self, args: ApiParams, request: Request) -> Intake:

return Intake(name, md5, access, metadata, uri)

def _access(self, intake: Intake, request: Request) -> Access:
def _stream(self, intake: Intake, request: Request) -> Access:
"""Determine how to access the tarball byte stream
Using the _intake information captured in the Intake instance, perform
a follow-up GET operation to the URI provided by the Relay config file,
returning the length header and the IO stream.
Args:
intake: The Intake parameters produced by _intake
intake: The Intake parameters produced by _identify
request: The Flask request object
Returns:
An Access object with the data byte stream and length
Raises:
APIAbort on failure
"""
response: requests.Response = requests.get(
url=intake.uri, stream=True, headers={"accept": "application/octet-stream"}
url=intake.uri, stream=True, headers={"Accept": "application/octet-stream"}
)
if not response.ok:
raise APIAbort(
response.status_code,
f"Unable to retrieve relay tarball: {response.reason!r}",
)
try:
length = int(response.headers["content-length"])
length = int(response.headers["Content-length"])
return Access(length, response.raw)
except Exception as e:
raise APIAbort(
(Diffs of the remaining three changed files are not shown.)

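To make the relay intake flow in relay.py concrete, here is an illustrative walk-through (not part of this commit) of the two GET requests the hooks above issue. The manifest field values and hostnames are invented; only "uri" is explicitly shown as required in the truncated docstring, and metadata defaults to an empty list in the code.

import requests

# A relay inventory (manifest) file is an application/json document that
# might look like this; every value here is an assumption for illustration.
manifest = {
    "uri": "https://relay.example.com/objects/22a4bb5fc19547bba5b28ad64e9f9f64",
    "md5": "22a4bb5fc19547bba5b28ad64e9f9f64",
    "name": "fio_example_2023.05.26T15.30.00.tar.xz",
    "access": "private",
    "metadata": ["global.project:demo"],
}

# _identify fetches the manifest itself:
info = requests.get(
    "https://relay.example.com/manifests/example",
    headers={"Accept": "application/json"},
).json()

# _stream then issues a second, streaming GET against the manifest's "uri"
# and wraps the Content-length header and raw stream in an Access object:
tarball = requests.get(
    info["uri"], stream=True, headers={"Accept": "application/octet-stream"}
)
length = int(tarball.headers["Content-length"])
stream = tarball.raw
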