
Gracefully handle duplicate name with unique MD5 #3355

Merged: 3 commits, Mar 22, 2023
lib/pbench/server/api/resources/upload_api.py (116 changes: 72 additions & 44 deletions)
@@ -3,6 +3,7 @@
 import hashlib
 from http import HTTPStatus
 import os
+from pathlib import Path
 import shutil
 from typing import Any, Optional

@@ -26,7 +27,7 @@
     Schema,
 )
 import pbench.server.auth.auth as Auth
-from pbench.server.cache_manager import CacheManager, MetadataError
+from pbench.server.cache_manager import CacheManager, DuplicateTarball, MetadataError
 from pbench.server.database.models.audit import (
     Audit,
     AuditReason,
@@ -177,6 +178,7 @@ def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Response:

         attributes = {"access": access, "metadata": metadata}
         filename = args.uri["filename"]
+        tmp_dir: Optional[Path] = None

         try:
             try:
@@ -236,9 +238,23 @@ def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Response:
                     f"'Content-Length' {content_length} must be greater than 0",
                 )

-            tar_full_path = self.temporary / filename
-            md5_full_path = self.temporary / f"{filename}.md5"
-            dataset_name = Dataset.stem(tar_full_path)
+            dataset_name = Dataset.stem(filename)
+
+            # NOTE: we isolate each uploaded tarball into a private MD5-based
+            # subdirectory in order to retain the original tarball stem name
+            # for the cache manager while giving us protection against multiple
+            # tarballs with the same name. (A duplicate MD5 will have already
+            # failed, so that's not a concern.)
+            try:
+                tmp_dir = self.temporary / md5sum
+                tmp_dir.mkdir()
+            except FileExistsError:
+                raise CleanupTime(
+                    HTTPStatus.CONFLICT,
+                    "Temporary upload directory already exists",
+                )
+            tar_full_path = tmp_dir / filename
+            md5_full_path = tmp_dir / f"{filename}.md5"

             bytes_received = 0
             usage = shutil.disk_usage(tar_full_path.parent)
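
The isolation idea in this hunk is generic: duplicate MD5s have already been rejected by this point, so an MD5-named subdirectory is unique per upload and two tarballs that share a filename can never collide on disk. A minimal standalone sketch of the pattern, with hypothetical names (this is not pbench code):

    import hashlib
    from pathlib import Path


    def reserve_upload_dir(staging: Path, payload_md5: str, filename: str) -> Path:
        """Reserve a private, MD5-named directory for one upload.

        Assumes payload_md5 was already checked for uniqueness, so
        FileExistsError can only mean a concurrent retry of the same upload.
        """
        upload_dir = staging / payload_md5
        upload_dir.mkdir()  # raises FileExistsError on a concurrent duplicate
        return upload_dir / filename


    # Two uploads both named "run.tar.xz" land in different directories,
    # e.g. staging/<md5-a>/run.tar.xz and staging/<md5-b>/run.tar.xz.
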
@@ -257,7 +273,7 @@ def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Response:
             try:
                 dataset = Dataset(
                     owner_id=user_id,
-                    name=Dataset.stem(tar_full_path),
+                    name=dataset_name,
                     resource_id=md5sum,
                     access=access,
                 )
@@ -309,60 +325,61 @@ def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Response:
             # error recovery.
             recovery.add(tar_full_path.unlink)

-            with tar_full_path.open(mode="wb") as ofp:
-                hash_md5 = hashlib.md5()
-
-                try:
-                    while True:
-                        chunk = request.stream.read(self.CHUNK_SIZE)
-                        bytes_received += len(chunk)
-                        if len(chunk) == 0 or bytes_received > content_length:
-                            break
-
-                        ofp.write(chunk)
-                        hash_md5.update(chunk)
-                except OSError as exc:
-                    if exc.errno == errno.ENOSPC:
-                        raise CleanupTime(
-                            HTTPStatus.INSUFFICIENT_STORAGE,
-                            f"Out of space on {tar_full_path.root}",
-                        )
-                    else:
-                        raise CleanupTime(
-                            HTTPStatus.INTERNAL_SERVER_ERROR,
-                            f"Unexpected error {exc.errno} encountered during file upload",
-                        )
-                except Exception:
-                    raise CleanupTime(
-                        HTTPStatus.INTERNAL_SERVER_ERROR,
-                        "Unexpected error encountered during file upload",
-                    )
-
-                if bytes_received != content_length:
-                    raise CleanupTime(
-                        HTTPStatus.BAD_REQUEST,
-                        f"Expected {content_length} bytes but received {bytes_received} bytes",
-                    )
-                elif hash_md5.hexdigest() != md5sum:
-                    raise CleanupTime(
-                        HTTPStatus.BAD_REQUEST,
-                        f"MD5 checksum {hash_md5.hexdigest()} does not match expected {md5sum}",
-                    )
-
-            # First write the .md5
-            current_app.logger.info(
-                "Creating MD5 file {}: {}", md5_full_path, md5sum
-            )
-
-            # From this point attempt to remove the MD5 file on error exit
-            recovery.add(md5_full_path.unlink)
-            try:
-                md5_full_path.write_text(f"{md5sum} {filename}\n")
-            except Exception:
-                raise CleanupTime(
-                    HTTPStatus.INTERNAL_SERVER_ERROR,
-                    f"Failed to write .md5 file '{md5_full_path}'",
-                )
+            # NOTE: We know that the MD5 is unique at this point; so even if
+            # two tarballs with the same name are uploaded concurrently, by
+            # writing into a temporary directory named for the MD5 we're
+            # assured that they can't conflict.
+            try:
+                with tar_full_path.open(mode="wb") as ofp:
+                    hash_md5 = hashlib.md5()
+
+                    while True:
+                        chunk = request.stream.read(self.CHUNK_SIZE)
+                        bytes_received += len(chunk)
+                        if len(chunk) == 0 or bytes_received > content_length:
+                            break
+
+                        ofp.write(chunk)
+                        hash_md5.update(chunk)
+            except OSError as exc:
+                if exc.errno == errno.ENOSPC:
+                    raise CleanupTime(
+                        HTTPStatus.INSUFFICIENT_STORAGE,
+                        f"Out of space on {tar_full_path.root}",
+                    )
+                else:
+                    raise CleanupTime(
+                        HTTPStatus.INTERNAL_SERVER_ERROR,
+                        f"Unexpected error {exc.errno} encountered during file upload",
+                    )
+            except Exception:
+                raise CleanupTime(
+                    HTTPStatus.INTERNAL_SERVER_ERROR,
+                    "Unexpected error encountered during file upload",
+                )
+
+            if bytes_received != content_length:
+                raise CleanupTime(
+                    HTTPStatus.BAD_REQUEST,
+                    f"Expected {content_length} bytes but received {bytes_received} bytes",
+                )
+            elif hash_md5.hexdigest() != md5sum:
+                raise CleanupTime(
+                    HTTPStatus.BAD_REQUEST,
+                    f"MD5 checksum {hash_md5.hexdigest()} does not match expected {md5sum}",
+                )
+
+            # First write the .md5
+            current_app.logger.info("Creating MD5 file {}: {}", md5_full_path, md5sum)
+
+            # From this point attempt to remove the MD5 file on error exit
+            recovery.add(md5_full_path.unlink)
+            try:
+                md5_full_path.write_text(f"{md5sum} {filename}\n")
+            except Exception:
+                raise CleanupTime(
+                    HTTPStatus.INTERNAL_SERVER_ERROR,
+                    f"Failed to write .md5 file '{md5_full_path}'",
+                )

             # Create a cache manager object
             try:
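
The streaming loop in this hunk reads the request body in fixed-size chunks and feeds each chunk to both the output file and an incremental MD5, so the checksum costs no second pass over the data; the refactor moves the try outside the with so an open() failure is caught too. A self-contained sketch of the same pattern over an ordinary file-like stream (names are illustrative, not the pbench API):

    import hashlib
    from pathlib import Path
    from typing import BinaryIO

    CHUNK_SIZE = 65536  # illustrative; the server defines its own CHUNK_SIZE


    def save_and_checksum(
        stream: BinaryIO, dest: Path, expected_md5: str, expected_len: int
    ) -> None:
        """Copy stream to dest in chunks, verifying length and MD5 as we go."""
        received = 0
        hash_md5 = hashlib.md5()
        with dest.open(mode="wb") as ofp:
            while True:
                chunk = stream.read(CHUNK_SIZE)
                received += len(chunk)
                # Stop at end of stream, or as soon as the client sends
                # more than it promised.
                if len(chunk) == 0 or received > expected_len:
                    break
                ofp.write(chunk)
                hash_md5.update(chunk)
        if received != expected_len:
            raise ValueError(f"expected {expected_len} bytes, got {received}")
        if hash_md5.hexdigest() != expected_md5:
            raise ValueError("MD5 mismatch")
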
@@ -375,6 +392,11 @@ def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Response:
             # Move the files to their final location
             try:
                 tarball = cache_m.create(tar_full_path)
+            except DuplicateTarball:
+                raise CleanupTime(
+                    HTTPStatus.BAD_REQUEST,
+                    f"A tarball with the name {dataset_name!r} already exists",
+                )
+            except MetadataError as exc:
                 raise CleanupTime(
                     HTTPStatus.BAD_REQUEST,
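
Here cache_m.create() raises the new DuplicateTarball exception when the tarball's stem name is already present in the archive, and the handler maps it to a client-side 400 rather than an internal error. The CacheManager internals are not shown in this diff; the following is only a hedged sketch of what such a collision check could look like, with a hypothetical helper name:

    from pathlib import Path


    class DuplicateTarball(Exception):
        """Raised when a tarball with the same stem name already exists."""


    def place_tarball(archive_root: Path, tar_full_path: Path) -> Path:
        """Move an uploaded tarball into the archive, refusing name collisions.

        Hypothetical helper; the real cache manager does more bookkeeping.
        """
        dest = archive_root / tar_full_path.name
        if dest.exists():
            raise DuplicateTarball(dest.name)
        return tar_full_path.rename(dest)
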
@@ -500,6 +522,12 @@ def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Response:
                 raise APIInternalError(message) from e
             else:
                 raise APIAbort(status, message) from e
+        finally:
+            if tmp_dir:
+                try:
+                    shutil.rmtree(tmp_dir)
+                except Exception as e:
+                    current_app.logger.warning("Error removing {}: {}", tmp_dir, str(e))

         response = jsonify(dict(message="File successfully uploaded"))
         response.status_code = HTTPStatus.CREATED
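
Because tmp_dir is initialized to None before the outer try and only assigned after validation succeeds, the finally block can distinguish "never created" from "needs cleanup", and a failed rmtree is logged rather than masking the real outcome. The same shape in isolation, with illustrative names:

    import logging
    import shutil
    import tempfile
    from pathlib import Path
    from typing import Optional

    logger = logging.getLogger(__name__)


    def process_upload() -> None:
        tmp_dir: Optional[Path] = None
        try:
            tmp_dir = Path(tempfile.mkdtemp())
            ...  # stream, verify, and hand off the tarball
        finally:
            if tmp_dir:
                try:
                    shutil.rmtree(tmp_dir)
                except Exception as exc:
                    # Log and continue: cleanup failure must not mask
                    # the success response or the original error.
                    logger.warning("Error removing %s: %s", tmp_dir, exc)
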