diff --git a/lib/pbench/server/api/resources/upload_api.py b/lib/pbench/server/api/resources/upload_api.py
index 5418005c69..59d3eaef7b 100644
--- a/lib/pbench/server/api/resources/upload_api.py
+++ b/lib/pbench/server/api/resources/upload_api.py
@@ -3,6 +3,7 @@
 import hashlib
 from http import HTTPStatus
 import os
+from pathlib import Path
 import shutil
 from typing import Any, Optional
@@ -26,7 +27,7 @@
     Schema,
 )
 import pbench.server.auth.auth as Auth
-from pbench.server.cache_manager import CacheManager, MetadataError
+from pbench.server.cache_manager import CacheManager, DuplicateTarball, MetadataError
 from pbench.server.database.models.audit import (
     Audit,
     AuditReason,
@@ -177,6 +178,7 @@ def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Respon
         attributes = {"access": access, "metadata": metadata}
         filename = args.uri["filename"]
+        tmp_dir: Optional[Path] = None

         try:
             try:
@@ -236,9 +238,23 @@ def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Respon
                     f"'Content-Length' {content_length} must be greater than 0",
                 )

-            tar_full_path = self.temporary / filename
-            md5_full_path = self.temporary / f"{filename}.md5"
-            dataset_name = Dataset.stem(tar_full_path)
+            dataset_name = Dataset.stem(filename)
+
+            # NOTE: we isolate each uploaded tarball into a private MD5-based
+            # subdirectory in order to retain the original tarball stem name
+            # for the cache manager while giving us protection against multiple
+            # tarballs with the same name. (A duplicate MD5 will have already
+            # failed, so that's not a concern.)
+            try:
+                tmp_dir = self.temporary / md5sum
+                tmp_dir.mkdir()
+            except FileExistsError:
+                raise CleanupTime(
+                    HTTPStatus.CONFLICT,
+                    "Temporary upload directory already exists",
+                )
+            tar_full_path = tmp_dir / filename
+            md5_full_path = tmp_dir / f"{filename}.md5"

             bytes_received = 0
             usage = shutil.disk_usage(tar_full_path.parent)
@@ -257,7 +273,7 @@ def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Respon
             try:
                 dataset = Dataset(
                     owner_id=user_id,
-                    name=Dataset.stem(tar_full_path),
+                    name=dataset_name,
                     resource_id=md5sum,
                     access=access,
                 )
@@ -309,60 +325,61 @@ def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Respon
             # error recovery.
             recovery.add(tar_full_path.unlink)

-            with tar_full_path.open(mode="wb") as ofp:
-                hash_md5 = hashlib.md5()
+            # NOTE: We know that the MD5 is unique at this point; so even if
+            # two tarballs with the same name are uploaded concurrently, by
+            # writing into a temporary directory named for the MD5 we're
+            # assured that they can't conflict.
+            try:
+                with tar_full_path.open(mode="wb") as ofp:
+                    hash_md5 = hashlib.md5()

-                try:
                     while True:
                         chunk = request.stream.read(self.CHUNK_SIZE)
                         bytes_received += len(chunk)
                         if len(chunk) == 0 or bytes_received > content_length:
                             break
-
                         ofp.write(chunk)
                         hash_md5.update(chunk)
-                except OSError as exc:
-                    if exc.errno == errno.ENOSPC:
-                        raise CleanupTime(
-                            HTTPStatus.INSUFFICIENT_STORAGE,
-                            f"Out of space on {tar_full_path.root}",
-                        )
-                    else:
-                        raise CleanupTime(
-                            HTTPStatus.INTERNAL_SERVER_ERROR,
-                            f"Unexpected error {exc.errno} encountered during file upload",
-                        )
-                except Exception:
+            except OSError as exc:
+                if exc.errno == errno.ENOSPC:
                     raise CleanupTime(
-                        HTTPStatus.INTERNAL_SERVER_ERROR,
-                        "Unexpected error encountered during file upload",
+                        HTTPStatus.INSUFFICIENT_STORAGE,
+                        f"Out of space on {tar_full_path.root}",
                     )
-
-            if bytes_received != content_length:
-                raise CleanupTime(
-                    HTTPStatus.BAD_REQUEST,
-                    f"Expected {content_length} bytes but received {bytes_received} bytes",
-                )
-            elif hash_md5.hexdigest() != md5sum:
+                else:
                     raise CleanupTime(
-                        HTTPStatus.BAD_REQUEST,
-                        f"MD5 checksum {hash_md5.hexdigest()} does not match expected {md5sum}",
+                        HTTPStatus.INTERNAL_SERVER_ERROR,
+                        f"Unexpected error {exc.errno} encountered during file upload",
                     )
+            except Exception:
+                raise CleanupTime(
+                    HTTPStatus.INTERNAL_SERVER_ERROR,
+                    "Unexpected error encountered during file upload",
+                )

-            # First write the .md5
-            current_app.logger.info(
-                "Creating MD5 file {}: {}", md5_full_path, md5sum
+            if bytes_received != content_length:
+                raise CleanupTime(
+                    HTTPStatus.BAD_REQUEST,
+                    f"Expected {content_length} bytes but received {bytes_received} bytes",
+                )
+            elif hash_md5.hexdigest() != md5sum:
+                raise CleanupTime(
+                    HTTPStatus.BAD_REQUEST,
+                    f"MD5 checksum {hash_md5.hexdigest()} does not match expected {md5sum}",
                 )

-            # From this point attempt to remove the MD5 file on error exit
-            recovery.add(md5_full_path.unlink)
-            try:
-                md5_full_path.write_text(f"{md5sum} {filename}\n")
-            except Exception:
-                raise CleanupTime(
-                    HTTPStatus.INTERNAL_SERVER_ERROR,
-                    f"Failed to write .md5 file '{md5_full_path}'",
-                )
+            # First write the .md5
+            current_app.logger.info("Creating MD5 file {}: {}", md5_full_path, md5sum)
+
+            # From this point attempt to remove the MD5 file on error exit
+            recovery.add(md5_full_path.unlink)
+            try:
+                md5_full_path.write_text(f"{md5sum} {filename}\n")
+            except Exception:
+                raise CleanupTime(
+                    HTTPStatus.INTERNAL_SERVER_ERROR,
+                    f"Failed to write .md5 file '{md5_full_path}'",
+                )

             # Create a cache manager object
             try:
@@ -375,6 +392,11 @@ def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Respon
             # Move the files to their final location
             try:
                 tarball = cache_m.create(tar_full_path)
+            except DuplicateTarball:
+                raise CleanupTime(
+                    HTTPStatus.BAD_REQUEST,
+                    f"A tarball with the name {dataset_name!r} already exists",
+                )
             except MetadataError as exc:
                 raise CleanupTime(
                     HTTPStatus.BAD_REQUEST,
@@ -500,6 +522,12 @@ def _put(self, args: ApiParams, request: Request, context: ApiContext) -> Respon
                 raise APIInternalError(message) from e
             else:
                 raise APIAbort(status, message) from e
+        finally:
+            if tmp_dir:
+                try:
+                    shutil.rmtree(tmp_dir)
+                except Exception as e:
+                    current_app.logger.warning("Error removing {}: {}", tmp_dir, str(e))

         response = jsonify(dict(message="File successfully uploaded"))
         response.status_code = HTTPStatus.CREATED
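Reviewer note: the staging flow the server-side change introduces can be condensed into a minimal standalone sketch. The names here (`stage_tarball`, `stream`, `CHUNK_SIZE`) are illustrative, not the pbench API, and unlike the real handler (which removes the MD5-named directory in its `finally` clause after the cache manager relocates the files) this sketch only cleans up on failure:

```python
import hashlib
import shutil
from pathlib import Path

CHUNK_SIZE = 65536  # illustrative; the real handler uses self.CHUNK_SIZE


def stage_tarball(temporary: Path, filename: str, md5sum: str, stream) -> Path:
    """Stage an upload in a private, MD5-named subdirectory.

    Keying the directory on the dataset's MD5 (already guaranteed unique
    by the duplicate-dataset check) means concurrent uploads of tarballs
    with the same *file name* cannot collide, while the tarball keeps its
    original stem name for the cache manager.
    """
    tmp_dir = temporary / md5sum
    tmp_dir.mkdir()  # FileExistsError here maps to HTTP 409 CONFLICT above
    try:
        tar_path = tmp_dir / filename
        hash_md5 = hashlib.md5()
        with tar_path.open(mode="wb") as ofp:
            while chunk := stream.read(CHUNK_SIZE):
                ofp.write(chunk)
                hash_md5.update(chunk)
        if hash_md5.hexdigest() != md5sum:
            raise ValueError("MD5 checksum mismatch")  # HTTP 400 in the API
        return tar_path
    except Exception:
        # Never leave the staging directory behind on failure; the real
        # handler does this unconditionally in a `finally` clause.
        shutil.rmtree(tmp_dir, ignore_errors=True)
        raise
```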
diff --git a/lib/pbench/test/unit/server/test_upload.py b/lib/pbench/test/unit/server/test_upload.py
index b696d5d3f4..ebecee1c23 100644
--- a/lib/pbench/test/unit/server/test_upload.py
+++ b/lib/pbench/test/unit/server/test_upload.py
@@ -1,14 +1,13 @@
 from http import HTTPStatus
 from logging import Logger
 from pathlib import Path
-import socket
 from typing import Any

 from freezegun import freeze_time
 import pytest

 from pbench.server import OperationCode, PbenchServerConfig
-from pbench.server.cache_manager import CacheManager
+from pbench.server.cache_manager import CacheManager, DuplicateTarball
 from pbench.server.database.models.audit import (
     Audit,
     AuditReason,
@@ -26,17 +25,11 @@ class TestUpload:
     cachemanager_created = None
-    cachemanager_create_fail = False
+    cachemanager_create_fail = None
     cachemanager_create_path = None
     tarball_deleted = None
     create_metadata = True

-    @pytest.fixture
-    def setup_ctrl(self):
-        self.controller = socket.gethostname()
-        yield
-        self.controller = None
-
     @staticmethod
     def gen_uri(server_config, filename="f.tar.xz"):
         return f"{server_config.rest_uri}/upload/{filename}"
@@ -75,7 +68,7 @@ def create(self, path: Path) -> FakeTarball:
             controller = "ctrl"
             TestUpload.cachemanager_create_path = path
             if TestUpload.cachemanager_create_fail:
-                raise Exception()
+                raise TestUpload.cachemanager_create_fail
             self.controllers.append(controller)
             tarball = FakeTarball(path)
             if TestUpload.create_metadata:
@@ -84,7 +77,7 @@ def create(self, path: Path) -> FakeTarball:
             return tarball

         TestUpload.cachemanager_created = None
-        TestUpload.cachemanager_create_fail = False
+        TestUpload.cachemanager_create_fail = None
         TestUpload.cachemanager_create_path = None
         TestUpload.tarball_deleted = None
         monkeypatch.setattr(CacheManager, "__init__", FakeCacheManager.__init__)
@@ -110,7 +103,7 @@ def test_malformed_authorization_header(
         assert not self.cachemanager_created

     def test_missing_md5sum_header_upload(
-        self, client, caplog, server_config, setup_ctrl, pbench_drb_token
+        self, client, caplog, server_config, pbench_drb_token
     ):
         expected_message = "Missing required 'Content-MD5' header"
         response = client.put(
@@ -125,14 +118,13 @@
     def test_missing_md5sum_header_value(
-        self, client, caplog, server_config, setup_ctrl, pbench_drb_token
+        self, client, caplog, server_config, pbench_drb_token
     ):
         expected_message = "Missing required 'Content-MD5' header value"
         response = client.put(
             self.gen_uri(server_config),
             headers={
                 "Authorization": "Bearer " + pbench_drb_token,
-                "controller": self.controller,
                 "Content-MD5": "",
             },
         )
@@ -142,7 +134,7 @@
         assert not self.cachemanager_created

     def test_missing_filename_extension(
-        self, client, caplog, server_config, setup_ctrl, pbench_drb_token
+        self, client, caplog, server_config, pbench_drb_token
     ):
         """Test with URL uploading a file named "f" which is missing the
         required filename extension"""
@@ -159,7 +151,7 @@
     def test_missing_length_header_upload(
-        self, client, caplog, server_config, setup_ctrl, pbench_drb_token
+        self, client, caplog, server_config, pbench_drb_token
     ):
         expected_message = "Missing or invalid 'Content-Length' header"
         response = client.put(
@@ -175,7 +167,7 @@
     def test_bad_length_header_upload(
-        self, client, caplog, server_config, setup_ctrl, pbench_drb_token
+        self, client, caplog, server_config, pbench_drb_token
     ):
         expected_message = "Missing or invalid 'Content-Length' header"
         response = client.put(
@@ -191,15 +183,12 @@ def test_bad_length_header_upload(
         self.verify_logs(caplog)
         assert not self.cachemanager_created

-    def test_bad_metadata_upload(
-        self, client, caplog, server_config, setup_ctrl, pbench_drb_token
-    ):
+    def test_bad_metadata_upload(self, client, server_config, pbench_drb_token):
         with freeze_time("1970-01-01 00:42:00"):
             response = client.put(
                 self.gen_uri(server_config),
                 headers={
                     "Authorization": "Bearer " + pbench_drb_token,
-                    "controller": self.controller,
                     "Content-MD5": "ANYMD5",
                     "Content-Length": "STRING",
                 },
@@ -218,7 +207,7 @@ def test_bad_metadata_upload(
         assert not self.cachemanager_created

     def test_mismatched_md5sum_header(
-        self, client, caplog, server_config, setup_ctrl, pbench_drb_token
+        self, client, caplog, server_config, pbench_drb_token
     ):
         filename = "log.tar.xz"
         datafile = Path("./lib/pbench/test/unit/server/fixtures/upload/", filename)
@@ -245,7 +234,6 @@ def test_bad_extension_upload(
         tmp_path,
         caplog,
         server_config,
-        setup_ctrl,
         pbench_drb_token,
     ):
         datafile = tmp_path / bad_extension
@@ -264,7 +252,7 @@
         assert not self.cachemanager_created

     def test_invalid_authorization_upload(
-        self, client, caplog, server_config, setup_ctrl, pbench_drb_token_invalid
+        self, client, caplog, server_config, pbench_drb_token_invalid
     ):
         # Upload with invalid token
         response = client.put(
@@ -277,7 +265,7 @@
         assert not self.cachemanager_created

     def test_empty_upload(
-        self, client, tmp_path, caplog, server_config, setup_ctrl, pbench_drb_token
+        self, client, tmp_path, caplog, server_config, pbench_drb_token
     ):
         filename = "tmp.tar.xz"
         datafile = tmp_path / filename
@@ -298,14 +286,51 @@
         self.verify_logs(caplog)
         assert not self.cachemanager_created

+    def test_temp_exists(
+        self, monkeypatch, client, tmp_path, server_config, pbench_drb_token
+    ):
+        md5 = "d41d8cd98f00b204e9800998ecf8427e"
+
+        def td_exists(self, *args, **kwargs):
+            """Mock out Path.mkdir()
+
+            The trick here is that calling the UPLOAD API results in two calls
+            to Path.mkdir: one in the __init__ to be sure that ARCHIVE/UPLOAD
+            exists, and the second for the temporary subdirectory. We want the
+            first to succeed normally so we'll pass the call to the real mkdir
+            if the path doesn't end with our MD5 value.
+            """
+            if self.name != md5:
+                return self.real_mkdir(*args, **kwargs)
+            raise FileExistsError(str(self))
+
+        filename = "tmp.tar.xz"
+        datafile = tmp_path / filename
+        datafile.write_text("compressed tar ball")
+        with monkeypatch.context() as m:
+            m.setattr(Path, "real_mkdir", Path.mkdir, raising=False)
+            m.setattr(Path, "mkdir", td_exists)
+            with datafile.open("rb") as data_fp:
+                response = client.put(
+                    self.gen_uri(server_config, filename),
+                    data=data_fp,
+                    headers=self.gen_headers(pbench_drb_token, md5),
+                )
+        assert response.status_code == HTTPStatus.CONFLICT
+        assert (
+            response.json.get("message") == "Temporary upload directory already exists"
+        )
+        assert not self.cachemanager_created
+
+    @pytest.mark.parametrize(
+        "exception,status",
+        (
+            (Exception("Test"), HTTPStatus.INTERNAL_SERVER_ERROR),
+            (DuplicateTarball("x"), HTTPStatus.BAD_REQUEST),
+        ),
+    )
     def test_upload_cachemanager_error(
-        self,
-        client,
-        caplog,
-        server_config,
-        setup_ctrl,
-        pbench_drb_token,
-        tarball,
+        self, client, server_config, pbench_drb_token, tarball, exception, status
     ):
         """
         Cause the CacheManager.create() to fail; this should trigger the cleanup
@@ -313,7 +338,7 @@
         internal server error.
         """
         datafile, _, md5 = tarball
-        TestUpload.cachemanager_create_fail = True
+        TestUpload.cachemanager_create_fail = exception

         with datafile.open("rb") as data_fp:
             response = client.put(
@@ -322,7 +347,7 @@
                 headers=self.gen_headers(pbench_drb_token, md5),
             )

-        assert response.status_code == HTTPStatus.INTERNAL_SERVER_ERROR
+        assert response.status_code == status

         with pytest.raises(DatasetNotFound):
             Dataset.query(resource_id=md5)
@@ -332,7 +357,7 @@
         assert not Path(str(self.cachemanager_create_path) + ".md5").exists()

     @pytest.mark.freeze_time("1970-01-01")
-    def test_upload(self, client, pbench_drb_token, server_config, setup_ctrl, tarball):
+    def test_upload(self, client, pbench_drb_token, server_config, tarball):
        """Test a successful dataset upload and validate the metadata and audit
        information.
        """
@@ -397,7 +422,7 @@
     @pytest.mark.freeze_time("1970-01-01")
     def test_upload_invalid_metadata(
-        self, client, pbench_drb_token, server_config, setup_ctrl, tarball
+        self, client, pbench_drb_token, server_config, tarball
     ):
         """Test a dataset upload with a bad metadata. We expect a failure, and
         an 'errors' field in the response JSON explaining each error.
@@ -428,15 +453,7 @@ def test_upload_invalid_metadata(
             ],
         }

-    def test_upload_duplicate(
-        self,
-        client,
-        caplog,
-        server_config,
-        setup_ctrl,
-        pbench_drb_token,
-        tarball,
-    ):
+    def test_upload_duplicate(self, client, server_config, pbench_drb_token, tarball):
         datafile, _, md5 = tarball
         with datafile.open("rb") as data_fp:
             response = client.put(
@@ -464,14 +481,7 @@
         assert TestUpload.cachemanager_created is None

     def test_upload_metadata_error(
-        self,
-        client,
-        caplog,
-        monkeypatch,
-        server_config,
-        setup_ctrl,
-        pbench_drb_token,
-        tarball,
+        self, client, monkeypatch, server_config, pbench_drb_token, tarball
     ):
         """
         Cause the Metadata.setvalue to fail at the very end of the upload so we
@@ -530,9 +540,7 @@ def setvalue(dataset: Dataset, key: str, value: Any):
         assert audit[1].attributes == {"message": "INTERNAL ERROR"}

     @pytest.mark.freeze_time("1970-01-01")
-    def test_upload_archive(
-        self, client, pbench_drb_token, server_config, setup_ctrl, tarball
-    ):
+    def test_upload_archive(self, client, pbench_drb_token, server_config, tarball):
         """Test a successful archiveonly dataset upload."""
         datafile, _, md5 = tarball
         with datafile.open("rb") as data_fp:
@@ -594,9 +602,7 @@ def test_upload_archive(
         }

     @pytest.mark.freeze_time("1970-01-01")
-    def test_upload_nometa(
-        self, client, pbench_drb_token, server_config, setup_ctrl, tarball
-    ):
+    def test_upload_nometa(self, client, pbench_drb_token, server_config, tarball):
         """Test a successful upload of a dataset without metadata.log."""
         datafile, _, md5 = tarball
         TestUpload.create_metadata = False
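Reviewer note: the `Path.mkdir` interception used in `test_temp_exists` is reusable on its own. Below is a minimal standalone pytest sketch of the same trick; the `real_mkdir` stash attribute and the test name are illustrative, not part of the pbench suite:

```python
from pathlib import Path

import pytest


def test_mkdir_conflict(monkeypatch, tmp_path):
    md5 = "d41d8cd98f00b204e9800998ecf8427e"

    def fake_mkdir(self, *args, **kwargs):
        # Pass every other directory through to the real mkdir so unrelated
        # setup still works; only the MD5-named staging directory reports a
        # pre-existing conflict.
        if self.name != md5:
            return self.real_mkdir(*args, **kwargs)
        raise FileExistsError(str(self))

    with monkeypatch.context() as m:
        # Stash the real method on the class (raising=False permits adding a
        # new attribute), then replace mkdir with the selective fake.
        m.setattr(Path, "real_mkdir", Path.mkdir, raising=False)
        m.setattr(Path, "mkdir", fake_mkdir)

        (tmp_path / "unrelated").mkdir()  # delegated to the real mkdir
        with pytest.raises(FileExistsError):
            (tmp_path / md5).mkdir()
```

Stashing the real method on the class keeps every other `mkdir` call working, which matters here because the upload API itself calls `Path.mkdir` once in `__init__` to ensure ARCHIVE/UPLOAD exists before it ever touches the temporary subdirectory.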