♻️🐛Storage: disable handling of dangling multipart uploads (#5978)
sanderegg authored Jun 20, 2024
1 parent e08c73e commit ad2d3e3
Showing 5 changed files with 40 additions and 249 deletions.
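
In short: this change deletes _clean_dangling_multipart_uploads (and its call from clean_expired_uploads), runs the expired-upload cleanup with no concurrency, and swaps datetime.datetime.utcnow() for arrow.utcnow().datetime. For orientation only, the sketch below shows the kind of generic S3 calls such a dangling-upload cleanup is built on, using plain boto3; it is not code from this repository (the service goes through its async StorageS3Client wrapper), and the function and variable names are invented.

    import boto3

    def abort_dangling_multipart_uploads(bucket: str, known_upload_ids: set[str]) -> None:
        """Abort multipart uploads in `bucket` that storage does not know about (illustrative only)."""
        s3 = boto3.client("s3")
        # uploads that were started but never completed or aborted (pagination omitted)
        response = s3.list_multipart_uploads(Bucket=bucket)
        for upload in response.get("Uploads", []):
            if upload["UploadId"] in known_upload_ids:
                continue  # still tracked in file_meta_data, leave it alone
            # aborting frees the parts already stored by S3
            s3.abort_multipart_upload(
                Bucket=bucket, Key=upload["Key"], UploadId=upload["UploadId"]
            )
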
2 changes: 1 addition & 1 deletion .coveragerc
@@ -22,7 +22,7 @@ exclude_lines =
# Don't complain if non-runnable code isn't run:
if 0:
if __name__ == .__main__.:

if __name__ == __main__.:
# Don't complain about abstract methods, they aren't run:
@(abc\.)?abstract(((class|static)?method)|property)

152 changes: 26 additions & 126 deletions services/storage/src/simcore_service_storage/simcore_s3_dsm.py
@@ -10,6 +10,7 @@
from pathlib import Path
from typing import Any, Final, cast

import arrow
from aiohttp import web
from aiopg.sa import Engine
from aiopg.sa.connection import SAConnection
@@ -37,7 +38,6 @@
APP_DB_ENGINE_KEY,
DATCORE_ID,
EXPAND_DIR_MAX_ITEM_COUNT,
MAX_CONCURRENT_DB_TASKS,
MAX_CONCURRENT_S3_TASKS,
MAX_LINK_CHUNK_BYTE_SIZE,
S3_UNDEFINED_OR_EXTERNAL_MULTIPART_ID,
@@ -71,18 +71,15 @@
from .s3_client import S3MetaData, StorageS3Client
from .s3_utils import S3TransferDataCB, update_task_progress
from .settings import Settings
from .simcore_s3_dsm_utils import (
expand_directory,
get_directory_file_id,
get_simcore_directory,
)
from .simcore_s3_dsm_utils import expand_directory, get_directory_file_id
from .utils import (
convert_db_to_model,
download_to_file_or_raise,
is_file_entry_valid,
is_valid_managed_multipart_upload,
)

_NO_CONCURRENCY: Final[int] = 1
_MAX_PARALLEL_S3_CALLS: Final[NonNegativeInt] = 10

_logger = logging.getLogger(__name__)
@@ -577,7 +574,7 @@ async def delete_project_simcore_s3(
self.simcore_bucket_name, project_id, node_id
)

async def deep_copy_project_simcore_s3(
async def deep_copy_project_simcore_s3( # noqa: C901
self,
user_id: UserID,
src_project: dict[str, Any],
@@ -788,19 +785,20 @@ async def create_soft_link(
async def synchronise_meta_data_table(
self, *, dry_run: bool
) -> list[StorageFileID]:
file_ids_to_remove = []

async with self.engine.acquire() as conn:
_logger.warning(
"Total number of entries to check %d",
await db_file_meta_data.total(conn),
)
# iterate over all entries to check if there is a file in the S3 backend
async for fmd in db_file_meta_data.list_valid_uploads(conn):
file_ids_to_remove = [
fmd.file_id
async for fmd in db_file_meta_data.list_valid_uploads(conn)
if not await get_s3_client(self.app).file_exists(
self.simcore_bucket_name, s3_object=fmd.object_name
):
# this file does not exist in S3
file_ids_to_remove.append(fmd.file_id)
)
]

if not dry_run:
await db_file_meta_data.delete(conn, file_ids_to_remove)
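
Aside on the hunk above: the old append-in-a-loop body of synchronise_meta_data_table becomes a single async list comprehension. A minimal, self-contained sketch of that pattern, with all names invented for illustration:

    import asyncio

    async def fake_uploads():
        # stand-in for db_file_meta_data.list_valid_uploads(conn)
        for file_id in ("a", "b", "c"):
            yield file_id

    async def exists_in_s3(file_id: str) -> bool:
        # stand-in for the awaited file_exists() check
        return file_id == "b"

    async def main() -> None:
        # `async for` and `await` are both allowed inside a list comprehension
        to_remove = [f async for f in fake_uploads() if not await exists_in_s3(f)]
        print(to_remove)  # ['a', 'c']

    asyncio.run(main())
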
@@ -832,7 +830,7 @@ async def _clean_expired_uploads(self) -> None:
1. will try to update the entry from S3 backend if exists
2. will delete the entry if nothing exists in S3 backend.
"""
now = datetime.datetime.utcnow()
now = arrow.utcnow().datetime
async with self.engine.acquire() as conn:
list_of_expired_uploads = await db_file_meta_data.list_fmds(
conn, expired_after=now
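
The same hunk swaps datetime.datetime.utcnow() for arrow.utcnow().datetime (as does _create_fmd_for_upload further down). A small illustration of the practical difference, assuming the arrow package is installed:

    import datetime
    import arrow

    naive = datetime.datetime.utcnow()  # naive (tzinfo is None) and deprecated since Python 3.12
    aware = arrow.utcnow().datetime     # timezone-aware datetime pinned to UTC

    assert naive.tzinfo is None
    assert aware.tzinfo is not None
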
@@ -853,7 +851,7 @@ async def _clean_expired_uploads(self) -> None:
),
reraise=False,
log=_logger,
max_concurrency=MAX_CONCURRENT_DB_TASKS,
max_concurrency=_NO_CONCURRENCY,
)
list_of_fmds_to_delete = [
expired_fmd
@@ -867,16 +865,24 @@ async def _clean_expired_uploads(self) -> None:
async def _revert_file(
conn: SAConnection, fmd: FileMetaDataAtDB
) -> FileMetaDataAtDB:
if is_valid_managed_multipart_upload(fmd.upload_id):
assert fmd.upload_id # nosec
await s3_client.abort_multipart_upload(
bucket=fmd.bucket_name,
file_id=fmd.file_id,
upload_id=fmd.upload_id,
)
await s3_client.undelete_file(fmd.bucket_name, fmd.file_id)
return await self._update_database_from_storage(conn, fmd)

s3_client = get_s3_client(self.app)
async with self.engine.acquire() as conn:
# NOTE: no concurrency here as we want to run low resources
reverted_fmds = await logged_gather(
*(_revert_file(conn, fmd) for fmd in list_of_fmds_to_delete),
reraise=False,
log=_logger,
max_concurrency=MAX_CONCURRENT_DB_TASKS,
max_concurrency=_NO_CONCURRENCY,
)
list_of_fmds_to_delete = [
fmd
@@ -892,123 +898,17 @@ async def _revert_file(
"following unfinished/incomplete uploads will now be deleted : [%s]",
[fmd.file_id for fmd in list_of_fmds_to_delete],
)
await logged_gather(
*(
self.delete_file(fmd.user_id, fmd.file_id)
for fmd in list_of_fmds_to_delete
if fmd.user_id is not None
),
log=_logger,
max_concurrency=MAX_CONCURRENT_DB_TASKS,
)
for fmd in list_of_fmds_to_delete:
if fmd.user_id is not None:
await self.delete_file(fmd.user_id, fmd.file_id)

_logger.warning(
"pending/incomplete uploads of [%s] removed",
[fmd.file_id for fmd in list_of_fmds_to_delete],
)

async def _clean_dangling_multipart_uploads(self):
"""this method removes any dangling multipart upload that
was initiated on S3 backend if it does not exist in file_meta_data
table.
Use-cases:
- presigned multipart upload: a multipart upload is created after the entry in the table (
if the expiry date is still in the future we do not remove the upload
)
- S3 external or internal potentially multipart upload (using S3 direct access we do not know
if they create multipart uploads and have no control over it, the only thing we know is the upload
expiry date)
--> we only remove dangling upload IDs which expiry date is in the past or that have no upload in process
or no entry at all in the database
"""
current_multipart_uploads: list[
tuple[UploadID, SimcoreS3FileID]
] = await get_s3_client(self.app).list_ongoing_multipart_uploads(
self.simcore_bucket_name
)
if not current_multipart_uploads:
return
_logger.debug("found %s", f"{current_multipart_uploads=}")

# there are some multipart uploads, checking if
# there is a counterpart in file_meta_data
# NOTE: S3 url encode file uuid with specific characters
async with self.engine.acquire() as conn:
# files have a 1 to 1 entry in the file_meta_data table
file_ids: list[SimcoreS3FileID] = [
SimcoreS3FileID(urllib.parse.unquote(f))
for _, f in current_multipart_uploads
]
# if a file is part of directory, check if this directory is present in
# the file_meta_data table; extracting the SimcoreS3DirectoryID from
# the file path to find it's equivalent
directory_and_file_ids: list[SimcoreS3FileID] = file_ids + [
SimcoreS3FileID(get_simcore_directory(file_id)) for file_id in file_ids
]

list_of_known_metadata_entries: list[
FileMetaDataAtDB
] = await db_file_meta_data.list_fmds(
conn, file_ids=list(set(directory_and_file_ids))
)
_logger.debug("metadata entries %s", f"{list_of_known_metadata_entries=}")

# known uploads do have an expiry date (regardless of upload ID that we do not always know)
list_of_known_uploads = [
fmd for fmd in list_of_known_metadata_entries if fmd.upload_expires_at
]

# To compile the list of valid uploads, check that the s3_object is
# part of the known uploads.
# The known uploads is composed of entries for files or for directories.
# checking if the s3_object is part of either one of those
list_of_valid_upload_ids: list[str] = []
known_directory_names: set[str] = {
x.object_name for x in list_of_known_uploads if x.is_directory is True
}
known_file_names: set[str] = {
x.object_name for x in list_of_known_uploads if x.is_directory is False
}
for upload_id, s3_object_name in current_multipart_uploads:
file_id = SimcoreS3FileID(urllib.parse.unquote(s3_object_name))
if (
file_id in known_file_names
or get_simcore_directory(file_id) in known_directory_names
):
list_of_valid_upload_ids.append(upload_id)

_logger.debug("found the following %s", f"{list_of_valid_upload_ids=}")
if list_of_invalid_uploads := [
(
upload_id,
file_id,
)
for upload_id, file_id in current_multipart_uploads
if upload_id not in list_of_valid_upload_ids
]:
_logger.debug(
"the following %s was found and will now be aborted",
f"{list_of_invalid_uploads=}",
)
await logged_gather(
*(
get_s3_client(self.app).abort_multipart_upload(
self.simcore_bucket_name, file_id, upload_id
)
for upload_id, file_id in list_of_invalid_uploads
),
max_concurrency=MAX_CONCURRENT_S3_TASKS,
)
_logger.warning(
"Dangling multipart uploads '%s', were aborted. "
"TIP: There were multipart uploads active on S3 with no counter-part in the file_meta_data database. "
"This might indicate that something went wrong in how storage handles multipart uploads!!",
f"{list_of_invalid_uploads}",
)

async def clean_expired_uploads(self) -> None:
await self._clean_expired_uploads()
await self._clean_dangling_multipart_uploads()

async def _update_database_from_storage(
self, conn: SAConnection, fmd: FileMetaDataAtDB
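
One more note on the cleanup path above: the MAX_CONCURRENT_DB_TASKS fan-out is replaced by _NO_CONCURRENCY (max_concurrency=1) or by a plain for-loop, matching the in-code note about keeping resource usage low. logged_gather is the repository's own servicelib helper; purely as a rough, generic stand-in, the same bounded-concurrency idea can be expressed with an asyncio.Semaphore:

    import asyncio

    async def bounded_gather(coros, max_concurrency: int = 1):
        # run coroutines with at most `max_concurrency` in flight,
        # returning exceptions as results (similar spirit to reraise=False)
        semaphore = asyncio.Semaphore(max_concurrency)

        async def _run(coro):
            async with semaphore:
                try:
                    return await coro
                except Exception as exc:  # noqa: BLE001
                    return exc

        return await asyncio.gather(*(_run(c) for c in coros))

With max_concurrency=1 this degenerates to strictly sequential execution, which is effectively what the new for-loop over list_of_fmds_to_delete does.
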
@@ -1171,7 +1071,7 @@ async def _create_fmd_for_upload(
is_directory: bool,
sha256_checksum: SHA256Str | None,
) -> FileMetaDataAtDB:
now = datetime.datetime.utcnow()
now = arrow.utcnow().datetime
upload_expiration_date = now + datetime.timedelta(
seconds=self.settings.STORAGE_DEFAULT_PRESIGNED_LINK_EXPIRATION_SECONDS
)
2 changes: 1 addition & 1 deletion services/storage/tests/fixtures/data_models.py
@@ -188,7 +188,7 @@ async def random_project_with_files(
upload_file: Callable[..., Awaitable[tuple[Path, SimcoreS3FileID]]],
faker: Faker,
) -> Callable[
[int, tuple[ByteSize, ...]],
[int, tuple[ByteSize, ...], tuple[SHA256Str, ...]],
Awaitable[
tuple[
dict[str, Any], dict[NodeID, dict[SimcoreS3FileID, dict[str, Path | str]]]