Skip to content

Commit

Permalink
Gracefully handle duplicate name with unique MD5 (distributed-system-…
Browse files Browse the repository at this point in the history
…analysis#3355)

* Gracefully handle duplicate name with unique MD5

PBENCH-1112

In testing, we're seeing duplicate tarball names with distinct MD5 values. We
have reduced the importance of duplicate tarball names, relying on the dataset
resource ID (MD5) for "uniqueness" internally. However, the cache manager still
maintains tarball files and MD5 companions on disk, and the upload API has to
temporarily store the API payload somewhere. When duplicate tarballs are
uploaded concurrently, the `.open()` fails, leading to an internal server
error. We want to minimize the chance of reporting internal server errors where
possible.

Instead, move temporary upload files into an MD5-based subdirectory, and
clean up that directory after the upload completes.
  • Loading branch information
dbutenhof committed May 18, 2023
1 parent 8aa8e71 commit d457961
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 103 deletions.
116 changes: 72 additions & 44 deletions lib/pbench/server/api/resources/upload_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import hashlib
from http import HTTPStatus
import os
from pathlib import Path
import shutil
from typing import Any, Optional

Expand All @@ -26,7 +27,7 @@
Schema,
)
import pbench.server.auth.auth as Auth
from pbench.server.cache_manager import CacheManager, MetadataError
from pbench.server.cache_manager import CacheManager, DuplicateTarball, MetadataError
from pbench.server.database.models.audit import (
Audit,
AuditReason,
Expand Down Expand Up @@ -177,6 +178,7 @@ def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Respon

attributes = {"access": access, "metadata": metadata}
filename = args.uri["filename"]
tmp_dir: Optional[Path] = None

try:
try:
Expand Down Expand Up @@ -236,9 +238,23 @@ def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Respon
f"'Content-Length' {content_length} must be greater than 0",
)

tar_full_path = self.temporary / filename
md5_full_path = self.temporary / f"{filename}.md5"
dataset_name = Dataset.stem(tar_full_path)
dataset_name = Dataset.stem(filename)

# NOTE: we isolate each uploaded tarball into a private MD5-based
# subdirectory in order to retain the original tarball stem name
# for the cache manager while giving us protection against multiple
# tarballs with the same name. (A duplicate MD5 will have already
# failed, so that's not a concern.)
try:
tmp_dir = self.temporary / md5sum
tmp_dir.mkdir()
except FileExistsError:
raise CleanupTime(
HTTPStatus.CONFLICT,
"Temporary upload directory already exists",
)
tar_full_path = tmp_dir / filename
md5_full_path = tmp_dir / f"{filename}.md5"

bytes_received = 0
usage = shutil.disk_usage(tar_full_path.parent)
Expand All @@ -257,7 +273,7 @@ def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Respon
try:
dataset = Dataset(
owner_id=user_id,
name=Dataset.stem(tar_full_path),
name=dataset_name,
resource_id=md5sum,
access=access,
)
Expand Down Expand Up @@ -309,60 +325,61 @@ def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Respon
# error recovery.
recovery.add(tar_full_path.unlink)

with tar_full_path.open(mode="wb") as ofp:
hash_md5 = hashlib.md5()
# NOTE: We know that the MD5 is unique at this point; so even if
# two tarballs with the same name are uploaded concurrently, by
# writing into a temporary directory named for the MD5 we're
# assured that they can't conflict.
try:
with tar_full_path.open(mode="wb") as ofp:
hash_md5 = hashlib.md5()

try:
while True:
chunk = request.stream.read(self.CHUNK_SIZE)
bytes_received += len(chunk)
if len(chunk) == 0 or bytes_received > content_length:
break

ofp.write(chunk)
hash_md5.update(chunk)
except OSError as exc:
if exc.errno == errno.ENOSPC:
raise CleanupTime(
HTTPStatus.INSUFFICIENT_STORAGE,
f"Out of space on {tar_full_path.root}",
)
else:
raise CleanupTime(
HTTPStatus.INTERNAL_SERVER_ERROR,
f"Unexpected error {exc.errno} encountered during file upload",
)
except Exception:
except OSError as exc:
if exc.errno == errno.ENOSPC:
raise CleanupTime(
HTTPStatus.INTERNAL_SERVER_ERROR,
"Unexpected error encountered during file upload",
HTTPStatus.INSUFFICIENT_STORAGE,
f"Out of space on {tar_full_path.root}",
)

if bytes_received != content_length:
raise CleanupTime(
HTTPStatus.BAD_REQUEST,
f"Expected {content_length} bytes but received {bytes_received} bytes",
)
elif hash_md5.hexdigest() != md5sum:
else:
raise CleanupTime(
HTTPStatus.BAD_REQUEST,
f"MD5 checksum {hash_md5.hexdigest()} does not match expected {md5sum}",
HTTPStatus.INTERNAL_SERVER_ERROR,
f"Unexpected error {exc.errno} encountered during file upload",
)
except Exception:
raise CleanupTime(
HTTPStatus.INTERNAL_SERVER_ERROR,
"Unexpected error encountered during file upload",
)

# First write the .md5
current_app.logger.info(
"Creating MD5 file {}: {}", md5_full_path, md5sum
if bytes_received != content_length:
raise CleanupTime(
HTTPStatus.BAD_REQUEST,
f"Expected {content_length} bytes but received {bytes_received} bytes",
)
elif hash_md5.hexdigest() != md5sum:
raise CleanupTime(
HTTPStatus.BAD_REQUEST,
f"MD5 checksum {hash_md5.hexdigest()} does not match expected {md5sum}",
)

# From this point attempt to remove the MD5 file on error exit
recovery.add(md5_full_path.unlink)
try:
md5_full_path.write_text(f"{md5sum} {filename}\n")
except Exception:
raise CleanupTime(
HTTPStatus.INTERNAL_SERVER_ERROR,
f"Failed to write .md5 file '{md5_full_path}'",
)
# First write the .md5
current_app.logger.info("Creating MD5 file {}: {}", md5_full_path, md5sum)

# From this point attempt to remove the MD5 file on error exit
recovery.add(md5_full_path.unlink)
try:
md5_full_path.write_text(f"{md5sum} {filename}\n")
except Exception:
raise CleanupTime(
HTTPStatus.INTERNAL_SERVER_ERROR,
f"Failed to write .md5 file '{md5_full_path}'",
)

# Create a cache manager object
try:
Expand All @@ -375,6 +392,11 @@ def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Respon
# Move the files to their final location
try:
tarball = cache_m.create(tar_full_path)
except DuplicateTarball:
raise CleanupTime(
HTTPStatus.BAD_REQUEST,
f"A tarball with the name {dataset_name!r} already exists",
)
except MetadataError as exc:
raise CleanupTime(
HTTPStatus.BAD_REQUEST,
Expand Down Expand Up @@ -500,6 +522,12 @@ def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Respon
raise APIInternalError(message) from e
else:
raise APIAbort(status, message) from e
finally:
if tmp_dir:
try:
shutil.rmtree(tmp_dir)
except Exception as e:
current_app.logger.warning("Error removing {}: {}", tmp_dir, str(e))

response = jsonify(dict(message="File successfully uploaded"))
response.status_code = HTTPStatus.CREATED
Expand Down
Loading

0 comments on commit d457961

Please sign in to comment.