diff --git a/.coveragerc b/.coveragerc
index 5718fd589c1..3299a90950a 100644
--- a/.coveragerc
+++ b/.coveragerc
@@ -22,7 +22,7 @@ exclude_lines =
     # Don't complain if non-runnable code isn't run:
     if 0:
     if __name__ == .__main__.:
-
+    if __name__ == __main__.:
 
     # Don't complain about abstract methods, they aren't run:
     @(abc\.)?abstract(((class|static)?method)|property)
diff --git a/services/storage/src/simcore_service_storage/simcore_s3_dsm.py b/services/storage/src/simcore_service_storage/simcore_s3_dsm.py
index 8059bb6165e..301579040c5 100644
--- a/services/storage/src/simcore_service_storage/simcore_s3_dsm.py
+++ b/services/storage/src/simcore_service_storage/simcore_s3_dsm.py
@@ -10,6 +10,7 @@
 from pathlib import Path
 from typing import Any, Final, cast
 
+import arrow
 from aiohttp import web
 from aiopg.sa import Engine
 from aiopg.sa.connection import SAConnection
@@ -37,7 +38,6 @@
     APP_DB_ENGINE_KEY,
     DATCORE_ID,
     EXPAND_DIR_MAX_ITEM_COUNT,
-    MAX_CONCURRENT_DB_TASKS,
     MAX_CONCURRENT_S3_TASKS,
     MAX_LINK_CHUNK_BYTE_SIZE,
     S3_UNDEFINED_OR_EXTERNAL_MULTIPART_ID,
@@ -71,11 +71,7 @@
 from .s3_client import S3MetaData, StorageS3Client
 from .s3_utils import S3TransferDataCB, update_task_progress
 from .settings import Settings
-from .simcore_s3_dsm_utils import (
-    expand_directory,
-    get_directory_file_id,
-    get_simcore_directory,
-)
+from .simcore_s3_dsm_utils import expand_directory, get_directory_file_id
 from .utils import (
     convert_db_to_model,
     download_to_file_or_raise,
@@ -83,6 +79,7 @@
     is_valid_managed_multipart_upload,
 )
 
+_NO_CONCURRENCY: Final[int] = 1
 _MAX_PARALLEL_S3_CALLS: Final[NonNegativeInt] = 10
 
 _logger = logging.getLogger(__name__)
@@ -577,7 +574,7 @@ async def delete_project_simcore_s3(
             self.simcore_bucket_name, project_id, node_id
         )
 
-    async def deep_copy_project_simcore_s3(
+    async def deep_copy_project_simcore_s3(  # noqa: C901
         self,
         user_id: UserID,
         src_project: dict[str, Any],
@@ -788,19 +785,20 @@ async def create_soft_link(
     async def synchronise_meta_data_table(
         self, *, dry_run: bool
     ) -> list[StorageFileID]:
-        file_ids_to_remove = []
+
         async with self.engine.acquire() as conn:
             _logger.warning(
                 "Total number of entries to check %d",
                 await db_file_meta_data.total(conn),
             )
             # iterate over all entries to check if there is a file in the S3 backend
-            async for fmd in db_file_meta_data.list_valid_uploads(conn):
+            file_ids_to_remove = [
+                fmd.file_id
+                async for fmd in db_file_meta_data.list_valid_uploads(conn)
                 if not await get_s3_client(self.app).file_exists(
                     self.simcore_bucket_name, s3_object=fmd.object_name
-                ):
-                    # this file does not exist in S3
-                    file_ids_to_remove.append(fmd.file_id)
+                )
+            ]
 
             if not dry_run:
                 await db_file_meta_data.delete(conn, file_ids_to_remove)
@@ -832,7 +830,7 @@ async def _clean_expired_uploads(self) -> None:
         1. will try to update the entry from S3 backend if exists
         2. will delete the entry if nothing exists in S3 backend.
         """
-        now = datetime.datetime.utcnow()
+        now = arrow.utcnow().datetime
         async with self.engine.acquire() as conn:
             list_of_expired_uploads = await db_file_meta_data.list_fmds(
                 conn, expired_after=now
@@ -853,7 +851,7 @@ async def _clean_expired_uploads(self) -> None:
                 ),
                 reraise=False,
                 log=_logger,
-                max_concurrency=MAX_CONCURRENT_DB_TASKS,
+                max_concurrency=_NO_CONCURRENCY,
             )
             list_of_fmds_to_delete = [
                 expired_fmd
@@ -867,16 +865,24 @@ async def _revert_file(
         async def _revert_file(
             conn: SAConnection, fmd: FileMetaDataAtDB
         ) -> FileMetaDataAtDB:
+            if is_valid_managed_multipart_upload(fmd.upload_id):
+                assert fmd.upload_id  # nosec
+                await s3_client.abort_multipart_upload(
+                    bucket=fmd.bucket_name,
+                    file_id=fmd.file_id,
+                    upload_id=fmd.upload_id,
+                )
             await s3_client.undelete_file(fmd.bucket_name, fmd.file_id)
             return await self._update_database_from_storage(conn, fmd)
 
         s3_client = get_s3_client(self.app)
         async with self.engine.acquire() as conn:
+            # NOTE: no concurrency here as we want to run low resources
             reverted_fmds = await logged_gather(
                 *(_revert_file(conn, fmd) for fmd in list_of_fmds_to_delete),
                 reraise=False,
                 log=_logger,
-                max_concurrency=MAX_CONCURRENT_DB_TASKS,
+                max_concurrency=_NO_CONCURRENCY,
             )
             list_of_fmds_to_delete = [
                 fmd
@@ -892,123 +898,17 @@ async def _revert_file(
                 "following unfinished/incomplete uploads will now be deleted : [%s]",
                 [fmd.file_id for fmd in list_of_fmds_to_delete],
             )
-            await logged_gather(
-                *(
-                    self.delete_file(fmd.user_id, fmd.file_id)
-                    for fmd in list_of_fmds_to_delete
-                    if fmd.user_id is not None
-                ),
-                log=_logger,
-                max_concurrency=MAX_CONCURRENT_DB_TASKS,
-            )
+            for fmd in list_of_fmds_to_delete:
+                if fmd.user_id is not None:
+                    await self.delete_file(fmd.user_id, fmd.file_id)
+
             _logger.warning(
                 "pending/incomplete uploads of [%s] removed",
                 [fmd.file_id for fmd in list_of_fmds_to_delete],
             )
 
-    async def _clean_dangling_multipart_uploads(self):
-        """this method removes any dangling multipart upload that
-        was initiated on S3 backend if it does not exist in file_meta_data
-        table.
-        Use-cases:
-        - presigned multipart upload: a multipart upload is created after the entry in the table (
-            if the expiry date is still in the future we do not remove the upload
-        )
-        - S3 external or internal potentially multipart upload (using S3 direct access we do not know
-        if they create multipart uploads and have no control over it, the only thing we know is the upload
-        expiry date)
-        --> we only remove dangling upload IDs which expiry date is in the past or that have no upload in process
-        or no entry at all in the database
-
-        """
-        current_multipart_uploads: list[
-            tuple[UploadID, SimcoreS3FileID]
-        ] = await get_s3_client(self.app).list_ongoing_multipart_uploads(
-            self.simcore_bucket_name
-        )
-        if not current_multipart_uploads:
-            return
-        _logger.debug("found %s", f"{current_multipart_uploads=}")
-
-        # there are some multipart uploads, checking if
-        # there is a counterpart in file_meta_data
-        # NOTE: S3 url encode file uuid with specific characters
-        async with self.engine.acquire() as conn:
-            # files have a 1 to 1 entry in the file_meta_data table
-            file_ids: list[SimcoreS3FileID] = [
-                SimcoreS3FileID(urllib.parse.unquote(f))
-                for _, f in current_multipart_uploads
-            ]
-            # if a file is part of directory, check if this directory is present in
-            # the file_meta_data table; extracting the SimcoreS3DirectoryID from
-            # the file path to find it's equivalent
-            directory_and_file_ids: list[SimcoreS3FileID] = file_ids + [
-                SimcoreS3FileID(get_simcore_directory(file_id)) for file_id in file_ids
-            ]
-
-            list_of_known_metadata_entries: list[
-                FileMetaDataAtDB
-            ] = await db_file_meta_data.list_fmds(
-                conn, file_ids=list(set(directory_and_file_ids))
-            )
-            _logger.debug("metadata entries %s", f"{list_of_known_metadata_entries=}")
-
-            # known uploads do have an expiry date (regardless of upload ID that we do not always know)
-            list_of_known_uploads = [
-                fmd for fmd in list_of_known_metadata_entries if fmd.upload_expires_at
-            ]
-
-            # To compile the list of valid uploads, check that the s3_object is
-            # part of the known uploads.
-            # The known uploads is composed of entries for files or for directories.
-            # checking if the s3_object is part of either one of those
-            list_of_valid_upload_ids: list[str] = []
-            known_directory_names: set[str] = {
-                x.object_name for x in list_of_known_uploads if x.is_directory is True
-            }
-            known_file_names: set[str] = {
-                x.object_name for x in list_of_known_uploads if x.is_directory is False
-            }
-            for upload_id, s3_object_name in current_multipart_uploads:
-                file_id = SimcoreS3FileID(urllib.parse.unquote(s3_object_name))
-                if (
-                    file_id in known_file_names
-                    or get_simcore_directory(file_id) in known_directory_names
-                ):
-                    list_of_valid_upload_ids.append(upload_id)
-
-            _logger.debug("found the following %s", f"{list_of_valid_upload_ids=}")
-            if list_of_invalid_uploads := [
-                (
-                    upload_id,
-                    file_id,
-                )
-                for upload_id, file_id in current_multipart_uploads
-                if upload_id not in list_of_valid_upload_ids
-            ]:
-                _logger.debug(
-                    "the following %s was found and will now be aborted",
-                    f"{list_of_invalid_uploads=}",
-                )
-                await logged_gather(
-                    *(
-                        get_s3_client(self.app).abort_multipart_upload(
-                            self.simcore_bucket_name, file_id, upload_id
-                        )
-                        for upload_id, file_id in list_of_invalid_uploads
-                    ),
-                    max_concurrency=MAX_CONCURRENT_S3_TASKS,
-                )
-                _logger.warning(
-                    "Dangling multipart uploads '%s', were aborted. "
-                    "TIP: There were multipart uploads active on S3 with no counter-part in the file_meta_data database. "
-                    "This might indicate that something went wrong in how storage handles multipart uploads!!",
-                    f"{list_of_invalid_uploads}",
-                )
-
     async def clean_expired_uploads(self) -> None:
         await self._clean_expired_uploads()
-        await self._clean_dangling_multipart_uploads()
 
     async def _update_database_from_storage(
         self, conn: SAConnection, fmd: FileMetaDataAtDB
@@ -1171,7 +1071,7 @@ async def _create_fmd_for_upload(
         is_directory: bool,
         sha256_checksum: SHA256Str | None,
     ) -> FileMetaDataAtDB:
-        now = datetime.datetime.utcnow()
+        now = arrow.utcnow().datetime
         upload_expiration_date = now + datetime.timedelta(
             seconds=self.settings.STORAGE_DEFAULT_PRESIGNED_LINK_EXPIRATION_SECONDS
         )
diff --git a/services/storage/tests/fixtures/data_models.py b/services/storage/tests/fixtures/data_models.py
index 66690264ad5..9fb00685e84 100644
--- a/services/storage/tests/fixtures/data_models.py
+++ b/services/storage/tests/fixtures/data_models.py
@@ -188,7 +188,7 @@ async def random_project_with_files(
     upload_file: Callable[..., Awaitable[tuple[Path, SimcoreS3FileID]]],
     faker: Faker,
 ) -> Callable[
-    [int, tuple[ByteSize, ...]],
+    [int, tuple[ByteSize, ...], tuple[SHA256Str, ...]],
     Awaitable[
         tuple[
             dict[str, Any], dict[NodeID, dict[SimcoreS3FileID, dict[str, Path | str]]]
diff --git a/services/storage/tests/unit/test_dsm_dsmcleaner.py b/services/storage/tests/unit/test_dsm_dsmcleaner.py
index e9d1220da77..36c6ae7342f 100644
--- a/services/storage/tests/unit/test_dsm_dsmcleaner.py
+++ b/services/storage/tests/unit/test_dsm_dsmcleaner.py
@@ -9,9 +9,11 @@
 import asyncio
 import datetime
 import urllib.parse
+from collections.abc import Awaitable, Callable
 from pathlib import Path
-from typing import Awaitable, Callable, Final
+from typing import Final
 
+import arrow
 import pytest
 from aiopg.sa.engine import Engine
 from faker import Faker
@@ -54,41 +56,6 @@ def simcore_directory_id(simcore_file_id: SimcoreS3FileID) -> SimcoreS3FileID:
     )
 
 
-async def test_clean_expired_uploads_aborts_dangling_multipart_uploads(
-    disabled_dsm_cleaner_task,
-    storage_s3_client: StorageS3Client,
-    storage_s3_bucket: S3BucketName,
-    simcore_s3_dsm: SimcoreS3DataManager,
-):
-    """in this test we create a purely dangling multipart upload with no correspongin
-    entry in file_metadata table
-    """
-    file_id = _faker.file_name()
-    file_size = parse_obj_as(ByteSize, "100Mib")
-    upload_links = await storage_s3_client.create_multipart_upload_links(
-        storage_s3_bucket,
-        file_id,
-        file_size,
-        expiration_secs=3600,
-        sha256_checksum=parse_obj_as(SHA256Str, _faker.sha256()),
-    )
-
-    # ensure we have now an upload id
-    all_ongoing_uploads = await storage_s3_client.list_ongoing_multipart_uploads(
-        storage_s3_bucket
-    )
-    assert len(all_ongoing_uploads) == 1
-    ongoing_upload_id, ongoing_file_id = all_ongoing_uploads[0]
-    assert upload_links.upload_id == ongoing_upload_id
-    assert ongoing_file_id == file_id
-
-    # now run the cleaner
-    await simcore_s3_dsm.clean_expired_uploads()
-
-    # since there is no entry in the db, this upload shall be cleaned up
-    assert not await storage_s3_client.list_ongoing_multipart_uploads(storage_s3_bucket)
-
-
 @pytest.mark.parametrize(
     "file_size",
     [ByteSize(0), parse_obj_as(ByteSize, "10Mib"), parse_obj_as(ByteSize, "100Mib")],
@@ -227,7 +194,7 @@ async def test_clean_expired_uploads_deletes_expired_pending_uploads(
         await conn.execute(
             file_meta_data.update()
             .where(file_meta_data.c.file_id == file_or_directory_id)
-            .values(upload_expires_at=datetime.datetime.utcnow())
+            .values(upload_expires_at=arrow.utcnow().datetime)
         )
     await asyncio.sleep(1)
     await simcore_s3_dsm.clean_expired_uploads()
@@ -314,7 +281,7 @@ async def test_clean_expired_uploads_reverts_to_last_known_version_expired_pendi
         await conn.execute(
             file_meta_data.update()
            .where(file_meta_data.c.file_id == file_id)
-            .values(upload_expires_at=datetime.datetime.utcnow())
+            .values(upload_expires_at=arrow.utcnow().datetime)
         )
     await asyncio.sleep(1)
     await simcore_s3_dsm.clean_expired_uploads()
@@ -356,7 +323,7 @@ async def test_clean_expired_uploads_does_not_clean_multipart_upload_on_creation
     the cleaner in between to ensure the cleaner does not break the mechanism"""
     file_or_directory_id = simcore_directory_id if is_directory else simcore_file_id
 
-    later_than_now = datetime.datetime.utcnow() + datetime.timedelta(minutes=5)
+    later_than_now = arrow.utcnow().datetime + datetime.timedelta(minutes=5)
     fmd = FileMetaData.from_simcore_node(
         user_id,
         file_or_directory_id,
@@ -425,69 +392,3 @@ async def test_clean_expired_uploads_does_not_clean_multipart_upload_on_creation
     )
     assert len(all_ongoing_uploads_after_clean) == len(file_ids_to_upload)
     assert all_ongoing_uploads == all_ongoing_uploads_after_clean
-
-
-@pytest.mark.parametrize(
-    "file_size",
-    [parse_obj_as(ByteSize, "100Mib")],
-    ids=byte_size_ids,
-)
-@pytest.mark.parametrize("checksum", [_faker.sha256(), None])
-async def test_clean_expired_uploads_cleans_dangling_multipart_uploads_if_no_corresponding_upload_found(
-    disabled_dsm_cleaner_task,
-    aiopg_engine: Engine,
-    simcore_s3_dsm: SimcoreS3DataManager,
-    simcore_file_id: SimcoreS3FileID,
-    user_id: UserID,
-    file_size: ByteSize,
-    storage_s3_client: StorageS3Client,
-    storage_s3_bucket: S3BucketName,
-    checksum: SHA256Str | None,
-):
-    """This test reproduces what create_file_upload_links in dsm does, but running
-    the cleaner in between to ensure the cleaner does not break the mechanism"""
-    later_than_now = datetime.datetime.utcnow() + datetime.timedelta(minutes=5)
-    fmd = FileMetaData.from_simcore_node(
-        user_id,
-        simcore_file_id,
-        storage_s3_bucket,
-        simcore_s3_dsm.location_id,
-        simcore_s3_dsm.location_name,
-        upload_expires_at=later_than_now,
-        sha256_checksum=checksum,
-    )
-    # we create the entry in the db
-    async with aiopg_engine.acquire() as conn:
-        await db_file_meta_data.upsert(conn, fmd)
-
-        # ensure the database is correctly set up
-        fmd_in_db = await db_file_meta_data.get(conn, simcore_file_id)
-        assert fmd_in_db
-        assert fmd_in_db.upload_expires_at
-    # we create the multipart upload link
-    upload_links = await storage_s3_client.create_multipart_upload_links(
-        storage_s3_bucket,
-        simcore_file_id,
-        file_size,
-        expiration_secs=3600,
-        sha256_checksum=parse_obj_as(SHA256Str, _faker.sha256()),
-    )
-
-    # ensure we have now an upload id
-    all_ongoing_uploads = await storage_s3_client.list_ongoing_multipart_uploads(
-        storage_s3_bucket
-    )
-    assert len(all_ongoing_uploads) == 1
-    ongoing_upload_id, ongoing_file_id = all_ongoing_uploads[0]
-    assert upload_links.upload_id == ongoing_upload_id
-    assert urllib.parse.unquote(ongoing_file_id) == simcore_file_id
-
-    # now cleanup, we do not have an explicit upload_id in the database yet
-    await simcore_s3_dsm.clean_expired_uploads()
-
-    # ensure we STILL have the same upload id
-    all_ongoing_uploads_after_clean = (
-        await storage_s3_client.list_ongoing_multipart_uploads(storage_s3_bucket)
-    )
-    assert len(all_ongoing_uploads_after_clean) == 1
-    assert all_ongoing_uploads == all_ongoing_uploads_after_clean
diff --git a/services/storage/tests/unit/test_handlers_simcore_s3.py b/services/storage/tests/unit/test_handlers_simcore_s3.py
index ee3396f97da..348e214d5b5 100644
--- a/services/storage/tests/unit/test_handlers_simcore_s3.py
+++ b/services/storage/tests/unit/test_handlers_simcore_s3.py
@@ -6,9 +6,10 @@
 # pylint:disable=too-many-nested-blocks
 
 import sys
+from collections.abc import Awaitable, Callable
 from copy import deepcopy
 from pathlib import Path
-from typing import Any, Awaitable, Callable, Literal
+from typing import Any, Literal
 
 import pytest
 import sqlalchemy as sa
@@ -24,7 +25,6 @@
 from models_library.utils.change_case import camel_to_snake
 from models_library.utils.fastapi_encoders import jsonable_encoder
 from pydantic import ByteSize, parse_file_as, parse_obj_as
-from pytest_mock import MockerFixture
 from pytest_simcore.helpers.utils_assert import assert_status
 from servicelib.aiohttp import status
 from servicelib.aiohttp.long_running_tasks.client import long_running_task_request
@@ -192,7 +192,7 @@ async def test_copy_folders_from_valid_project_with_one_large_file(
     create_simcore_file_id: Callable[[ProjectID, NodeID, str], SimcoreS3FileID],
     aiopg_engine: Engine,
     random_project_with_files: Callable[
-        ...,
+        [int, tuple[ByteSize], tuple[SHA256Str]],
         Awaitable[
             tuple[
                 dict[str, Any],
@@ -206,9 +206,9 @@ async def test_copy_folders_from_valid_project_with_one_large_file(
         SHA256Str, "0b3216d95ec5a36c120ba16c88911dcf5ff655925d0fbdbc74cf95baf86de6fc"
     )
     src_project, src_projects_list = await random_project_with_files(
-        num_nodes=1,
-        file_sizes=tuple([parse_obj_as(ByteSize, "210Mib")]),
-        file_checksums=tuple([sha256_checksum]),
+        1,
+        (parse_obj_as(ByteSize, "210Mib"),),
+        (sha256_checksum,),
     )
     # 2. create a dst project without files
     dst_project, nodes_map = clone_project_data(src_project)
@@ -381,16 +381,6 @@ async def _create_and_delete_folders_from_project(
     assert not data
 
 
-@pytest.fixture
-def mock_check_project_exists(mocker: MockerFixture):
-    # NOTE: this avoid having to inject project in database
-    mock = mocker.patch(
-        "simcore_service_storage.dsm._check_project_exists",
-        autospec=True,
-        return_value=None,
-    )
-
-
 @pytest.mark.parametrize(
     "project",
     [pytest.param(prj, id=prj.name) for prj in _get_project_with_data()],
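Side note on the recurring datetime change above (illustrative only, not part of the patch): replacing `datetime.datetime.utcnow()` with `arrow.utcnow().datetime` swaps a naive UTC timestamp for a timezone-aware one. A minimal sketch of the difference, assuming only the standard library plus the `arrow` package already introduced by this diff; variable names are made up for illustration:

    import datetime

    import arrow

    naive_now = datetime.datetime.utcnow()  # naive: tzinfo is None
    aware_now = arrow.utcnow().datetime     # timezone-aware UTC datetime

    assert naive_now.tzinfo is None
    assert aware_now.tzinfo is not None

    # Mixing the two styles in a comparison raises TypeError, which is one
    # common reason to standardize on timezone-aware values for expiry
    # checks such as comparisons against upload_expires_at.
    try:
        _ = naive_now < aware_now
    except TypeError as err:
        print(f"cannot compare naive and aware datetimes: {err}")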