diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index b7d3d7ca..7c54631d 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -33,4 +33,6 @@ repos:
           - types-PyYAML
           - types-cachetools
           - types-requests
+          - types-aiobotocore[essential]
+          - boto3-stubs[essential]
         exclude: ^(src/diracx/client/|tests/|build)
diff --git a/environment.yml b/environment.yml
index a10cfad2..a8167b3c 100644
--- a/environment.yml
+++ b/environment.yml
@@ -51,8 +51,8 @@ dependencies:
   - types-requests
   - uvicorn
   - moto
-  - mypy-boto3-s3
+  - aiobotocore
   - botocore
-  - boto3-stubs
-  # - pip:
-  #   - git+https://github.com/DIRACGrid/DIRAC.git@integration
+  - pip:
+      - types-aiobotocore[essential]
+      - boto3-stubs[essential]
diff --git a/setup.cfg b/setup.cfg
index abd38583..01c95b0f 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -26,6 +26,7 @@ package_dir =
     = src
 python_requires = >=3.10
 install_requires =
+    aiobotocore
     authlib
     aiohttp
     aiomysql
diff --git a/src/diracx/core/s3.py b/src/diracx/core/s3.py
index 34cf7990..6fc56657 100644
--- a/src/diracx/core/s3.py
+++ b/src/diracx/core/s3.py
@@ -1,7 +1,11 @@
 """Utilities for interacting with S3-compatible storage."""
 from __future__ import annotations
 
-__all__ = ("s3_bucket_exists", "s3_object_exists", "generate_presigned_upload")
+__all__ = (
+    "s3_bucket_exists",
+    "s3_object_exists",
+    "generate_presigned_upload",
+)
 
 import base64
 from typing import TYPE_CHECKING, TypedDict, cast
@@ -11,7 +15,7 @@
 from .models import ChecksumAlgorithm
 
 if TYPE_CHECKING:
-    from mypy_boto3_s3.client import S3Client
+    from types_aiobotocore_s3.client import S3Client
 
 
 class S3PresignedPostInfo(TypedDict):
@@ -19,19 +23,19 @@ class S3PresignedPostInfo(TypedDict):
     fields: dict[str, str]
 
 
-def s3_bucket_exists(s3_client: S3Client, bucket_name: str) -> bool:
+async def s3_bucket_exists(s3_client: S3Client, bucket_name: str) -> bool:
     """Check if a bucket exists in S3."""
-    return _s3_exists(s3_client.head_bucket, Bucket=bucket_name)
+    return await _s3_exists(s3_client.head_bucket, Bucket=bucket_name)
 
 
-def s3_object_exists(s3_client: S3Client, bucket_name: str, key: str) -> bool:
+async def s3_object_exists(s3_client: S3Client, bucket_name: str, key: str) -> bool:
     """Check if an object exists in an S3 bucket."""
-    return _s3_exists(s3_client.head_object, Bucket=bucket_name, Key=key)
+    return await _s3_exists(s3_client.head_object, Bucket=bucket_name, Key=key)
 
 
-def _s3_exists(method, **kwargs: str) -> bool:
+async def _s3_exists(method, **kwargs: str) -> bool:
     try:
-        method(**kwargs)
+        await method(**kwargs)
     except ClientError as e:
         if e.response["Error"]["Code"] != "404":
             raise
@@ -40,7 +44,7 @@ def _s3_exists(method, **kwargs: str) -> bool:
     return True
 
 
-def generate_presigned_upload(
+async def generate_presigned_upload(
     s3_client: S3Client,
     bucket_name: str,
     key: str,
@@ -60,7 +64,7 @@ def generate_presigned_upload(
     conditions = [["content-length-range", size, size]] + [
         {k: v} for k, v in fields.items()
     ]
-    result = s3_client.generate_presigned_post(
+    result = await s3_client.generate_presigned_post(
         Bucket=bucket_name,
         Key=key,
         Fields=fields,
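With this change every helper in `src/diracx/core/s3.py` becomes a coroutine, so callers must open the aiobotocore client as an async context manager and await each call. A minimal usage sketch under stated assumptions (the endpoint URL, bucket name, key, and checksum below are illustrative placeholders, not values from this diff):

```python
import asyncio

from aiobotocore.session import get_session

from diracx.core.s3 import generate_presigned_upload, s3_bucket_exists


async def main() -> None:
    # The client is only usable inside the context manager; aiobotocore
    # tears down the underlying HTTP session on exit.
    async with get_session().create_client(
        "s3", endpoint_url="http://localhost:9000"  # hypothetical endpoint
    ) as client:
        if not await s3_bucket_exists(client, "sandboxes"):
            raise RuntimeError("bucket is missing")
        # Presigned POST for a 128-byte upload, valid for 60 seconds;
        # the hex SHA-256 checksum here is a dummy placeholder.
        upload_info = await generate_presigned_upload(
            client, "sandboxes", "example.dat", "sha256", "00" * 32, 128, 60
        )
        print(upload_info["url"], upload_info["fields"])


asyncio.run(main())
```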
diff --git a/src/diracx/routers/job_manager/sandboxes.py b/src/diracx/routers/job_manager/sandboxes.py
index b47af93f..0c1c0ee8 100644
--- a/src/diracx/routers/job_manager/sandboxes.py
+++ b/src/diracx/routers/job_manager/sandboxes.py
@@ -1,9 +1,10 @@
 from __future__ import annotations
 
+import contextlib
 from http import HTTPStatus
-from typing import TYPE_CHECKING, Annotated
+from typing import TYPE_CHECKING, Annotated, AsyncIterator
 
-import botocore.session
+from aiobotocore.session import get_session
 from botocore.config import Config
 from botocore.errorfactory import ClientError
 from fastapi import Depends, HTTPException, Query
@@ -22,7 +23,7 @@
 from diracx.core.settings import ServiceSettingsBase
 
 if TYPE_CHECKING:
-    from mypy_boto3_s3.client import S3Client
+    from types_aiobotocore_s3.client import S3Client
 
 from ..auth import AuthorizedUserInfo, has_properties, verify_dirac_access_token
 from ..dependencies import SandboxMetadataDB, add_settings_annotation
@@ -42,28 +43,26 @@ class SandboxStoreSettings(ServiceSettingsBase, env_prefix="DIRACX_SANDBOX_STORE_"):
     url_validity_seconds: int = 5 * 60
     _client: S3Client = PrivateAttr(None)
 
-    def __init__(self, **kwargs):
-        super().__init__(**kwargs)
-
-        # TODO: Use async
-        session = botocore.session.get_session()
-        self._client = session.create_client(
+    @contextlib.asynccontextmanager
+    async def lifetime_function(self) -> AsyncIterator[None]:
+        async with get_session().create_client(
             "s3",
-            # endpoint_url=s3_cred["endpoint"],
-            # aws_access_key_id=s3_cred["access_key_id"],
-            # aws_secret_access_key=s3_cred["secret_access_key"],
             **self.s3_client_kwargs,
             config=Config(signature_version="v4"),
-        )
-        if not s3_bucket_exists(self._client, self.bucket_name):
-            if not self.auto_create_bucket:
-                raise ValueError(
-                    f"Bucket {self.bucket_name} does not exist and auto_create_bucket is disabled"
-                )
-            try:
-                self._client.create_bucket(Bucket=self.bucket_name)
-            except ClientError as e:
-                raise ValueError(f"Failed to create bucket {self.bucket_name}") from e
+        ) as self._client:  # type: ignore
+            if not await s3_bucket_exists(self._client, self.bucket_name):
+                if not self.auto_create_bucket:
+                    raise ValueError(
+                        f"Bucket {self.bucket_name} does not exist and auto_create_bucket is disabled"
+                    )
+                try:
+                    await self._client.create_bucket(Bucket=self.bucket_name)
+                except ClientError as e:
+                    raise ValueError(
+                        f"Failed to create bucket {self.bucket_name}"
+                    ) from e
+
+            yield
 
     @property
     def s3_client(self) -> S3Client:
@@ -116,7 +115,7 @@ async def initiate_sandbox_upload(
         await sandbox_metadata_db.update_sandbox_last_access_time(pfn)
         return SandboxUploadResponse(pfn=pfn)
 
-    upload_info = generate_presigned_upload(
+    upload_info = await generate_presigned_upload(
         settings.s3_client,
         settings.bucket_name,
         pfn_to_key(pfn),
@@ -166,7 +165,7 @@ async def get_sandbox_file(
     """
     # TODO: Prevent people from downloading other people's sandboxes?
    # TODO: Support by name and by job id?
-    presigned_url = settings.s3_client.generate_presigned_url(
+    presigned_url = await settings.s3_client.generate_presigned_url(
        ClientMethod="get_object",
         Params={"Bucket": settings.bucket_name, "Key": pfn_to_key(pfn)},
         ExpiresIn=settings.url_validity_seconds,
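Because the client construction moves out of `__init__` into `lifetime_function`, whoever owns the settings object now has to enter that context manager and keep it open for the lifetime of the service. A hedged sketch of one way to wire this into application startup (illustrative only; DiracX's actual app factory and settings discovery may differ):

```python
from contextlib import AsyncExitStack, asynccontextmanager

from fastapi import FastAPI

from diracx.routers.job_manager.sandboxes import SandboxStoreSettings


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Assumed to be populated from DIRACX_SANDBOX_STORE_* environment variables.
    settings = SandboxStoreSettings()
    async with AsyncExitStack() as stack:
        # Entering lifetime_function() opens the aiobotocore client, checks or
        # creates the bucket, and keeps the client alive until shutdown.
        await stack.enter_async_context(settings.lifetime_function())
        app.state.sandbox_settings = settings
        yield


app = FastAPI(lifespan=lifespan)
```

An `AsyncExitStack` generalises naturally to several settings objects, each contributing its own lifetime.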
diff --git a/tests/conftest.py b/tests/conftest.py
index 60ec907f..87f33410 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -14,7 +14,7 @@
 from cryptography.hazmat.primitives.asymmetric import rsa
 from fastapi.testclient import TestClient
 from git import Repo
-from moto import mock_s3
+from moto.server import ThreadedMotoServer
 
 from diracx.core.config import Config, ConfigSource
 from diracx.core.preferences import get_diracx_preferences
@@ -80,14 +80,31 @@ def test_auth_settings() -> AuthSettings:
     )
 
 
+@pytest.fixture(scope="session")
+def aio_moto():
+    """Start the moto server in a separate thread and return the base URL
+
+    The mocking provided by moto doesn't play nicely with aiobotocore so we use
+    the server directly. See https://github.com/aio-libs/aiobotocore/issues/755
+    """
+    port = 27132
+    server = ThreadedMotoServer(port=port)
+    server.start()
+    yield {
+        "endpoint_url": f"http://localhost:{port}",
+        "aws_access_key_id": "testing",
+        "aws_secret_access_key": "testing",
+    }
+    server.stop()
+
+
 @pytest.fixture(scope="function")
-def test_sandbox_settings() -> SandboxStoreSettings:
-    with mock_s3():
-        yield SandboxStoreSettings(
-            bucket_name="sandboxes",
-            s3_client_kwargs={},
-            auto_create_bucket=True,
-        )
+def test_sandbox_settings(aio_moto) -> SandboxStoreSettings:
+    yield SandboxStoreSettings(
+        bucket_name="sandboxes",
+        s3_client_kwargs=aio_moto,
+        auto_create_bucket=True,
+    )
 
 
 @pytest.fixture
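The `aio_moto` fixture itself stays synchronous, but the fixtures and tests built on top of it are coroutine functions, so the suite needs an async-aware test runner. A minimal sketch of a test that talks to the moto server directly, assuming pytest-asyncio (or an equivalent plugin) is configured; the bucket and key names are made up for illustration:

```python
import pytest
from aiobotocore.session import get_session


@pytest.mark.asyncio  # redundant if the runner sets asyncio_mode = "auto"
async def test_aio_moto_roundtrip(aio_moto):
    # aio_moto yields endpoint_url plus dummy credentials, so the dict can
    # be unpacked straight into create_client().
    async with get_session().create_client("s3", **aio_moto) as client:
        await client.create_bucket(Bucket="demo-bucket")
        await client.put_object(Bucket="demo-bucket", Key="k", Body=b"data")
        obj = await client.get_object(Bucket="demo-bucket", Key="k")
        assert await obj["Body"].read() == b"data"
```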
diff --git a/tests/core/test_s3.py b/tests/core/test_s3.py
index c6acb2a3..4c6fad5d 100644
--- a/tests/core/test_s3.py
+++ b/tests/core/test_s3.py
@@ -4,10 +4,9 @@
 import hashlib
 import secrets
 
-import botocore.exceptions
 import pytest
 import requests
-from moto import mock_s3
+from aiobotocore.session import get_session
 
 from diracx.core.s3 import (
     b16_to_b64,
@@ -43,42 +42,39 @@ def test_b16_to_b64_random():
 
 
 @pytest.fixture(scope="function")
-def moto_s3():
+async def moto_s3(aio_moto):
     """Very basic moto-based S3 backend.
 
     This is a fixture that can be used to test S3 interactions using moto.
     Note that this is not a complete S3 backend, in particular authentication
     and validation of requests is not implemented.
     """
-    with mock_s3():
-        client = botocore.session.get_session().create_client("s3")
-        client.create_bucket(Bucket=BUCKET_NAME)
-        client.create_bucket(Bucket=OTHER_BUCKET_NAME)
+    async with get_session().create_client("s3", **aio_moto) as client:
+        await client.create_bucket(Bucket=BUCKET_NAME)
+        await client.create_bucket(Bucket=OTHER_BUCKET_NAME)
         yield client
 
 
-def test_s3_bucket_exists(moto_s3):
-    assert s3_bucket_exists(moto_s3, BUCKET_NAME)
-    assert not s3_bucket_exists(moto_s3, MISSING_BUCKET_NAME)
+async def test_s3_bucket_exists(moto_s3):
+    assert await s3_bucket_exists(moto_s3, BUCKET_NAME)
+    assert not await s3_bucket_exists(moto_s3, MISSING_BUCKET_NAME)
 
 
-def test_s3_object_exists(moto_s3):
-    with pytest.raises(botocore.exceptions.ClientError):
-        s3_object_exists(moto_s3, MISSING_BUCKET_NAME, "key")
-
-    assert not s3_object_exists(moto_s3, BUCKET_NAME, "key")
-    moto_s3.put_object(Bucket=BUCKET_NAME, Key="key", Body=b"hello")
-    assert s3_object_exists(moto_s3, BUCKET_NAME, "key")
+async def test_s3_object_exists(moto_s3):
+    assert not await s3_object_exists(moto_s3, MISSING_BUCKET_NAME, "key")
+    assert not await s3_object_exists(moto_s3, BUCKET_NAME, "key")
+    await moto_s3.put_object(Bucket=BUCKET_NAME, Key="key", Body=b"hello")
+    assert await s3_object_exists(moto_s3, BUCKET_NAME, "key")
 
 
-def test_presigned_upload_moto(moto_s3):
+async def test_presigned_upload_moto(moto_s3):
     """Test the presigned upload with moto
 
     This doesn't actually test the signature, see test_presigned_upload_minio
     """
     file_content, checksum = _random_file(128)
     key = f"{checksum}.dat"
-    upload_info = generate_presigned_upload(
+    upload_info = await generate_presigned_upload(
         moto_s3, BUCKET_NAME, key, "sha256", checksum, len(file_content), 60
     )
 
@@ -89,30 +85,32 @@ def test_presigned_upload_moto(moto_s3):
     assert r.status_code == 204, r.text
 
     # Make sure the object is actually there
-    obj = moto_s3.get_object(Bucket=BUCKET_NAME, Key=key)
-    assert obj["Body"].read() == file_content
+    obj = await moto_s3.get_object(Bucket=BUCKET_NAME, Key=key)
+    assert (await obj["Body"].read()) == file_content
 
 
-@pytest.fixture(scope="session")
-def minio_client(demo_urls):
+@pytest.fixture(scope="function")
+async def minio_client(demo_urls):
     """Create a S3 client that uses minio from the demo as backend"""
-    yield botocore.session.get_session().create_client(
+    async with get_session().create_client(
         "s3",
         endpoint_url=demo_urls["minio"],
         aws_access_key_id="console",
         aws_secret_access_key="console123",
-    )
+    ) as client:
+        yield client
 
 
-@pytest.fixture(scope="session")
-def test_bucket(minio_client):
+@pytest.fixture(scope="function")
+async def test_bucket(minio_client):
     """Create a test bucket that is cleaned up after the test session"""
     bucket_name = f"dirac-test-{secrets.token_hex(8)}"
-    minio_client.create_bucket(Bucket=bucket_name)
+    await minio_client.create_bucket(Bucket=bucket_name)
     yield bucket_name
-    for obj in minio_client.list_objects(Bucket=bucket_name)["Contents"]:
-        minio_client.delete_object(Bucket=bucket_name, Key=obj["Key"])
-    minio_client.delete_bucket(Bucket=bucket_name)
+    objects = await minio_client.list_objects(Bucket=bucket_name)
+    for obj in objects.get("Contents", []):
+        await minio_client.delete_object(Bucket=bucket_name, Key=obj["Key"])
+    await minio_client.delete_bucket(Bucket=bucket_name)
 
 
 @pytest.mark.parametrize(
@@ -127,7 +125,7 @@ def test_bucket(minio_client):
         [_random_file(128)[0], _random_file(128)[1], 128, "ContentChecksumMismatch"],
     ],
 )
-def test_presigned_upload_minio(
+async def test_presigned_upload_minio(
     minio_client, test_bucket, content, checksum, size, expected_error
 ):
     """Test the presigned upload with Minio
@@ -138,7 +136,7 @@
     """
     key = f"{checksum}.dat"
     # Prepare the signed URL
-    upload_info = generate_presigned_upload(
+    upload_info = await generate_presigned_upload(
         minio_client, test_bucket, key, "sha256", checksum, size, 60
     )
     # Ensure the URL doesn't work
@@ -147,8 +145,8 @@ def test_presigned_upload_minio(
     )
     if expected_error is None:
         assert r.status_code == 204, r.text
-        assert s3_object_exists(minio_client, test_bucket, key)
+        assert await s3_object_exists(minio_client, test_bucket, key)
     else:
         assert r.status_code == 400, r.text
         assert expected_error in r.text
-        assert not s3_object_exists(minio_client, test_bucket, key)
+        assert not (await s3_object_exists(minio_client, test_bucket, key))
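One caveat on the `test_bucket` teardown: a bare `list_objects` call returns at most 1000 keys, so a test writing more objects than that would leave some behind. A hedged alternative using aiobotocore's async paginator, as a drop-in for the fixture's cleanup section (sketch only; the current tests stay far below the limit):

```python
# Replacement for the teardown section of the test_bucket fixture.
paginator = minio_client.get_paginator("list_objects_v2")
async for page in paginator.paginate(Bucket=bucket_name):
    for obj in page.get("Contents", []):
        await minio_client.delete_object(Bucket=bucket_name, Key=obj["Key"])
await minio_client.delete_bucket(Bucket=bucket_name)
```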