From d4579610e5a4d80e63a2faf6e3086459703e20df Mon Sep 17 00:00:00 2001
From: David Butenhof
Date: Wed, 22 Mar 2023 07:46:29 -0400
Subject: [PATCH] Gracefully handle duplicate name with unique MD5 (#3355)

* Gracefully handle duplicate name with unique MD5

PBENCH-1112

In testing we're seeing duplicate tarball names with distinct MD5 values.
We have reduced the importance of duplicate tarball names, relying on the
dataset resource ID (MD5) for "uniqueness" internally. However, the cache
manager still maintains tarball files and MD5 companions on disk, and the
upload API has to store the API payload temporarily somewhere. When
duplicate tarballs are uploaded concurrently, the `.open()` fails, leading
to an internal server error.

We want to minimize the chance of reporting internal server errors where
possible. Instead, move temporary upload files into an MD5-based
subdirectory, and clean up that directory after the upload completes.
---
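Note (not part of the applied diff): the shape of the fix, as a minimal
standalone sketch. The `upload_root` name is illustrative, and `CleanupTime`
here is a simplified stand-in for the server's status-carrying exception:

    from http import HTTPStatus
    from pathlib import Path


    class CleanupTime(Exception):
        """Illustrative stand-in for the server's status-bearing exception."""

        def __init__(self, status: HTTPStatus, message: str):
            super().__init__(message)
            self.status = status


    def stage_upload(upload_root: Path, md5sum: str, filename: str) -> Path:
        """Reserve a private staging directory named for the tarball's MD5.

        Two tarballs sharing a file name can't collide here because their
        MD5 values differ; a concurrent duplicate of the *same* MD5 trips
        FileExistsError and is reported as 409 CONFLICT rather than a 500.
        """
        tmp_dir = upload_root / md5sum
        try:
            tmp_dir.mkdir()  # deliberately no exist_ok: existence is an error
        except FileExistsError:
            raise CleanupTime(
                HTTPStatus.CONFLICT, "Temporary upload directory already exists"
            )
        return tmp_dir

The caller wraps the receive and verify steps in try/finally and removes
`tmp_dir` with `shutil.rmtree()` on every exit path, so the staging area
never outlives the request.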
 lib/pbench/server/api/resources/upload_api.py | 116 +++++++++-------
 lib/pbench/test/unit/server/test_upload.py    | 124 +++++++++---------
 2 files changed, 137 insertions(+), 103 deletions(-)

diff --git a/lib/pbench/server/api/resources/upload_api.py b/lib/pbench/server/api/resources/upload_api.py
index 5418005c69..59d3eaef7b 100644
--- a/lib/pbench/server/api/resources/upload_api.py
+++ b/lib/pbench/server/api/resources/upload_api.py
@@ -3,6 +3,7 @@
 import hashlib
 from http import HTTPStatus
 import os
+from pathlib import Path
 import shutil
 from typing import Any, Optional
 
@@ -26,7 +27,7 @@
     Schema,
 )
 import pbench.server.auth.auth as Auth
-from pbench.server.cache_manager import CacheManager, MetadataError
+from pbench.server.cache_manager import CacheManager, DuplicateTarball, MetadataError
 from pbench.server.database.models.audit import (
     Audit,
     AuditReason,
@@ -177,6 +178,7 @@ def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Respon
         attributes = {"access": access, "metadata": metadata}
         filename = args.uri["filename"]
+        tmp_dir: Optional[Path] = None
 
         try:
             try:
@@ -236,9 +238,23 @@ def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Respon
                     f"'Content-Length' {content_length} must be greater than 0",
                 )
 
-            tar_full_path = self.temporary / filename
-            md5_full_path = self.temporary / f"{filename}.md5"
-            dataset_name = Dataset.stem(tar_full_path)
+            dataset_name = Dataset.stem(filename)
+
+            # NOTE: we isolate each uploaded tarball into a private MD5-based
+            # subdirectory in order to retain the original tarball stem name
+            # for the cache manager while giving us protection against multiple
+            # tarballs with the same name. (A duplicate MD5 will have already
+            # failed, so that's not a concern.)
+            try:
+                tmp_dir = self.temporary / md5sum
+                tmp_dir.mkdir()
+            except FileExistsError:
+                raise CleanupTime(
+                    HTTPStatus.CONFLICT,
+                    "Temporary upload directory already exists",
+                )
+            tar_full_path = tmp_dir / filename
+            md5_full_path = tmp_dir / f"{filename}.md5"
 
             bytes_received = 0
             usage = shutil.disk_usage(tar_full_path.parent)
@@ -257,7 +273,7 @@ def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Respon
             try:
                 dataset = Dataset(
                     owner_id=user_id,
-                    name=Dataset.stem(tar_full_path),
+                    name=dataset_name,
                     resource_id=md5sum,
                     access=access,
                 )
@@ -309,60 +325,61 @@ def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Respon
             # error recovery.
             recovery.add(tar_full_path.unlink)
-            with tar_full_path.open(mode="wb") as ofp:
-                hash_md5 = hashlib.md5()
-
-                try:
-                    while True:
-                        chunk = request.stream.read(self.CHUNK_SIZE)
-                        bytes_received += len(chunk)
-                        if len(chunk) == 0 or bytes_received > content_length:
-                            break
-
-                        ofp.write(chunk)
-                        hash_md5.update(chunk)
-                except OSError as exc:
-                    if exc.errno == errno.ENOSPC:
-                        raise CleanupTime(
-                            HTTPStatus.INSUFFICIENT_STORAGE,
-                            f"Out of space on {tar_full_path.root}",
-                        )
-                    else:
-                        raise CleanupTime(
-                            HTTPStatus.INTERNAL_SERVER_ERROR,
-                            f"Unexpected error {exc.errno} encountered during file upload",
-                        )
-                except Exception:
-                    raise CleanupTime(
-                        HTTPStatus.INTERNAL_SERVER_ERROR,
-                        "Unexpected error encountered during file upload",
-                    )
-
-                if bytes_received != content_length:
-                    raise CleanupTime(
-                        HTTPStatus.BAD_REQUEST,
-                        f"Expected {content_length} bytes but received {bytes_received} bytes",
-                    )
-                elif hash_md5.hexdigest() != md5sum:
-                    raise CleanupTime(
-                        HTTPStatus.BAD_REQUEST,
-                        f"MD5 checksum {hash_md5.hexdigest()} does not match expected {md5sum}",
-                    )
-
-            # First write the .md5
-            current_app.logger.info(
-                "Creating MD5 file {}: {}", md5_full_path, md5sum
-            )
-
-            # From this point attempt to remove the MD5 file on error exit
-            recovery.add(md5_full_path.unlink)
-            try:
-                md5_full_path.write_text(f"{md5sum} {filename}\n")
-            except Exception:
-                raise CleanupTime(
-                    HTTPStatus.INTERNAL_SERVER_ERROR,
-                    f"Failed to write .md5 file '{md5_full_path}'",
-                )
+            # NOTE: We know that the MD5 is unique at this point; so even if
+            # two tarballs with the same name are uploaded concurrently, by
+            # writing into a temporary directory named for the MD5 we're
+            # assured that they can't conflict.
+            try:
+                with tar_full_path.open(mode="wb") as ofp:
+                    hash_md5 = hashlib.md5()
+
+                    while True:
+                        chunk = request.stream.read(self.CHUNK_SIZE)
+                        bytes_received += len(chunk)
+                        if len(chunk) == 0 or bytes_received > content_length:
+                            break
+                        ofp.write(chunk)
+                        hash_md5.update(chunk)
+            except OSError as exc:
+                if exc.errno == errno.ENOSPC:
+                    raise CleanupTime(
+                        HTTPStatus.INSUFFICIENT_STORAGE,
+                        f"Out of space on {tar_full_path.root}",
+                    )
+                else:
+                    raise CleanupTime(
+                        HTTPStatus.INTERNAL_SERVER_ERROR,
+                        f"Unexpected error {exc.errno} encountered during file upload",
+                    )
+            except Exception:
+                raise CleanupTime(
+                    HTTPStatus.INTERNAL_SERVER_ERROR,
+                    "Unexpected error encountered during file upload",
+                )
 
+            if bytes_received != content_length:
+                raise CleanupTime(
+                    HTTPStatus.BAD_REQUEST,
+                    f"Expected {content_length} bytes but received {bytes_received} bytes",
+                )
+            elif hash_md5.hexdigest() != md5sum:
+                raise CleanupTime(
+                    HTTPStatus.BAD_REQUEST,
+                    f"MD5 checksum {hash_md5.hexdigest()} does not match expected {md5sum}",
+                )
+
+            # First write the .md5
+            current_app.logger.info("Creating MD5 file {}: {}", md5_full_path, md5sum)
+
+            # From this point attempt to remove the MD5 file on error exit
+            recovery.add(md5_full_path.unlink)
+            try:
+                md5_full_path.write_text(f"{md5sum} {filename}\n")
+            except Exception:
+                raise CleanupTime(
+                    HTTPStatus.INTERNAL_SERVER_ERROR,
+                    f"Failed to write .md5 file '{md5_full_path}'",
+                )
 
             # Create a cache manager object
             try:
@@ -375,6 +392,11 @@ def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Respon
             # Move the files to their final location
             try:
                 tarball = cache_m.create(tar_full_path)
+            except DuplicateTarball:
+                raise CleanupTime(
+                    HTTPStatus.BAD_REQUEST,
+                    f"A tarball with the name {dataset_name!r} already exists",
+                )
             except MetadataError as exc:
                 raise CleanupTime(
                     HTTPStatus.BAD_REQUEST,
@@ -500,6 +522,12 @@ def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Respon
                 raise APIInternalError(message) from e
             else:
                 raise APIAbort(status, message) from e
+        finally:
+            if tmp_dir:
+                try:
+                    shutil.rmtree(tmp_dir)
+                except Exception as e:
+                    current_app.logger.warning("Error removing {}: {}", tmp_dir, str(e))
 
         response = jsonify(dict(message="File successfully uploaded"))
         response.status_code = HTTPStatus.CREATED
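Note (not part of the applied diff): the receive loop restructured above
reduces to this pattern. A minimal sketch assuming a binary `stream` plus
the `content_length` and `md5sum` the client promised in its headers:

    import hashlib
    from pathlib import Path
    from typing import BinaryIO


    def receive_and_verify(
        stream: BinaryIO,
        target: Path,
        content_length: int,
        md5sum: str,
        chunk_size: int = 65536,
    ) -> None:
        """Stream the payload to disk, hashing as we go.

        Raises ValueError if the byte count or the MD5 digest doesn't
        match what the client promised in its request headers.
        """
        hash_md5 = hashlib.md5()
        bytes_received = 0
        with target.open(mode="wb") as ofp:
            while True:
                chunk = stream.read(chunk_size)
                bytes_received += len(chunk)
                if len(chunk) == 0 or bytes_received > content_length:
                    break
                ofp.write(chunk)
                hash_md5.update(chunk)
        if bytes_received != content_length:
            raise ValueError(f"expected {content_length} bytes, got {bytes_received}")
        if hash_md5.hexdigest() != md5sum:
            raise ValueError("MD5 checksum mismatch")

Moving the `try` outside the whole `with` block, as the hunk above does,
lets a single OSError handler cover both the open and every write.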
diff --git a/lib/pbench/test/unit/server/test_upload.py b/lib/pbench/test/unit/server/test_upload.py
index b696d5d3f4..ebecee1c23 100644
--- a/lib/pbench/test/unit/server/test_upload.py
+++ b/lib/pbench/test/unit/server/test_upload.py
@@ -1,14 +1,13 @@
 from http import HTTPStatus
 from logging import Logger
 from pathlib import Path
-import socket
 from typing import Any
 
 from freezegun import freeze_time
 import pytest
 
 from pbench.server import OperationCode, PbenchServerConfig
-from pbench.server.cache_manager import CacheManager
+from pbench.server.cache_manager import CacheManager, DuplicateTarball
 from pbench.server.database.models.audit import (
     Audit,
     AuditReason,
@@ -26,17 +25,11 @@ class TestUpload:
     cachemanager_created = None
-    cachemanager_create_fail = False
+    cachemanager_create_fail = None
     cachemanager_create_path = None
     tarball_deleted = None
     create_metadata = True
 
-    @pytest.fixture
-    def setup_ctrl(self):
-        self.controller = socket.gethostname()
-        yield
-        self.controller = None
-
     @staticmethod
     def gen_uri(server_config, filename="f.tar.xz"):
         return f"{server_config.rest_uri}/upload/{filename}"
@@ -75,7 +68,7 @@ def create(self, path: Path) -> FakeTarball:
             controller = "ctrl"
             TestUpload.cachemanager_create_path = path
             if TestUpload.cachemanager_create_fail:
-                raise Exception()
+                raise TestUpload.cachemanager_create_fail
             self.controllers.append(controller)
             tarball = FakeTarball(path)
             if TestUpload.create_metadata:
@@ -84,7 +77,7 @@ def create(self, path: Path) -> FakeTarball:
             return tarball
 
     TestUpload.cachemanager_created = None
-    TestUpload.cachemanager_create_fail = False
+    TestUpload.cachemanager_create_fail = None
     TestUpload.cachemanager_create_path = None
     TestUpload.tarball_deleted = None
     monkeypatch.setattr(CacheManager, "__init__", FakeCacheManager.__init__)
@@ -110,7 +103,7 @@ def test_malformed_authorization_header(
         assert not self.cachemanager_created
 
     def test_missing_md5sum_header_upload(
-        self, client, caplog, server_config, setup_ctrl, pbench_drb_token
+        self, client, caplog, server_config, pbench_drb_token
     ):
         expected_message = "Missing required 'Content-MD5' header"
         response = client.put(
@@ -125,14 +118,13 @@ def test_missing_md5sum_header_value(
-        self, client, caplog, server_config, setup_ctrl, pbench_drb_token
+        self, client, caplog, server_config, pbench_drb_token
     ):
         expected_message = "Missing required 'Content-MD5' header value"
         response = client.put(
             self.gen_uri(server_config),
             headers={
                 "Authorization": "Bearer " + pbench_drb_token,
-                "controller": self.controller,
                 "Content-MD5": "",
             },
         )
@@ -142,7 +134,7 @@ def test_missing_filename_extension(
-        self, client, caplog, server_config, setup_ctrl, pbench_drb_token
+        self, client, caplog, server_config, pbench_drb_token
     ):
         """Test with URL uploading a file named "f" which is missing the
         required filename extension"""
@@ -159,7 +151,7 @@ def test_missing_length_header_upload(
-        self, client, caplog, server_config, setup_ctrl, pbench_drb_token
+        self, client, caplog, server_config, pbench_drb_token
     ):
         expected_message = "Missing or invalid 'Content-Length' header"
         response = client.put(
@@ -175,7 +167,7 @@ def test_missing_length_header_upload(
         assert not self.cachemanager_created
 
     def test_bad_length_header_upload(
-        self, client, caplog, server_config, setup_ctrl, pbench_drb_token
+        self, client, caplog, server_config, pbench_drb_token
     ):
         expected_message = "Missing or invalid 'Content-Length' header"
         response = client.put(
@@ -191,15 +183,12 @@ def test_bad_length_header_upload(
         self.verify_logs(caplog)
         assert not self.cachemanager_created
 
-    def test_bad_metadata_upload(
-        self, client, caplog, server_config, setup_ctrl, pbench_drb_token
-    ):
+    def test_bad_metadata_upload(self, client, server_config, pbench_drb_token):
         with freeze_time("1970-01-01 00:42:00"):
             response = client.put(
                 self.gen_uri(server_config),
                 headers={
                     "Authorization": "Bearer " + pbench_drb_token,
-                    "controller": self.controller,
                     "Content-MD5": "ANYMD5",
                     "Content-Length": "STRING",
                 },
@@ -218,7 +207,7 @@ def test_bad_metadata_upload(
         assert not self.cachemanager_created
 
     def test_mismatched_md5sum_header(
-        self, client, caplog, server_config, setup_ctrl, pbench_drb_token
+        self, client, caplog, server_config, pbench_drb_token
    ):
         filename = "log.tar.xz"
         datafile = Path("./lib/pbench/test/unit/server/fixtures/upload/", filename)
@@ -245,7 +234,6 @@ def test_bad_extension_upload(
         tmp_path,
         caplog,
         server_config,
-        setup_ctrl,
         pbench_drb_token,
     ):
         datafile = tmp_path / bad_extension
@@ -264,7 +252,7 @@ def test_bad_extension_upload(
         assert not self.cachemanager_created
 
     def test_invalid_authorization_upload(
-        self, client, caplog, server_config, setup_ctrl, pbench_drb_token_invalid
+        self, client, caplog, server_config, pbench_drb_token_invalid
     ):
         # Upload with invalid token
         response = client.put(
@@ -277,7 +265,7 @@ def test_invalid_authorization_upload(
         assert not self.cachemanager_created
 
     def test_empty_upload(
-        self, client, tmp_path, caplog, server_config, setup_ctrl, pbench_drb_token
+        self, client, tmp_path, caplog, server_config, pbench_drb_token
     ):
         filename = "tmp.tar.xz"
         datafile = tmp_path / filename
@@ -298,14 +286,51 @@ def test_empty_upload(
         self.verify_logs(caplog)
         assert not self.cachemanager_created
 
+    def test_temp_exists(
+        self, monkeypatch, client, tmp_path, server_config, pbench_drb_token
+    ):
+        md5 = "d41d8cd98f00b204e9800998ecf8427e"
+
+        def td_exists(self, *args, **kwargs):
+            """Mock out Path.mkdir()
+
+            The trick here is that calling the UPLOAD API results in two calls
+            to Path.mkdir: one in the __init__ to be sure that ARCHIVE/UPLOAD
+            exists, and the second for the temporary subdirectory. We want the
+            first to succeed normally so we'll pass the call to the real mkdir
+            if the path doesn't end with our MD5 value.
+            """
+            if self.name != md5:
+                return self.real_mkdir(*args, **kwargs)
+            raise FileExistsError(str(self))
+
+        filename = "tmp.tar.xz"
+        datafile = tmp_path / filename
+        datafile.write_text("compressed tar ball")
+        with monkeypatch.context() as m:
+            m.setattr(Path, "real_mkdir", Path.mkdir, raising=False)
+            m.setattr(Path, "mkdir", td_exists)
+            with datafile.open("rb") as data_fp:
+                response = client.put(
+                    self.gen_uri(server_config, filename),
+                    data=data_fp,
+                    headers=self.gen_headers(pbench_drb_token, md5),
+                )
+        assert response.status_code == HTTPStatus.CONFLICT
+        assert (
+            response.json.get("message") == "Temporary upload directory already exists"
+        )
+        assert not self.cachemanager_created
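Note (not part of the applied diff): the save-the-real-method trick in
`test_temp_exists` generalizes to any pathlib method that must fail
selectively. A condensed sketch of the same pattern; the `md5` value and
test name are illustrative:

    from pathlib import Path

    import pytest


    def test_selective_mkdir_failure(monkeypatch, tmp_path):
        md5 = "d41d8cd98f00b204e9800998ecf8427e"

        def fake_mkdir(self, *args, **kwargs):
            # Delegate to the saved real method except for the one
            # directory we want to pretend already exists.
            if self.name != md5:
                return self.real_mkdir(*args, **kwargs)
            raise FileExistsError(str(self))

        with monkeypatch.context() as m:
            # Stash the real mkdir on the class so the fake can reach it;
            # raising=False because the attribute doesn't exist yet.
            m.setattr(Path, "real_mkdir", Path.mkdir, raising=False)
            m.setattr(Path, "mkdir", fake_mkdir)

            (tmp_path / "unrelated").mkdir()  # passes through to real mkdir
            with pytest.raises(FileExistsError):
                (tmp_path / md5).mkdir()  # only the poisoned name fails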
+ """ + if self.name != md5: + return self.real_mkdir(*args, **kwargs) + raise FileExistsError(str(self)) + + filename = "tmp.tar.xz" + datafile = tmp_path / filename + datafile.write_text("compressed tar ball") + with monkeypatch.context() as m: + m.setattr(Path, "real_mkdir", Path.mkdir, raising=False) + m.setattr(Path, "mkdir", td_exists) + with datafile.open("rb") as data_fp: + response = client.put( + self.gen_uri(server_config, filename), + data=data_fp, + headers=self.gen_headers(pbench_drb_token, md5), + ) + assert response.status_code == HTTPStatus.CONFLICT + assert ( + response.json.get("message") == "Temporary upload directory already exists" + ) + assert not self.cachemanager_created + + @pytest.mark.parametrize( + "exception,status", + ( + (Exception("Test"), HTTPStatus.INTERNAL_SERVER_ERROR), + (DuplicateTarball("x"), HTTPStatus.BAD_REQUEST), + ), + ) def test_upload_cachemanager_error( - self, - client, - caplog, - server_config, - setup_ctrl, - pbench_drb_token, - tarball, + self, client, server_config, pbench_drb_token, tarball, exception, status ): """ Cause the CacheManager.create() to fail; this should trigger the cleanup @@ -313,7 +338,7 @@ def test_upload_cachemanager_error( internal server error. """ datafile, _, md5 = tarball - TestUpload.cachemanager_create_fail = True + TestUpload.cachemanager_create_fail = exception with datafile.open("rb") as data_fp: response = client.put( @@ -322,7 +347,7 @@ def test_upload_cachemanager_error( headers=self.gen_headers(pbench_drb_token, md5), ) - assert response.status_code == HTTPStatus.INTERNAL_SERVER_ERROR + assert response.status_code == status with pytest.raises(DatasetNotFound): Dataset.query(resource_id=md5) @@ -332,7 +357,7 @@ def test_upload_cachemanager_error( assert not Path(str(self.cachemanager_create_path) + ".md5").exists() @pytest.mark.freeze_time("1970-01-01") - def test_upload(self, client, pbench_drb_token, server_config, setup_ctrl, tarball): + def test_upload(self, client, pbench_drb_token, server_config, tarball): """Test a successful dataset upload and validate the metadata and audit information. """ @@ -397,7 +422,7 @@ def test_upload(self, client, pbench_drb_token, server_config, setup_ctrl, tarba @pytest.mark.freeze_time("1970-01-01") def test_upload_invalid_metadata( - self, client, pbench_drb_token, server_config, setup_ctrl, tarball + self, client, pbench_drb_token, server_config, tarball ): """Test a dataset upload with a bad metadata. We expect a failure, and an 'errors' field in the response JSON explaining each error. 
@@ -428,15 +453,7 @@ def test_upload_invalid_metadata(
             ],
         }
 
-    def test_upload_duplicate(
-        self,
-        client,
-        caplog,
-        server_config,
-        setup_ctrl,
-        pbench_drb_token,
-        tarball,
-    ):
+    def test_upload_duplicate(self, client, server_config, pbench_drb_token, tarball):
         datafile, _, md5 = tarball
         with datafile.open("rb") as data_fp:
             response = client.put(
@@ -464,14 +481,7 @@ def test_upload_duplicate(
         assert TestUpload.cachemanager_created is None
 
     def test_upload_metadata_error(
-        self,
-        client,
-        caplog,
-        monkeypatch,
-        server_config,
-        setup_ctrl,
-        pbench_drb_token,
-        tarball,
+        self, client, monkeypatch, server_config, pbench_drb_token, tarball
     ):
         """
         Cause the Metadata.setvalue to fail at the very end of the upload so we
@@ -530,9 +540,7 @@ def setvalue(dataset: Dataset, key: str, value: Any):
         assert audit[1].attributes == {"message": "INTERNAL ERROR"}
 
     @pytest.mark.freeze_time("1970-01-01")
-    def test_upload_archive(
-        self, client, pbench_drb_token, server_config, setup_ctrl, tarball
-    ):
+    def test_upload_archive(self, client, pbench_drb_token, server_config, tarball):
         """Test a successful archiveonly dataset upload."""
         datafile, _, md5 = tarball
         with datafile.open("rb") as data_fp:
@@ -594,9 +602,7 @@ def test_upload_archive(
         }
 
     @pytest.mark.freeze_time("1970-01-01")
-    def test_upload_nometa(
-        self, client, pbench_drb_token, server_config, setup_ctrl, tarball
-    ):
+    def test_upload_nometa(self, client, pbench_drb_token, server_config, tarball):
         """Test a successful upload of a dataset without metadata.log."""
         datafile, _, md5 = tarball
         TestUpload.create_metadata = False
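Note (not part of the applied diff): from the client's side, the contract
these tests pin down looks roughly like this. A sketch using `requests`;
the host, the `/api/v1` prefix, and the token are placeholders, since the
real prefix comes from the server's `rest_uri` configuration:

    import hashlib
    from pathlib import Path

    import requests

    tarball = Path("result.tar.xz")
    payload = tarball.read_bytes()

    response = requests.put(
        f"https://pbench.example.com/api/v1/upload/{tarball.name}",
        headers={
            "Authorization": "Bearer <token>",
            "Content-MD5": hashlib.md5(payload).hexdigest(),
            "Content-Length": str(len(payload)),
        },
        data=payload,
    )
    # 201 CREATED on success; 400 for a bad digest or a duplicate tarball
    # name the cache manager rejects; 409 if the MD5's staging directory
    # already exists (a concurrent upload of the identical tarball).
    print(response.status_code, response.json().get("message"))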