Skip to content

Commit

Permalink
♻️ reusing the same S3 client (#5289)
Browse files Browse the repository at this point in the history
  • Loading branch information
matusdrobuliak66 authored Apr 8, 2024
1 parent 3fa4101 commit c5aa314
Show file tree
Hide file tree
Showing 21 changed files with 166 additions and 193 deletions.
43 changes: 23 additions & 20 deletions packages/aws-library/src/aws_library/s3/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,31 @@
from typing import cast

import aioboto3
import botocore.exceptions
from aiobotocore.session import ClientCreatorContext
from botocore.client import Config
from models_library.api_schemas_storage import S3BucketName
from pydantic import AnyUrl, parse_obj_as
from settings_library.s3 import S3Settings
from types_aiobotocore_s3 import S3Client

from .errors import S3RuntimeError
from .errors import s3_exception_handler

_logger = logging.getLogger(__name__)

_S3_MAX_CONCURRENCY_DEFAULT = 10


@dataclass(frozen=True)
class SimcoreS3API:
client: S3Client
session: aioboto3.Session
exit_stack: contextlib.AsyncExitStack
transfer_max_concurrency: int

@classmethod
async def create(cls, settings: S3Settings) -> "SimcoreS3API":
async def create(
cls, settings: S3Settings, s3_max_concurrency: int = _S3_MAX_CONCURRENCY_DEFAULT
) -> "SimcoreS3API":
session = aioboto3.Session()
session_client = session.client(
"s3",
Expand All @@ -37,10 +41,11 @@ async def create(cls, settings: S3Settings) -> "SimcoreS3API":
)
assert isinstance(session_client, ClientCreatorContext) # nosec
exit_stack = contextlib.AsyncExitStack()
s3_client = cast(
S3Settings, await exit_stack.enter_async_context(session_client)
)
return cls(s3_client, session, exit_stack)
s3_client = cast(S3Client, await exit_stack.enter_async_context(session_client))
        # NOTE: this triggers a botocore.exceptions.ClientError in case the connection is not made to the S3 backend
await s3_client.list_buckets()

return cls(s3_client, session, exit_stack, s3_max_concurrency)

async def close(self) -> None:
await self.exit_stack.aclose()
Expand All @@ -53,21 +58,19 @@ async def http_check_bucket_connected(self, bucket: S3BucketName) -> bool:
except Exception: # pylint: disable=broad-except
return False

async def create_presigned_download_link(
@s3_exception_handler(_logger)
async def create_single_presigned_download_link(
self,
bucket_name: S3BucketName,
object_key: str,
expiration_secs: int,
) -> AnyUrl:
try:
# NOTE: ensure the bucket/object exists, this will raise if not
await self.client.head_bucket(Bucket=bucket_name)
generated_link = await self.client.generate_presigned_url(
"get_object",
Params={"Bucket": bucket_name, "Key": object_key},
ExpiresIn=expiration_secs,
)
url: AnyUrl = parse_obj_as(AnyUrl, generated_link)
return url
except botocore.exceptions.ClientError as exc:
raise S3RuntimeError from exc # pragma: no cover
# NOTE: ensure the bucket/object exists, this will raise if not
await self.client.head_bucket(Bucket=bucket_name)
generated_link = await self.client.generate_presigned_url(
"get_object",
Params={"Bucket": bucket_name, "Key": object_key},
ExpiresIn=expiration_secs,
)
url: AnyUrl = parse_obj_as(AnyUrl, generated_link)
return url
57 changes: 57 additions & 0 deletions packages/aws-library/src/aws_library/s3/errors.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
import functools
import logging

from botocore import exceptions as botocore_exc
from pydantic.errors import PydanticErrorMixin


Expand All @@ -7,3 +11,56 @@ class S3RuntimeError(PydanticErrorMixin, RuntimeError):

class S3NotConnectedError(S3RuntimeError):
    """Raised when a connection to the S3 backend cannot be established."""

    msg_template: str = "Cannot connect with s3 server"


class S3AccessError(S3RuntimeError):
    """Unexpected/unclassified failure while accessing the S3 backend.

    Base class for the more specific S3 access errors below.
    """

    code = "s3_access.error"
    msg_template: str = "Unexpected error while accessing S3 backend"


class S3BucketInvalidError(S3AccessError):
    """Raised when the target bucket does not exist or is not accessible.

    The ``{bucket}`` placeholder in ``msg_template`` is filled via the
    keyword argument passed at raise time (PydanticErrorMixin behavior).
    """

    code = "s3_bucket.invalid_error"
    msg_template: str = "The bucket '{bucket}' is invalid"


class S3KeyNotFoundError(S3AccessError):
    """Raised when the requested object key is missing from the bucket.

    ``{key}`` and ``{bucket}`` placeholders are filled via keyword
    arguments passed at raise time (PydanticErrorMixin behavior).
    """

    code = "s3_key.not_found_error"
    msg_template: str = "The file {key} in {bucket} was not found"


def s3_exception_handler(log: logging.Logger):
"""converts typical aiobotocore/boto exceptions to storage exceptions
NOTE: this is a work in progress as more exceptions might arise in different
use-cases
"""

def decorator(func): # noqa: C901
@functools.wraps(func)
async def wrapper(self, *args, **kwargs):
try:
return await func(self, *args, **kwargs)
except self.client.exceptions.NoSuchBucket as exc:
raise S3BucketInvalidError(
bucket=exc.response.get("Error", {}).get("BucketName", "undefined")
) from exc
except botocore_exc.ClientError as exc:
status_code = int(exc.response.get("Error", {}).get("Code", -1))
operation_name = exc.operation_name

match status_code, operation_name:
case 404, "HeadObject":
raise S3KeyNotFoundError(bucket=args[0], key=args[1]) from exc
case (404, "HeadBucket") | (403, "HeadBucket"):
raise S3BucketInvalidError(bucket=args[0]) from exc
case _:
raise S3AccessError from exc
except botocore_exc.EndpointConnectionError as exc:
raise S3AccessError from exc

except botocore_exc.BotoCoreError as exc:
log.exception("Unexpected error in s3 client: ")
raise S3AccessError from exc

return wrapper

return decorator
2 changes: 1 addition & 1 deletion packages/aws-library/tests/test_s3_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ async def upload_file_to_bucket(
async def test_create_single_presigned_download_link(
simcore_s3_api: SimcoreS3API, upload_file_to_bucket: None, create_s3_bucket
):
download_url = await simcore_s3_api.create_presigned_download_link(
download_url = await simcore_s3_api.create_single_presigned_download_link(
create_s3_bucket, "test.csv", 50
)
assert isinstance(download_url, AnyUrl)
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ async def export_service_runs(
)

# Create presigned S3 link
generated_url: AnyUrl = await s3_client.create_presigned_download_link(
generated_url: AnyUrl = await s3_client.create_single_presigned_download_link(
bucket_name=s3_bucket_name,
object_key=s3_object_key,
expiration_secs=_PRESIGNED_LINK_EXPIRATION_SEC,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async def mocked_export(mocker: MockerFixture):
@pytest.fixture
async def mocked_presigned_link(mocker: MockerFixture):
mock_presigned_link = mocker.patch(
"simcore_service_resource_usage_tracker.services.resource_tracker_service_runs.SimcoreS3API.create_presigned_download_link",
"simcore_service_resource_usage_tracker.services.resource_tracker_service_runs.SimcoreS3API.create_single_presigned_download_link",
return_value=parse_obj_as(
AnyUrl,
"https://www.testing.com/",
Expand Down
1 change: 1 addition & 0 deletions services/storage/requirements/_base.in
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
--constraint ../../../requirements/constraints.txt


--requirement ../../../packages/aws-library/requirements/_base.in
--requirement ../../../packages/models-library/requirements/_base.in
--requirement ../../../packages/postgres-database/requirements/_base.in
--requirement ../../../packages/settings-library/requirements/_base.in
Expand Down
1 change: 1 addition & 0 deletions services/storage/requirements/_base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ typing-extensions==4.10.0
# pydantic
# typer-slim
# types-aiobotocore
# types-aiobotocore-ec2
# types-aiobotocore-s3
ujson==5.9.0
# via aiohttp-swagger
Expand Down
1 change: 1 addition & 0 deletions services/storage/requirements/ci.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
--requirement _test.txt

# installs this repo's packages
simcore-aws-library @ ../../packages/aws-library/
simcore-models-library @ ../../packages/models-library/
simcore-postgres-database @ ../../packages/postgres-database/
pytest-simcore @ ../../packages/pytest-simcore/
Expand Down
1 change: 1 addition & 0 deletions services/storage/requirements/dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
--requirement _tools.txt

# installs this repo's packages
--editable ../../packages/aws-library/
--editable ../../packages/models-library
--editable ../../packages/postgres-database/
--editable ../../packages/pytest-simcore/
Expand Down
1 change: 1 addition & 0 deletions services/storage/requirements/prod.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
--requirement _base.txt

# installs this repo's packages
simcore-aws-library @ ../../packages/aws-library/
simcore-models-library @ ../../packages/models-library/
simcore-postgres-database @ ../../packages/postgres-database/
simcore-service-library[aiohttp] @ ../../packages/service-library
Expand Down
15 changes: 0 additions & 15 deletions services/storage/src/simcore_service_storage/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,3 @@ class ProjectNotFoundError(DatabaseAccessError):
class LinkAlreadyExistsError(DatabaseAccessError):
code = "link.already_exists_error"
msg_template: str = "The link {file_id} already exists"


class S3AccessError(StorageRuntimeError):
code = "s3_access.error"
msg_template: str = "Unexpected error while accessing S3 backend"


class S3BucketInvalidError(S3AccessError):
code = "s3_bucket.invalid_error"
msg_template: str = "The bucket '{bucket}' is invalid"


class S3KeyNotFoundError(S3AccessError):
code = "s3_key.not_found_error"
msg_template: str = "The file {key} in {bucket} was not found"
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import logging

from aiohttp import web
from aws_library.s3.errors import S3AccessError, S3BucketInvalidError
from models_library.api_schemas_storage import HealthCheck, S3BucketName
from models_library.app_diagnostics import AppStatusCheck
from servicelib.json_serialization import json_dumps
Expand All @@ -15,7 +16,6 @@
from .constants import APP_CONFIG_KEY
from .db import get_engine_state
from .db import is_service_responsive as is_pg_responsive
from .exceptions import S3AccessError, S3BucketInvalidError
from .s3 import get_s3_client
from .settings import Settings

Expand Down
37 changes: 18 additions & 19 deletions services/storage/src/simcore_service_storage/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
"""
import json
import logging
from contextlib import AsyncExitStack
from typing import cast

from aiohttp import web
Expand All @@ -25,30 +24,30 @@ async def setup_s3_client(app):
storage_s3_settings = storage_settings.STORAGE_S3
assert storage_s3_settings # nosec

async with AsyncExitStack() as exit_stack:
client = None
async for attempt in AsyncRetrying(
wait=wait_fixed(RETRY_WAIT_SECS),
before_sleep=before_sleep_log(log, logging.WARNING),
reraise=True,
):
with attempt:
client = await StorageS3Client.create(
exit_stack,
storage_s3_settings,
storage_settings.STORAGE_S3_CLIENT_MAX_TRANSFER_CONCURRENCY,
)
log.info(
"S3 client %s successfully created [%s]",
f"{client=}",
json.dumps(attempt.retry_state.retry_object.statistics),
)
client = None
async for attempt in AsyncRetrying(
wait=wait_fixed(RETRY_WAIT_SECS),
before_sleep=before_sleep_log(log, logging.WARNING),
reraise=True,
):
with attempt:
client = await StorageS3Client.create(
storage_s3_settings,
storage_settings.STORAGE_S3_CLIENT_MAX_TRANSFER_CONCURRENCY,
)
log.info(
"S3 client %s successfully created [%s]",
f"{client=}",
json.dumps(attempt.retry_state.retry_object.statistics),
)
assert client # nosec
app[APP_S3_KEY] = client

yield
# tear-down
log.debug("closing %s", f"{client=}")
await client.close()

log.info("closed s3 client %s", f"{client=}")


Expand Down
50 changes: 10 additions & 40 deletions services/storage/src/simcore_service_storage/s3_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,20 @@
import logging
import urllib.parse
from collections.abc import AsyncGenerator, Callable
from contextlib import AsyncExitStack
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Final, TypeAlias, cast
from typing import Any, Final, TypeAlias

import aioboto3
from aiobotocore.session import ClientCreatorContext
from aws_library.s3.client import SimcoreS3API
from aws_library.s3.errors import S3KeyNotFoundError, s3_exception_handler
from boto3.s3.transfer import TransferConfig
from botocore.client import Config
from models_library.api_schemas_storage import UploadedPart
from models_library.basic_types import SHA256Str
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID, SimcoreS3FileID
from pydantic import AnyUrl, ByteSize, NonNegativeInt, parse_obj_as
from servicelib.logging_utils import log_context
from servicelib.utils import logged_gather
from settings_library.s3 import S3Settings
from simcore_service_storage.exceptions import S3KeyNotFoundError
from types_aiobotocore_s3 import S3Client
from types_aiobotocore_s3.type_defs import (
ListObjectsV2OutputTypeDef,
Expand All @@ -30,7 +26,7 @@

from .constants import EXPAND_DIR_MAX_ITEM_COUNT, MULTIPART_UPLOADS_MIN_TOTAL_SIZE
from .models import ETag, MultiPartUploadLinks, S3BucketName, UploadID
from .s3_utils import compute_num_file_chunks, s3_exception_handler
from .s3_utils import compute_num_file_chunks

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -59,9 +55,11 @@ def from_botocore_object(obj: ObjectTypeDef) -> "S3MetaData":
file_id=SimcoreS3FileID(obj["Key"]),
last_modified=obj["LastModified"],
e_tag=json.loads(obj["ETag"]),
sha256_checksum=SHA256Str(obj.get("ChecksumSHA256"))
if obj.get("ChecksumSHA256")
else None,
sha256_checksum=(
SHA256Str(obj.get("ChecksumSHA256"))
if obj.get("ChecksumSHA256")
else None
),
size=obj["Size"],
)

Expand All @@ -85,35 +83,7 @@ async def _list_objects_v2_paginated_gen(
yield items_in_page


@dataclass
class StorageS3Client: # pylint: disable=too-many-public-methods
session: aioboto3.Session
client: S3Client
transfer_max_concurrency: int

@classmethod
async def create(
cls, exit_stack: AsyncExitStack, settings: S3Settings, s3_max_concurrency: int
) -> "StorageS3Client":
# upon creation the client does not try to connect, one need to make an operation
session = aioboto3.Session()
# NOTE: session.client returns an aiobotocore client enhanced with aioboto3 fcts (e.g. download_file, upload_file, copy_file...)
session_client = session.client(
"s3",
endpoint_url=settings.S3_ENDPOINT,
aws_access_key_id=settings.S3_ACCESS_KEY,
aws_secret_access_key=settings.S3_SECRET_KEY,
aws_session_token=settings.S3_ACCESS_TOKEN,
region_name=settings.S3_REGION,
config=Config(signature_version="s3v4"),
)
assert isinstance(session_client, ClientCreatorContext) # nosec
client = cast(S3Client, await exit_stack.enter_async_context(session_client))
        # NOTE: this triggers a botocore.exceptions.ClientError in case the connection is not made to the S3 backend
await client.list_buckets()

return cls(session, client, s3_max_concurrency)

class StorageS3Client(SimcoreS3API): # pylint: disable=too-many-public-methods
@s3_exception_handler(_logger)
async def create_bucket(self, bucket: S3BucketName) -> None:
_logger.debug("Creating bucket: %s", bucket)
Expand Down
Loading

0 comments on commit c5aa314

Please sign in to comment.