Merge pull request #120 from chrisburr/use-aiobotocore
Use aiobotocore
chaen authored Oct 3, 2023
2 parents 4c5744b + c6797ad commit 3d80739
Showing 7 changed files with 102 additions and 81 deletions.
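
In short: the helpers in src/diracx/core/s3.py become coroutines, the synchronous botocore client is replaced by an aiobotocore client opened as an async context manager (the sandbox router now does this through a lifetime_function), and the tests run against a ThreadedMotoServer instead of moto's in-process mock_s3. A minimal sketch of the core before/after pattern, independent of DiracX and assuming credentials and region come from the environment:

import asyncio

from aiobotocore.session import get_session


async def main() -> None:
    # Before: botocore.session.get_session().create_client("s3") returned a
    # synchronous client whose calls blocked the event loop.
    # After: the aiobotocore client is an async context manager and every
    # S3 API call is awaited.
    async with get_session().create_client("s3") as client:
        response = await client.list_buckets()
        print([bucket["Name"] for bucket in response["Buckets"]])


asyncio.run(main())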
2 changes: 2 additions & 0 deletions .pre-commit-config.yaml
@@ -33,4 +33,6 @@ repos:
- types-PyYAML
- types-cachetools
- types-requests
- types-aiobotocore[essential]
- boto3-stubs[essential]
exclude: ^(src/diracx/client/|tests/|build)
8 changes: 4 additions & 4 deletions environment.yml
@@ -51,8 +51,8 @@ dependencies:
- types-requests
- uvicorn
- moto
- mypy-boto3-s3
- aiobotocore
- botocore
- boto3-stubs
# - pip:
#   - git+https://github.com/DIRACGrid/DIRAC.git@integration
- pip:
- types-aiobotocore[essential]
- boto3-stubs[essential]
1 change: 1 addition & 0 deletions setup.cfg
@@ -26,6 +26,7 @@ package_dir =
= src
python_requires = >=3.10
install_requires =
aiobotocore
authlib
aiohttp
aiomysql
24 changes: 14 additions & 10 deletions src/diracx/core/s3.py
@@ -1,7 +1,11 @@
"""Utilities for interacting with S3-compatible storage."""
from __future__ import annotations

__all__ = ("s3_bucket_exists", "s3_object_exists", "generate_presigned_upload")
__all__ = (
"s3_bucket_exists",
"s3_object_exists",
"generate_presigned_upload",
)

import base64
from typing import TYPE_CHECKING, TypedDict, cast
@@ -11,27 +15,27 @@
from .models import ChecksumAlgorithm

if TYPE_CHECKING:
from mypy_boto3_s3.client import S3Client
from types_aiobotocore_s3.client import S3Client


class S3PresignedPostInfo(TypedDict):
url: str
fields: dict[str, str]


def s3_bucket_exists(s3_client: S3Client, bucket_name: str) -> bool:
async def s3_bucket_exists(s3_client: S3Client, bucket_name: str) -> bool:
"""Check if a bucket exists in S3."""
return _s3_exists(s3_client.head_bucket, Bucket=bucket_name)
return await _s3_exists(s3_client.head_bucket, Bucket=bucket_name)


def s3_object_exists(s3_client: S3Client, bucket_name: str, key: str) -> bool:
async def s3_object_exists(s3_client: S3Client, bucket_name: str, key: str) -> bool:
"""Check if an object exists in an S3 bucket."""
return _s3_exists(s3_client.head_object, Bucket=bucket_name, Key=key)
return await _s3_exists(s3_client.head_object, Bucket=bucket_name, Key=key)


def _s3_exists(method, **kwargs: str) -> bool:
async def _s3_exists(method, **kwargs: str) -> bool:
try:
method(**kwargs)
await method(**kwargs)
except ClientError as e:
if e.response["Error"]["Code"] != "404":
raise
@@ -40,7 +44,7 @@ def _s3_exists(method, **kwargs: str) -> bool:
return True


def generate_presigned_upload(
async def generate_presigned_upload(
s3_client: S3Client,
bucket_name: str,
key: str,
@@ -60,7 +64,7 @@ def generate_presigned_upload(
conditions = [["content-length-range", size, size]] + [
{k: v} for k, v in fields.items()
]
result = s3_client.generate_presigned_post(
result = await s3_client.generate_presigned_post(
Bucket=bucket_name,
Key=key,
Fields=fields,
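Since s3_bucket_exists, s3_object_exists, and generate_presigned_upload are now coroutines, callers must await them while an aiobotocore client is open. A hedged usage sketch; the endpoint URL, bucket, and key are illustrative, not DiracX configuration:

import asyncio

from aiobotocore.session import get_session

from diracx.core.s3 import s3_bucket_exists, s3_object_exists


async def main() -> None:
    # The client must stay open for as long as the helpers are used.
    async with get_session().create_client(
        "s3", endpoint_url="http://localhost:9000"
    ) as client:
        if await s3_bucket_exists(client, "sandboxes"):
            print(await s3_object_exists(client, "sandboxes", "some-key"))


asyncio.run(main())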
47 changes: 23 additions & 24 deletions src/diracx/routers/job_manager/sandboxes.py
@@ -1,9 +1,10 @@
from __future__ import annotations

import contextlib
from http import HTTPStatus
from typing import TYPE_CHECKING, Annotated
from typing import TYPE_CHECKING, Annotated, AsyncIterator

import botocore.session
from aiobotocore.session import get_session
from botocore.config import Config
from botocore.errorfactory import ClientError
from fastapi import Depends, HTTPException, Query
@@ -22,7 +23,7 @@
from diracx.core.settings import ServiceSettingsBase

if TYPE_CHECKING:
from mypy_boto3_s3.client import S3Client
from types_aiobotocore_s3.client import S3Client

from ..auth import AuthorizedUserInfo, has_properties, verify_dirac_access_token
from ..dependencies import SandboxMetadataDB, add_settings_annotation
@@ -42,28 +43,26 @@ class SandboxStoreSettings(ServiceSettingsBase, env_prefix="DIRACX_SANDBOX_STORE
url_validity_seconds: int = 5 * 60
_client: S3Client = PrivateAttr(None)

def __init__(self, **kwargs):
super().__init__(**kwargs)

# TODO: Use async
session = botocore.session.get_session()
self._client = session.create_client(
@contextlib.asynccontextmanager
async def lifetime_function(self) -> AsyncIterator[None]:
async with get_session().create_client(
"s3",
# endpoint_url=s3_cred["endpoint"],
# aws_access_key_id=s3_cred["access_key_id"],
# aws_secret_access_key=s3_cred["secret_access_key"],
**self.s3_client_kwargs,
config=Config(signature_version="v4"),
)
if not s3_bucket_exists(self._client, self.bucket_name):
if not self.auto_create_bucket:
raise ValueError(
f"Bucket {self.bucket_name} does not exist and auto_create_bucket is disabled"
)
try:
self._client.create_bucket(Bucket=self.bucket_name)
except ClientError as e:
raise ValueError(f"Failed to create bucket {self.bucket_name}") from e
) as self._client: # type: ignore
if not await s3_bucket_exists(self._client, self.bucket_name):
if not self.auto_create_bucket:
raise ValueError(
f"Bucket {self.bucket_name} does not exist and auto_create_bucket is disabled"
)
try:
await self._client.create_bucket(Bucket=self.bucket_name)
except ClientError as e:
raise ValueError(
f"Failed to create bucket {self.bucket_name}"
) from e

yield

@property
def s3_client(self) -> S3Client:
@@ -116,7 +115,7 @@ async def initiate_sandbox_upload(
await sandbox_metadata_db.update_sandbox_last_access_time(pfn)
return SandboxUploadResponse(pfn=pfn)

upload_info = generate_presigned_upload(
upload_info = await generate_presigned_upload(
settings.s3_client,
settings.bucket_name,
pfn_to_key(pfn),
@@ -166,7 +165,7 @@ async def get_sandbox_file(
"""
# TODO: Prevent people from downloading other people's sandboxes?
# TODO: Support by name and by job id?
presigned_url = settings.s3_client.generate_presigned_url(
presigned_url = await settings.s3_client.generate_presigned_url(
ClientMethod="get_object",
Params={"Bucket": settings.bucket_name, "Key": pfn_to_key(pfn)},
ExpiresIn=settings.url_validity_seconds,
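The S3 client is no longer created in __init__; instead SandboxStoreSettings gains a lifetime_function async context manager that opens the client and checks (or creates) the bucket when the application starts. How DiracX enters this context is not shown in this diff; one plausible wiring, assuming a FastAPI lifespan and illustrative settings values:

import contextlib

from fastapi import FastAPI

from diracx.routers.job_manager.sandboxes import SandboxStoreSettings

# Illustrative values; a deployment would normally configure these via the
# DIRACX_SANDBOX_STORE_* environment variables implied by env_prefix.
settings = SandboxStoreSettings(
    bucket_name="sandboxes",
    s3_client_kwargs={"endpoint_url": "http://localhost:9000"},
    auto_create_bucket=True,
)


@contextlib.asynccontextmanager
async def lifespan(app: FastAPI):
    # Hold the settings' lifetime open so settings.s3_client stays valid
    # for the whole life of the application.
    async with settings.lifetime_function():
        yield


app = FastAPI(lifespan=lifespan)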
33 changes: 25 additions & 8 deletions tests/conftest.py
@@ -14,7 +14,7 @@
from cryptography.hazmat.primitives.asymmetric import rsa
from fastapi.testclient import TestClient
from git import Repo
from moto import mock_s3
from moto.server import ThreadedMotoServer

from diracx.core.config import Config, ConfigSource
from diracx.core.preferences import get_diracx_preferences
@@ -80,14 +80,31 @@ def test_auth_settings() -> AuthSettings:
)


@pytest.fixture(scope="session")
def aio_moto():
"""Start the moto server in a separate thread and return the base URL
The mocking provided by moto doesn't play nicely with aiobotocore so we use
the server directly. See https://github.com/aio-libs/aiobotocore/issues/755
"""
port = 27132
server = ThreadedMotoServer(port=port)
server.start()
yield {
"endpoint_url": f"http://localhost:{port}",
"aws_access_key_id": "testing",
"aws_secret_access_key": "testing",
}
server.stop()


@pytest.fixture(scope="function")
def test_sandbox_settings() -> SandboxStoreSettings:
with mock_s3():
yield SandboxStoreSettings(
bucket_name="sandboxes",
s3_client_kwargs={},
auto_create_bucket=True,
)
def test_sandbox_settings(aio_moto) -> SandboxStoreSettings:
yield SandboxStoreSettings(
bucket_name="sandboxes",
s3_client_kwargs=aio_moto,
auto_create_bucket=True,
)


@pytest.fixture
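moto's in-process mocking patches botocore's synchronous transport and does not intercept the aiohttp requests aiobotocore makes, hence the switch to a real ThreadedMotoServer reached over HTTP. A sketch of a test built on the aio_moto fixture above, assuming an async-capable pytest setup (e.g. pytest-asyncio), with placeholder bucket and key names:

from aiobotocore.session import get_session


async def test_s3_round_trip(aio_moto):
    # aio_moto supplies endpoint_url and dummy credentials for the
    # session-scoped moto server.
    async with get_session().create_client("s3", **aio_moto) as client:
        await client.create_bucket(Bucket="demo-bucket")
        await client.put_object(Bucket="demo-bucket", Key="k", Body=b"v")
        obj = await client.get_object(Bucket="demo-bucket", Key="k")
        assert (await obj["Body"].read()) == b"v"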
68 changes: 33 additions & 35 deletions tests/core/test_s3.py
@@ -4,10 +4,9 @@
import hashlib
import secrets

import botocore.exceptions
import pytest
import requests
from moto import mock_s3
from aiobotocore.session import get_session

from diracx.core.s3 import (
b16_to_b64,
@@ -43,42 +42,39 @@ def test_b16_to_b64_random():


@pytest.fixture(scope="function")
def moto_s3():
async def moto_s3(aio_moto):
"""Very basic moto-based S3 backend.
This is a fixture that can be used to test S3 interactions using moto.
Note that this is not a complete S3 backend, in particular authentication
and validation of requests is not implemented.
"""
with mock_s3():
client = botocore.session.get_session().create_client("s3")
client.create_bucket(Bucket=BUCKET_NAME)
client.create_bucket(Bucket=OTHER_BUCKET_NAME)
async with get_session().create_client("s3", **aio_moto) as client:
await client.create_bucket(Bucket=BUCKET_NAME)
await client.create_bucket(Bucket=OTHER_BUCKET_NAME)
yield client


def test_s3_bucket_exists(moto_s3):
assert s3_bucket_exists(moto_s3, BUCKET_NAME)
assert not s3_bucket_exists(moto_s3, MISSING_BUCKET_NAME)
async def test_s3_bucket_exists(moto_s3):
assert await s3_bucket_exists(moto_s3, BUCKET_NAME)
assert not await s3_bucket_exists(moto_s3, MISSING_BUCKET_NAME)


def test_s3_object_exists(moto_s3):
with pytest.raises(botocore.exceptions.ClientError):
s3_object_exists(moto_s3, MISSING_BUCKET_NAME, "key")
async def test_s3_object_exists(moto_s3):
assert not await s3_object_exists(moto_s3, MISSING_BUCKET_NAME, "key")
assert not await s3_object_exists(moto_s3, BUCKET_NAME, "key")
await moto_s3.put_object(Bucket=BUCKET_NAME, Key="key", Body=b"hello")
assert await s3_object_exists(moto_s3, BUCKET_NAME, "key")

assert not s3_object_exists(moto_s3, BUCKET_NAME, "key")
moto_s3.put_object(Bucket=BUCKET_NAME, Key="key", Body=b"hello")
assert s3_object_exists(moto_s3, BUCKET_NAME, "key")


def test_presigned_upload_moto(moto_s3):
async def test_presigned_upload_moto(moto_s3):
"""Test the presigned upload with moto
This doesn't actually test the signature, see test_presigned_upload_minio
"""
file_content, checksum = _random_file(128)
key = f"{checksum}.dat"
upload_info = generate_presigned_upload(
upload_info = await generate_presigned_upload(
moto_s3, BUCKET_NAME, key, "sha256", checksum, len(file_content), 60
)

@@ -89,30 +85,32 @@ def test_presigned_upload_moto(moto_s3):
assert r.status_code == 204, r.text

# Make sure the object is actually there
obj = moto_s3.get_object(Bucket=BUCKET_NAME, Key=key)
assert obj["Body"].read() == file_content
obj = await moto_s3.get_object(Bucket=BUCKET_NAME, Key=key)
assert (await obj["Body"].read()) == file_content


@pytest.fixture(scope="session")
def minio_client(demo_urls):
@pytest.fixture(scope="function")
async def minio_client(demo_urls):
"""Create a S3 client that uses minio from the demo as backend"""
yield botocore.session.get_session().create_client(
async with get_session().create_client(
"s3",
endpoint_url=demo_urls["minio"],
aws_access_key_id="console",
aws_secret_access_key="console123",
)
) as client:
yield client


@pytest.fixture(scope="session")
def test_bucket(minio_client):
@pytest.fixture(scope="function")
async def test_bucket(minio_client):
"""Create a test bucket that is cleaned up after the test session"""
bucket_name = f"dirac-test-{secrets.token_hex(8)}"
minio_client.create_bucket(Bucket=bucket_name)
await minio_client.create_bucket(Bucket=bucket_name)
yield bucket_name
for obj in minio_client.list_objects(Bucket=bucket_name)["Contents"]:
minio_client.delete_object(Bucket=bucket_name, Key=obj["Key"])
minio_client.delete_bucket(Bucket=bucket_name)
objects = await minio_client.list_objects(Bucket=bucket_name)
for obj in objects.get("Contents", []):
await minio_client.delete_object(Bucket=bucket_name, Key=obj["Key"])
await minio_client.delete_bucket(Bucket=bucket_name)


@pytest.mark.parametrize(
@@ -127,7 +125,7 @@ def test_bucket(minio_client):
[_random_file(128)[0], _random_file(128)[1], 128, "ContentChecksumMismatch"],
],
)
def test_presigned_upload_minio(
async def test_presigned_upload_minio(
minio_client, test_bucket, content, checksum, size, expected_error
):
"""Test the presigned upload with Minio
@@ -138,7 +136,7 @@ def test_presigned_upload_minio(
"""
key = f"{checksum}.dat"
# Prepare the signed URL
upload_info = generate_presigned_upload(
upload_info = await generate_presigned_upload(
minio_client, test_bucket, key, "sha256", checksum, size, 60
)
# Ensure the URL doesn't work
@@ -147,8 +145,8 @@
)
if expected_error is None:
assert r.status_code == 204, r.text
assert s3_object_exists(minio_client, test_bucket, key)
assert await s3_object_exists(minio_client, test_bucket, key)
else:
assert r.status_code == 400, r.text
assert expected_error in r.text
assert not s3_object_exists(minio_client, test_bucket, key)
assert not (await s3_object_exists(minio_client, test_bucket, key))
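
For completeness, the presigned POST returned by generate_presigned_upload is consumed like any S3 presigned POST: the signed fields go in the form body and the payload goes last as the file part. A hedged sketch with a hypothetical helper name and minimal error handling:

import requests


def upload_with_presigned_post(upload_info: dict, payload: bytes) -> None:
    # S3 expects the signed form fields plus the content under the
    # "file" key; a successful upload returns 204 No Content.
    r = requests.post(
        upload_info["url"],
        data=upload_info["fields"],
        files={"file": payload},
    )
    r.raise_for_status()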
