From 35fa357102dc2911a93b7ede77a8486093564a31 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Tue, 6 Dec 2022 17:09:47 +0100 Subject: [PATCH] moving ec2 client to its own module --- .../dynamic_scaling_core.py | 6 +- .../modules/__init__.py | 0 .../modules/ec2.py | 37 ++-- .../simcore_service_autoscaling/utils_aws.py | 176 ++++++++---------- services/autoscaling/tests/unit/conftest.py | 12 +- services/autoscaling/tests/unit/test_ec2.py | 41 ++++ .../autoscaling/tests/unit/test_utils_aws.py | 23 --- 7 files changed, 152 insertions(+), 143 deletions(-) create mode 100644 services/autoscaling/src/simcore_service_autoscaling/modules/__init__.py create mode 100644 services/autoscaling/tests/unit/test_ec2.py diff --git a/services/autoscaling/src/simcore_service_autoscaling/dynamic_scaling_core.py b/services/autoscaling/src/simcore_service_autoscaling/dynamic_scaling_core.py index 60fcea5a384b..d563c65e4e53 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/dynamic_scaling_core.py +++ b/services/autoscaling/src/simcore_service_autoscaling/dynamic_scaling_core.py @@ -11,6 +11,7 @@ from ._meta import VERSION from .core.errors import Ec2InstanceNotFoundError from .core.settings import ApplicationSettings +from .modules.ec2 import get_ec2_client from .utils import rabbitmq logger = logging.getLogger(__name__) @@ -61,8 +62,9 @@ async def check_dynamic_resources(app: FastAPI) -> None: assert app_settings.AUTOSCALING_EC2_ACCESS # nosec assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec + ec2_client = get_ec2_client(app) list_of_ec2_instances = await utils_aws.get_ec2_instance_capabilities( - app_settings.AUTOSCALING_EC2_ACCESS, app_settings.AUTOSCALING_EC2_INSTANCES + ec2_client, app_settings.AUTOSCALING_EC2_INSTANCES ) for task in pending_tasks: @@ -85,7 +87,7 @@ async def check_dynamic_resources(app: FastAPI) -> None: logger.debug("%s", f"{ec2_instances_needed[0]=}") new_instance_dns_name = await utils_aws.start_aws_instance( - app_settings.AUTOSCALING_EC2_ACCESS, + ec2_client, app_settings.AUTOSCALING_EC2_INSTANCES, instance_type=parse_obj_as( InstanceTypeType, ec2_instances_needed[0].name diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/__init__.py b/services/autoscaling/src/simcore_service_autoscaling/modules/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/ec2.py b/services/autoscaling/src/simcore_service_autoscaling/modules/ec2.py index 141199085d37..da71a98aa4dd 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/ec2.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/ec2.py @@ -15,19 +15,13 @@ @dataclass -class AutoscalingEC2Client: - session: aioboto3.Session +class AutoscalingEC2: client: EC2Client + session: aioboto3.Session + exit_stack: contextlib.AsyncExitStack - -def setup(app: FastAPI) -> None: - async def on_startup() -> None: - app.state.ec2_client = None - settings: Optional[EC2Settings] = app.state.settings.EC2Settings - if not settings: - logger.warning("EC2 client is de-activated in the settings") - return - + @classmethod + async def create(cls, settings: EC2Settings) -> "AutoscalingEC2": session = aioboto3.Session() session_client = session.client( "ec2", @@ -41,20 +35,33 @@ async def on_startup() -> None: ec2_client = cast( EC2Client, await exit_stack.enter_async_context(session_client) ) + return cls(ec2_client, session, exit_stack) + + async def close(self) -> None: + await self.exit_stack.aclose() + + +def setup(app: FastAPI) -> None: + async def on_startup() -> None: + app.state.ec2_client = None + settings: Optional[EC2Settings] = app.state.settings.EC2Settings + if not settings: + logger.warning("EC2 client is de-activated in the settings") + return - app.state.ec2_client = ec2_client + app.state.ec2_client = await AutoscalingEC2.create(settings) async def on_shutdown() -> None: if app.state.ec2_client: - await app.state.ec2_client.close() + await cast(AutoscalingEC2, app.state.ec2_client).close() app.add_event_handler("startup", on_startup) app.add_event_handler("shutdown", on_shutdown) -def get_ec2_client(app: FastAPI) -> RabbitMQClient: +def get_ec2_client(app: FastAPI) -> EC2Client: if not app.state.ec2_client: raise ConfigurationError( msg="EC2 client is not available. Please check the configuration." ) - return cast(RabbitMQClient, app.state.ec2_client) + return cast(AutoscalingEC2, app.state.ec2_client).client diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils_aws.py b/services/autoscaling/src/simcore_service_autoscaling/utils_aws.py index 0e554ca307a0..0f32a4632596 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/utils_aws.py +++ b/services/autoscaling/src/simcore_service_autoscaling/utils_aws.py @@ -6,9 +6,8 @@ import logging from collections import OrderedDict from textwrap import dedent -from typing import AsyncIterator, Callable, cast +from typing import Callable, cast -import aioboto3 from pydantic import ByteSize, parse_obj_as from servicelib.logging_utils import log_context from types_aiobotocore_ec2 import EC2Client @@ -20,39 +19,21 @@ Ec2InstanceNotFoundError, Ec2TooManyInstancesError, ) -from .core.settings import EC2InstancesSettings, EC2Settings +from .core.settings import EC2InstancesSettings from .models import EC2Instance, Resources logger = logging.getLogger(__name__) -@contextlib.asynccontextmanager -async def ec2_client(settings: EC2Settings) -> AsyncIterator[EC2Client]: - ec2 = None - try: - session = aioboto3.Session() - async with session.client( - "ec2", - endpoint_url=settings.EC2_ENDPOINT, - aws_access_key_id=settings.EC2_ACCESS_KEY_ID, - aws_secret_access_key=settings.EC2_SECRET_ACCESS_KEY, - region_name=settings.EC2_REGION_NAME, - ) as ec2: - yield ec2 - finally: - if ec2: - await ec2.close() - - async def get_ec2_instance_capabilities( - settings: EC2Settings, instance_settings: EC2InstancesSettings + ec2_client: EC2Client, + instance_settings: EC2InstancesSettings, ) -> list[EC2Instance]: - async with ec2_client(settings) as ec2: - instance_types = await ec2.describe_instance_types( - InstanceTypes=cast( - list[InstanceTypeType], instance_settings.EC2_INSTANCES_ALLOWED_TYPES - ) + instance_types = await ec2_client.describe_instance_types( + InstanceTypes=cast( + list[InstanceTypeType], instance_settings.EC2_INSTANCES_ALLOWED_TYPES ) + ) list_instances: list[EC2Instance] = [] for instance in instance_types.get("InstanceTypes", []): @@ -127,7 +108,7 @@ def _is_ec2_instance_running(instance: ReservationTypeDef): async def start_aws_instance( - settings: EC2Settings, + ec2_client: EC2Client, instance_settings: EC2InstancesSettings, instance_type: InstanceTypeType, tags: dict[str, str], @@ -138,76 +119,73 @@ async def start_aws_instance( logging.INFO, msg=f"launching AWS instance {instance_type} with {tags=}", ): - async with ec2_client(settings) as client: - # first check the max amount is not already reached - if current_instances := await client.describe_instances( - Filters=[ - {"Name": "tag-key", "Values": [tag_key]} for tag_key in tags.keys() - ] + # first check the max amount is not already reached + if current_instances := await ec2_client.describe_instances( + Filters=[ + {"Name": "tag-key", "Values": [tag_key]} for tag_key in tags.keys() + ] + ): + if ( + len(current_instances.get("Reservations", [])) + >= instance_settings.EC2_INSTANCES_MAX_INSTANCES + ) and all( + _is_ec2_instance_running(instance) + for instance in current_instances["Reservations"] ): - if ( - len(current_instances.get("Reservations", [])) - >= instance_settings.EC2_INSTANCES_MAX_INSTANCES - ) and all( - _is_ec2_instance_running(instance) - for instance in current_instances["Reservations"] - ): - raise Ec2TooManyInstancesError( - num_instances=instance_settings.EC2_INSTANCES_MAX_INSTANCES - ) - - instances = await client.run_instances( - ImageId=instance_settings.EC2_INSTANCES_AMI_ID, - MinCount=1, - MaxCount=1, - InstanceType=instance_type, - InstanceInitiatedShutdownBehavior="terminate", - KeyName=instance_settings.EC2_INSTANCES_KEY_NAME, - SubnetId=instance_settings.EC2_INSTANCES_SUBNET_ID, - TagSpecifications=[ - { - "ResourceType": "instance", - "Tags": [ - {"Key": tag_key, "Value": tag_value} - for tag_key, tag_value in tags.items() - ], - } - ], - UserData=_compose_user_data(startup_script), - SecurityGroupIds=instance_settings.EC2_INSTANCES_SECURITY_GROUP_IDS, - ) - instance_id = instances["Instances"][0]["InstanceId"] - logger.info( - "New instance launched: %s, waiting for it to start now...", instance_id - ) - # wait for the instance to be in a running state - # NOTE: reference to EC2 states https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-lifecycle.html - waiter = client.get_waiter("instance_exists") - await waiter.wait(InstanceIds=[instance_id]) - logger.info( - "instance %s exists now, waiting for running state...", instance_id - ) - - waiter = client.get_waiter("instance_running") - await waiter.wait(InstanceIds=[instance_id]) - logger.info("instance %s is now running", instance_id) - - # NOTE: this is currently disactivated as this makes starting an instance - # take between 2-4 minutes more and it seems to be responsive much before - # nevertheless if we get weird errors, this should be activated again! - - # waiter = client.get_waiter("instance_status_ok") - # await waiter.wait(InstanceIds=[instance_id]) - # logger.info("instance %s status is OK...", instance_id) + raise Ec2TooManyInstancesError( + num_instances=instance_settings.EC2_INSTANCES_MAX_INSTANCES + ) - # get the private IP - instances = await client.describe_instances(InstanceIds=[instance_id]) - private_dns_name: str = instances["Reservations"][0]["Instances"][0][ - "PrivateDnsName" - ] - logger.info( - "instance %s is available on %s, happy computing!!", - instance_id, - private_dns_name, - ) - return private_dns_name + instances = await ec2_client.run_instances( + ImageId=instance_settings.EC2_INSTANCES_AMI_ID, + MinCount=1, + MaxCount=1, + InstanceType=instance_type, + InstanceInitiatedShutdownBehavior="terminate", + KeyName=instance_settings.EC2_INSTANCES_KEY_NAME, + SubnetId=instance_settings.EC2_INSTANCES_SUBNET_ID, + TagSpecifications=[ + { + "ResourceType": "instance", + "Tags": [ + {"Key": tag_key, "Value": tag_value} + for tag_key, tag_value in tags.items() + ], + } + ], + UserData=_compose_user_data(startup_script), + SecurityGroupIds=instance_settings.EC2_INSTANCES_SECURITY_GROUP_IDS, + ) + instance_id = instances["Instances"][0]["InstanceId"] + logger.info( + "New instance launched: %s, waiting for it to start now...", instance_id + ) + # wait for the instance to be in a running state + # NOTE: reference to EC2 states https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-lifecycle.html + waiter = ec2_client.get_waiter("instance_exists") + await waiter.wait(InstanceIds=[instance_id]) + logger.info("instance %s exists now, waiting for running state...", instance_id) + + waiter = ec2_client.get_waiter("instance_running") + await waiter.wait(InstanceIds=[instance_id]) + logger.info("instance %s is now running", instance_id) + + # NOTE: this is currently disactivated as this makes starting an instance + # take between 2-4 minutes more and it seems to be responsive much before + # nevertheless if we get weird errors, this should be activated again! + + # waiter = client.get_waiter("instance_status_ok") + # await waiter.wait(InstanceIds=[instance_id]) + # logger.info("instance %s status is OK...", instance_id) + + # get the private IP + instances = await ec2_client.describe_instances(InstanceIds=[instance_id]) + private_dns_name: str = instances["Reservations"][0]["Instances"][0][ + "PrivateDnsName" + ] + logger.info( + "instance %s is available on %s, happy computing!!", + instance_id, + private_dns_name, + ) + return private_dns_name diff --git a/services/autoscaling/tests/unit/conftest.py b/services/autoscaling/tests/unit/conftest.py index fce71fd792bb..50e9ed54a9a6 100644 --- a/services/autoscaling/tests/unit/conftest.py +++ b/services/autoscaling/tests/unit/conftest.py @@ -39,8 +39,8 @@ from simcore_service_autoscaling.core.application import create_app from simcore_service_autoscaling.core.settings import ApplicationSettings, EC2Settings from simcore_service_autoscaling.models import SimcoreServiceDockerLabelKeys +from simcore_service_autoscaling.modules.ec2 import AutoscalingEC2 from simcore_service_autoscaling.utils_aws import EC2Client -from simcore_service_autoscaling.utils_aws import ec2_client as autoscaling_ec2_client from tenacity import retry from tenacity._asyncio import AsyncRetrying from tenacity.retry import retry_if_exception_type @@ -512,10 +512,14 @@ async def aws_ami_id( @pytest.fixture -async def ec2_client() -> AsyncIterator[EC2Client]: +async def ec2_client( + app_environment: EnvVarsDict, +) -> AsyncIterator[EC2Client]: settings = EC2Settings.create_from_envs() - async with autoscaling_ec2_client(settings) as client: - yield client + ec2 = await AutoscalingEC2.create(settings) + assert ec2 + yield ec2.client + await ec2.close() @pytest.fixture diff --git a/services/autoscaling/tests/unit/test_ec2.py b/services/autoscaling/tests/unit/test_ec2.py new file mode 100644 index 000000000000..6d80e6a679aa --- /dev/null +++ b/services/autoscaling/tests/unit/test_ec2.py @@ -0,0 +1,41 @@ +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument +# pylint: disable=unused-variable + +import botocore.exceptions +import pytest +from pytest_simcore.helpers.utils_envs import EnvVarsDict +from simcore_service_autoscaling.core.settings import EC2Settings +from simcore_service_autoscaling.modules.ec2 import AutoscalingEC2 +from types_aiobotocore_ec2 import EC2Client + + +@pytest.fixture +def ec2_settings( + app_environment: EnvVarsDict, +) -> EC2Settings: + return EC2Settings.create_from_envs() + + +async def test_ec2_client_lifespan(ec2_settings: EC2Settings): + ec2 = await AutoscalingEC2.create(settings=ec2_settings) + assert ec2 + assert ec2.client + assert ec2.exit_stack + assert ec2.session + + await ec2.close() + + +async def test_ec2_client_raises_when_no_connection_available(ec2_client: EC2Client): + with pytest.raises( + botocore.exceptions.ClientError, match=r".+ AWS was not able to validate .+" + ): + await ec2_client.describe_account_attributes(DryRun=True) + + +async def test_ec2_client_with_mock_server( + mocked_aws_server_envs: None, ec2_client: EC2Client +): + # passes without exception + await ec2_client.describe_account_attributes(DryRun=True) diff --git a/services/autoscaling/tests/unit/test_utils_aws.py b/services/autoscaling/tests/unit/test_utils_aws.py index 6f246d1b1651..68c73472edf7 100644 --- a/services/autoscaling/tests/unit/test_utils_aws.py +++ b/services/autoscaling/tests/unit/test_utils_aws.py @@ -5,7 +5,6 @@ import random -import botocore.exceptions import pytest from faker import Faker from pydantic import ByteSize @@ -22,7 +21,6 @@ EC2Instance, _compose_user_data, closest_instance_policy, - ec2_client, find_best_fitting_ec2_instance, get_ec2_instance_capabilities, start_aws_instance, @@ -36,27 +34,6 @@ def app_settings( return ApplicationSettings.create_from_envs() -async def test_ec2_client(app_settings: ApplicationSettings): - assert app_settings.AUTOSCALING_EC2_ACCESS - async with ec2_client(app_settings.AUTOSCALING_EC2_ACCESS) as client: - ... - - with pytest.raises( - botocore.exceptions.ClientError, match=r".+ AWS was not able to validate .+" - ): - async with ec2_client(app_settings.AUTOSCALING_EC2_ACCESS) as client: - await client.describe_account_attributes(DryRun=True) - - -async def test_ec2_client_with_mock_server( - mocked_aws_server_envs: None, app_settings: ApplicationSettings -): - # passes without exception - assert app_settings.AUTOSCALING_EC2_ACCESS - async with ec2_client(app_settings.AUTOSCALING_EC2_ACCESS) as client: - await client.describe_account_attributes(DryRun=True) - - async def test_get_ec2_instance_capabilities( mocked_aws_server_envs: None, aws_allowed_ec2_instance_type_names: list[str],