Skip to content

Commit

Permalink
[DPE-5036][DPE-3666] Create s3 bucket if not existing (#58)
Browse files Browse the repository at this point in the history
  • Loading branch information
Batalex authored Dec 11, 2024
1 parent 4e008ae commit 2945bb6
Show file tree
Hide file tree
Showing 7 changed files with 1,219 additions and 439 deletions.
17 changes: 11 additions & 6 deletions charmcraft.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,24 @@
type: charm
parts:
charm:
build-snaps:
- rustup
charm-binary-python-packages:
- setuptools
- jsonschema
build-packages:
- libffi-dev
- libssl-dev
- pkg-config
- rustc
- cargo
override-build:
# get latest rust stable toolchain
rustup default stable

craftctl default
bases:
- build-on:
- name: ubuntu
channel: "22.04"
- name: ubuntu
channel: "22.04"
run-on:
- name: ubuntu
channel: "22.04"
- name: ubuntu
channel: "22.04"
1,431 changes: 1,033 additions & 398 deletions poetry.lock

Large diffs are not rendered by default.

21 changes: 10 additions & 11 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@ markers = ["unstable"]
line-length = 99
target-version = ["py310"]

[tool.isort]
profile = "black"

[tool.poetry]
name = "spark-history-server-k8s-operator"
version = "1.0"
description = "spark-history-server-k8s-operator"
authors = []
package-mode = false

[tool.poetry.dependencies]
python = ">=3.8,<4.0"
Expand All @@ -35,8 +33,7 @@ pydantic ="^1.10.7"
pyyaml = ">=6.0.1"
boto3 = ">=1.34.55"
jsonschema = "^4.19.1"
# FIXME: Unpin once rustc 1.76 is available at build time
rpds-py = "<0.19"
tenacity = ">=8.0.1"

[tool.poetry.group.charm-libs.dependencies]
ops = "^2.13.0"
Expand All @@ -58,6 +55,7 @@ optional = true
black = ">=22.3.0"
ruff = ">=0.2.2"
codespell = ">=2.2.2"
boto3-stubs = { extras = ["s3"], version = "^1.35.8" }

[tool.poetry.group.unit]
optional = true
Expand All @@ -68,6 +66,7 @@ pytest-asyncio = ">=0.21"
coverage = {extras = ["toml"], version = ">7.0"}
ops-scenario = "==6.1.7"
cosl = ">=0.0.32"
moto = {extras = ["s3"], version = "^5.0.22"}

[tool.poetry.group.integration]
optional = true
Expand All @@ -84,6 +83,11 @@ pytest-operator-cache = {git = "https://github.com/canonical/data-platform-workf

[tool.ruff]
line-length = 99
extend-exclude = ["__pycache__", "*.egg_info"]
target-version="py310"
src = ["src", "tests"]

[tool.ruff.lint]
select = ["E", "W", "F", "C", "N", "D", "I001"]
extend-ignore = [
"D203",
Expand All @@ -100,10 +104,5 @@ extend-ignore = [
"D413",
]
ignore = ["E501", "D107"]
extend-exclude = ["__pycache__", "*.egg_info"]
per-file-ignores = {"tests/*" = ["D100","D101","D102","D103","D104", "E999"], "src/literals.py" = ["D101"]}
target-version="py310"
src = ["src", "tests"]

[tool.ruff.mccabe]
max-complexity = 10
mccabe.max-complexity = 10
13 changes: 7 additions & 6 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
attrs==24.2.0 ; python_version >= "3.8" and python_version < "4.0"
boto3==1.35.26 ; python_version >= "3.8" and python_version < "4.0"
botocore==1.35.26 ; python_version >= "3.8" and python_version < "4.0"
boto3==1.35.76 ; python_version >= "3.8" and python_version < "4.0"
botocore==1.35.76 ; python_version >= "3.8" and python_version < "4.0"
importlib-resources==6.4.5 ; python_version >= "3.8" and python_version < "3.9"
jmespath==1.0.1 ; python_version >= "3.8" and python_version < "4.0"
jsonschema-specifications==2023.12.1 ; python_version >= "3.8" and python_version < "4.0"
jsonschema==4.23.0 ; python_version >= "3.8" and python_version < "4.0"
ops==2.16.0 ; python_version >= "3.8" and python_version < "4.0"
pkgutil-resolve-name==1.3.10 ; python_version >= "3.8" and python_version < "3.9"
pydantic==1.10.18 ; python_version >= "3.8" and python_version < "4.0"
pydantic==1.10.19 ; python_version >= "3.8" and python_version < "4.0"
python-dateutil==2.9.0.post0 ; python_version >= "3.8" and python_version < "4.0"
pyyaml==6.0.2 ; python_version >= "3.8" and python_version < "4.0"
referencing==0.35.1 ; python_version >= "3.8" and python_version < "4.0"
rpds-py==0.18.1 ; python_version >= "3.8" and python_version < "4.0"
s3transfer==0.10.2 ; python_version >= "3.8" and python_version < "4.0"
six==1.16.0 ; python_version >= "3.8" and python_version < "4.0"
rpds-py==0.20.1 ; python_version >= "3.8" and python_version < "4.0"
s3transfer==0.10.4 ; python_version >= "3.8" and python_version < "4.0"
six==1.17.0 ; python_version >= "3.8" and python_version < "4.0"
tenacity==8.5.0 ; python_version >= "3.8" and python_version < "4.0"
typing-extensions==4.12.2 ; python_version >= "3.8" and python_version < "4.0"
urllib3==1.26.20 ; python_version >= "3.8" and python_version < "3.10"
urllib3==2.2.3 ; python_version >= "3.10" and python_version < "4.0"
Expand Down
45 changes: 45 additions & 0 deletions src/managers/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,23 @@
# See LICENSE file for licensing details.

"""S3 manager."""
from __future__ import annotations

import os
import tempfile
from functools import cached_property
from typing import TYPE_CHECKING

import boto3
from botocore.exceptions import ClientError, SSLError
from tenacity import (
    retry,
    retry_if_exception_cause_type,
    retry_if_exception_type,
    stop_after_attempt,
    wait_fixed,
)

from common.utils import WithLogging
from core.domain import S3ConnectionInfo

if TYPE_CHECKING:
    from mypy_boto3_s3.client import S3Client


class S3Manager(WithLogging):
"""Class exposing business logic for interacting with S3 service."""
Expand All @@ -28,6 +35,41 @@ def session(self):
aws_secret_access_key=self.config.secret_key,
)

def get_or_create_bucket(self, client: S3Client) -> bool:
"""Create bucket if it does not exists."""
bucket_name = self.config.bucket
bucket_exists = True

try:
client.head_bucket(Bucket=bucket_name)
except ClientError as ex:
if "(403)" in ex.args[0]:
self.logger.error("Wrong credentials or access to bucket is forbidden")
return False
elif "(404)" in ex.args[0]:
bucket_exists = False
else:
self.logger.info(f"Using existing bucket {bucket_name}")

if not bucket_exists:
client.create_bucket(Bucket=bucket_name)
self._wait_unit_exists(client)
self.logger.info(f"Created bucket {bucket_name}")

client.put_object(Bucket=bucket_name, Key=os.path.join(self.config.path, ""))

return True

@retry(
wait=wait_fixed(5),
stop=stop_after_attempt(20),
retry=retry_if_exception_cause_type(ClientError),
reraise=True,
)
def _wait_unit_exists(self, client: S3Client):
"""Poll s3 API until resource is found."""
client.head_bucket(Bucket=self.config.bucket)

def verify(self) -> bool:
"""Verify S3 credentials."""
with tempfile.NamedTemporaryFile() as ca_file:
Expand All @@ -54,4 +96,7 @@ def verify(self) -> bool:
self.logger.error(f"S3 related error {e}")
return False

if not self.get_or_create_bucket(s3):
return False

return True
18 changes: 0 additions & 18 deletions tests/integration/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import subprocess
from pathlib import Path
from subprocess import PIPE, check_output
from time import sleep
from typing import Dict

import boto3
Expand Down Expand Up @@ -64,23 +63,6 @@ def setup_s3_bucket_for_history_server(
s3.delete_objects(Bucket=bucket_str, Delete={"Objects": objs})
s3.delete_bucket(Bucket=bucket_str)

logger.info("create bucket in minio")
for i in range(0, 30):
try:
s3.create_bucket(Bucket=bucket_str)
break
except Exception as e:
if i >= 30:
logger.error(f"create bucket failed....exiting....\n{str(e)}")
raise
else:
logger.warning(f"create bucket failed....retrying in 10 secs.....\n{str(e)}")
sleep(10)
continue

s3.put_object(Bucket=bucket_str, Key=("spark-events/"))
logger.debug(s3.list_buckets())


def setup_azure_container_for_history_server(container: str, path: str) -> None:
"""Setup azure container."""
Expand Down
113 changes: 113 additions & 0 deletions tests/unit/test_component_s3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
from __future__ import annotations

import os
from typing import TYPE_CHECKING
from unittest.mock import Mock

import boto3
import pytest
from moto import mock_aws

from core.domain import S3ConnectionInfo
from managers.s3 import S3Manager

if TYPE_CHECKING:
from mypy_boto3_s3.client import S3Client


@pytest.fixture(scope="function")
def aws_credentials():
    """Inject mocked AWS credentials for moto into the process environment."""
    fake_credentials = {
        "AWS_ACCESS_KEY_ID": "testing",
        "AWS_SECRET_ACCESS_KEY": "testing",
        "AWS_SECURITY_TOKEN": "testing",
        "AWS_SESSION_TOKEN": "testing",
        "AWS_DEFAULT_REGION": "us-east-1",
    }
    os.environ.update(fake_credentials)


@pytest.fixture(scope="function")
def s3(aws_credentials):
    """Return a mocked S3 client.

    All boto3 calls will be mocked from this point: moto intercepts every AWS
    API request made while the ``mock_aws`` context is active, so no real
    network traffic occurs.
    """
    with mock_aws():
        yield boto3.client("s3", region_name="us-east-1")


def test_bucket_created_on_verify(s3: S3Client) -> None:
    """If the bucket does not exist, we create it."""
    # Given: a manager configured for a bucket that does not exist yet
    bucket_name = "test_bucket"
    connection_info = Mock(
        spec=S3ConnectionInfo,
        endpoint="",
        access_key="",
        secret_key="",
        bucket=bucket_name,
        path="path",
        tls_ca_chain=[],
    )
    s3_manager = S3Manager(connection_info)

    assert s3.list_buckets()["Buckets"] == []

    # When
    s3_manager.verify()

    # Then: exactly one bucket exists, with the configured name
    buckets = s3.list_buckets()["Buckets"]
    assert len(buckets) == 1
    assert buckets[0].get("Name", "") == bucket_name
    # Note that the path provided has been transformed into a directory structure
    assert "Contents" in s3.list_objects_v2(Bucket=bucket_name, Prefix="path/", MaxKeys=1)


def test_bucket_existing_path_created_on_verify(s3: S3Client) -> None:
    """If the bucket does exist, we use it and add the path."""
    # Given: the bucket is created before the manager runs
    bucket_name = "test_bucket"
    connection_info = Mock(
        spec=S3ConnectionInfo,
        endpoint="",
        access_key="",
        secret_key="",
        bucket=bucket_name,
        path="path",
        tls_ca_chain=[],
    )
    s3_manager = S3Manager(connection_info)

    s3.create_bucket(Bucket=bucket_name)
    existing = s3.list_buckets()["Buckets"]
    assert len(existing) == 1
    assert existing[0].get("Name", "") == bucket_name

    # When
    s3_manager.verify()

    # Then: no extra bucket was created
    assert len(s3.list_buckets()["Buckets"]) == 1
    # Note that the path provided has been transformed into a directory structure
    assert "Contents" in s3.list_objects_v2(Bucket=bucket_name, Prefix="path/", MaxKeys=1)


def test_path_existing_still_ok_on_verify(s3: S3Client) -> None:
    """If the path already exists, safe to overwrite it."""
    # Given: both the bucket and the path prefix already exist
    bucket_name = "test_bucket"
    connection_info = Mock(
        spec=S3ConnectionInfo,
        endpoint="",
        access_key="",
        secret_key="",
        bucket=bucket_name,
        path="path",
        tls_ca_chain=[],
    )
    s3_manager = S3Manager(connection_info)

    s3.create_bucket(Bucket=bucket_name)
    s3.put_object(Bucket=bucket_name, Key="path/")
    existing = s3.list_buckets()["Buckets"]
    assert len(existing) == 1
    assert existing[0].get("Name", "") == bucket_name

    # When
    s3_manager.verify()

    # Then: still a single bucket, and the path prefix is intact
    assert len(s3.list_buckets()["Buckets"]) == 1
    # Note that the path provided has been transformed into a directory structure
    assert "Contents" in s3.list_objects_v2(Bucket=bucket_name, Prefix="path/", MaxKeys=3)

0 comments on commit 2945bb6

Please sign in to comment.