From 0f04727b1f36760d7c08dee8c5b331e3cdab9f10 Mon Sep 17 00:00:00 2001 From: Sylvain <35365065+sanderegg@users.noreply.github.com> Date: Thu, 8 Dec 2022 15:33:41 +0100 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20Autoscaling:=20have=20only=20one=20?= =?UTF-8?q?ec2=20client=20(#3643)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../simcore_service_autoscaling/api/health.py | 25 +- .../core/application.py | 4 +- .../core/errors.py | 9 +- .../dynamic_scaling_core.py | 18 +- .../modules/__init__.py | 0 .../modules/ec2.py | 218 +++++++++++++++++ .../{ => modules}/rabbitmq.py | 2 +- .../simcore_service_autoscaling/utils/ec2.py | 60 +++++ .../utils/rabbitmq.py | 2 +- .../simcore_service_autoscaling/utils_aws.py | 213 ----------------- services/autoscaling/tests/unit/conftest.py | 32 +-- .../autoscaling/tests/unit/test_api_health.py | 95 ++++++++ .../tests/unit/test_dynamic_scaling_core.py | 9 +- .../tests/unit/test_dynamic_scaling_task.py | 1 + .../tests/unit/test_health_route.py | 56 ----- .../tests/unit/test_modules_ec2.py | 187 +++++++++++++++ ...t_rabbitmq.py => test_modules_rabbitmq.py} | 12 +- .../autoscaling/tests/unit/test_utils_aws.py | 221 ------------------ .../autoscaling/tests/unit/test_utils_ec2.py | 88 +++++++ .../tests/unit/test_utils_rabbitmq.py | 3 + 20 files changed, 719 insertions(+), 536 deletions(-) create mode 100644 services/autoscaling/src/simcore_service_autoscaling/modules/__init__.py create mode 100644 services/autoscaling/src/simcore_service_autoscaling/modules/ec2.py rename services/autoscaling/src/simcore_service_autoscaling/{ => modules}/rabbitmq.py (97%) create mode 100644 services/autoscaling/src/simcore_service_autoscaling/utils/ec2.py delete mode 100644 services/autoscaling/src/simcore_service_autoscaling/utils_aws.py create mode 100644 services/autoscaling/tests/unit/test_api_health.py delete mode 100644 services/autoscaling/tests/unit/test_health_route.py create mode 100644 services/autoscaling/tests/unit/test_modules_ec2.py rename services/autoscaling/tests/unit/{test_rabbitmq.py => test_modules_rabbitmq.py} (94%) delete mode 100644 services/autoscaling/tests/unit/test_utils_aws.py create mode 100644 services/autoscaling/tests/unit/test_utils_ec2.py diff --git a/services/autoscaling/src/simcore_service_autoscaling/api/health.py b/services/autoscaling/src/simcore_service_autoscaling/api/health.py index 4bc8824453d..7c616e31461 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/api/health.py +++ b/services/autoscaling/src/simcore_service_autoscaling/api/health.py @@ -10,7 +10,7 @@ from fastapi.responses import PlainTextResponse from pydantic import BaseModel -from ..rabbitmq import get_rabbitmq_client +from ..modules.rabbitmq import get_rabbitmq_client from .dependencies.application import get_app router = APIRouter() @@ -22,23 +22,30 @@ async def health_check(): return f"{__name__}.health_check@{datetime.utcnow().isoformat()}" -class _RabbitMQStatus(BaseModel): - initialized: bool - connection_state: bool +class _ComponentStatus(BaseModel): + is_enabled: bool + is_responsive: bool class _StatusGet(BaseModel): - rabbitmq: _RabbitMQStatus + rabbitmq: _ComponentStatus + ec2: _ComponentStatus @router.get("/status", include_in_schema=True, response_model=_StatusGet) async def get_status(app: FastAPI = Depends(get_app)) -> _StatusGet: return _StatusGet( - rabbitmq=_RabbitMQStatus( - initialized=bool(app.state.rabbitmq_client), - connection_state=await get_rabbitmq_client(app).ping() + rabbitmq=_ComponentStatus( + is_enabled=bool(app.state.rabbitmq_client), + is_responsive=await get_rabbitmq_client(app).ping() if app.state.rabbitmq_client else False, - ) + ), + ec2=_ComponentStatus( + is_enabled=bool(app.state.ec2_client), + is_responsive=await app.state.ec2_client.ping() + if app.state.ec2_client + else False, + ), ) diff --git a/services/autoscaling/src/simcore_service_autoscaling/core/application.py b/services/autoscaling/src/simcore_service_autoscaling/core/application.py index 8703ea5ae27..b71b44db72a 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/core/application.py +++ b/services/autoscaling/src/simcore_service_autoscaling/core/application.py @@ -11,7 +11,8 @@ ) from ..api.routes import setup_api_routes from ..dynamic_scaling import setup as setup_background_task -from ..rabbitmq import setup as setup_rabbitmq +from ..modules.ec2 import setup as setup_ec2 +from ..modules.rabbitmq import setup as setup_rabbitmq from .settings import ApplicationSettings logger = logging.getLogger(__name__) @@ -37,6 +38,7 @@ def create_app(settings: ApplicationSettings) -> FastAPI: # PLUGINS SETUP setup_api_routes(app) setup_rabbitmq(app) + setup_ec2(app) # autoscaler background task setup_background_task(app) diff --git a/services/autoscaling/src/simcore_service_autoscaling/core/errors.py b/services/autoscaling/src/simcore_service_autoscaling/core/errors.py index 109793b8c2f..42fa28bce1b 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/core/errors.py +++ b/services/autoscaling/src/simcore_service_autoscaling/core/errors.py @@ -2,21 +2,22 @@ class AutoscalingRuntimeError(PydanticErrorMixin, RuntimeError): - ... + msg_template: str = "Autoscaling unexpected error" class ConfigurationError(AutoscalingRuntimeError): - code = "autoscaling.configuration_error" msg_template: str = "Application misconfiguration: {msg}" +class Ec2NotConnectedError(AutoscalingRuntimeError): + msg_template: str = "Cannot connect with ec2 server" + + class Ec2InstanceNotFoundError(AutoscalingRuntimeError): - code = "autoscaling.ec2_instance_not_found_error" msg_template: str = "Needed instance was not found" class Ec2TooManyInstancesError(AutoscalingRuntimeError): - code = "autoscaling.ec2_too_many_instances_error" msg_template: str = ( "The maximum amount of instances {num_instances} is already reached!" ) diff --git a/services/autoscaling/src/simcore_service_autoscaling/dynamic_scaling_core.py b/services/autoscaling/src/simcore_service_autoscaling/dynamic_scaling_core.py index 60fcea5a384..148ee2dd053 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/dynamic_scaling_core.py +++ b/services/autoscaling/src/simcore_service_autoscaling/dynamic_scaling_core.py @@ -7,11 +7,12 @@ from pydantic import parse_obj_as from types_aiobotocore_ec2.literals import InstanceTypeType -from . import utils_aws, utils_docker +from . import utils_docker from ._meta import VERSION from .core.errors import Ec2InstanceNotFoundError from .core.settings import ApplicationSettings -from .utils import rabbitmq +from .modules.ec2 import get_ec2_client +from .utils import ec2, rabbitmq logger = logging.getLogger(__name__) @@ -61,10 +62,10 @@ async def check_dynamic_resources(app: FastAPI) -> None: assert app_settings.AUTOSCALING_EC2_ACCESS # nosec assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec - list_of_ec2_instances = await utils_aws.get_ec2_instance_capabilities( - app_settings.AUTOSCALING_EC2_ACCESS, app_settings.AUTOSCALING_EC2_INSTANCES + ec2_client = get_ec2_client(app) + list_of_ec2_instances = await ec2_client.get_ec2_instance_capabilities( + app_settings.AUTOSCALING_EC2_INSTANCES ) - for task in pending_tasks: await rabbitmq.post_log_message( app, @@ -74,18 +75,17 @@ async def check_dynamic_resources(app: FastAPI) -> None: ) try: ec2_instances_needed = [ - utils_aws.find_best_fitting_ec2_instance( + ec2.find_best_fitting_ec2_instance( list_of_ec2_instances, utils_docker.get_max_resources_from_docker_task(task), - score_type=utils_aws.closest_instance_policy, + score_type=ec2.closest_instance_policy, ) ] assert app_settings.AUTOSCALING_EC2_ACCESS # nosec assert app_settings.AUTOSCALING_NODES_MONITORING # nosec logger.debug("%s", f"{ec2_instances_needed[0]=}") - new_instance_dns_name = await utils_aws.start_aws_instance( - app_settings.AUTOSCALING_EC2_ACCESS, + new_instance_dns_name = await ec2_client.start_aws_instance( app_settings.AUTOSCALING_EC2_INSTANCES, instance_type=parse_obj_as( InstanceTypeType, ec2_instances_needed[0].name diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/__init__.py b/services/autoscaling/src/simcore_service_autoscaling/modules/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/ec2.py b/services/autoscaling/src/simcore_service_autoscaling/modules/ec2.py new file mode 100644 index 00000000000..9e8b46156c1 --- /dev/null +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/ec2.py @@ -0,0 +1,218 @@ +import contextlib +import logging +from dataclasses import dataclass +from typing import Optional, cast + +import aioboto3 +from aiobotocore.session import ClientCreatorContext +from fastapi import FastAPI +from pydantic import ByteSize, parse_obj_as +from servicelib.logging_utils import log_context +from tenacity._asyncio import AsyncRetrying +from tenacity.before_sleep import before_sleep_log +from tenacity.stop import stop_after_delay +from tenacity.wait import wait_random_exponential +from types_aiobotocore_ec2 import EC2Client +from types_aiobotocore_ec2.literals import InstanceTypeType +from types_aiobotocore_ec2.type_defs import ReservationTypeDef + +from ..core.errors import ( + ConfigurationError, + Ec2NotConnectedError, + Ec2TooManyInstancesError, +) +from ..core.settings import EC2InstancesSettings, EC2Settings +from ..models import EC2Instance +from ..utils.ec2 import compose_user_data + +InstancePrivateDNSName = str + +logger = logging.getLogger(__name__) + + +def _is_ec2_instance_running(instance: ReservationTypeDef): + return ( + instance.get("Instances", [{}])[0].get("State", {}).get("Name", "not_running") + == "running" + ) + + +@dataclass +class AutoscalingEC2: + client: EC2Client + session: aioboto3.Session + exit_stack: contextlib.AsyncExitStack + + @classmethod + async def create(cls, settings: EC2Settings) -> "AutoscalingEC2": + session = aioboto3.Session() + session_client = session.client( + "ec2", + endpoint_url=settings.EC2_ENDPOINT, + aws_access_key_id=settings.EC2_ACCESS_KEY_ID, + aws_secret_access_key=settings.EC2_SECRET_ACCESS_KEY, + region_name=settings.EC2_REGION_NAME, + ) + assert isinstance(session_client, ClientCreatorContext) # nosec + exit_stack = contextlib.AsyncExitStack() + ec2_client = cast( + EC2Client, await exit_stack.enter_async_context(session_client) + ) + return cls(ec2_client, session, exit_stack) + + async def close(self) -> None: + await self.exit_stack.aclose() + + async def ping(self) -> bool: + try: + await self.client.describe_account_attributes(DryRun=True) + return True + except Exception: # pylint: disable=broad-except + return False + + async def get_ec2_instance_capabilities( + self, + instance_settings: EC2InstancesSettings, + ) -> list[EC2Instance]: + instance_types = await self.client.describe_instance_types( + InstanceTypes=cast( + list[InstanceTypeType], instance_settings.EC2_INSTANCES_ALLOWED_TYPES + ) + ) + + list_instances: list[EC2Instance] = [] + for instance in instance_types.get("InstanceTypes", []): + with contextlib.suppress(KeyError): + list_instances.append( + EC2Instance( + name=instance["InstanceType"], + cpus=instance["VCpuInfo"]["DefaultVCpus"], + ram=parse_obj_as( + ByteSize, f"{instance['MemoryInfo']['SizeInMiB']}MiB" + ), + ) + ) + return list_instances + + async def start_aws_instance( + self, + instance_settings: EC2InstancesSettings, + instance_type: InstanceTypeType, + tags: dict[str, str], + startup_script: str, + ) -> InstancePrivateDNSName: + with log_context( + logger, + logging.INFO, + msg=f"launching AWS instance {instance_type} with {tags=}", + ): + # first check the max amount is not already reached + if current_instances := await self.client.describe_instances( + Filters=[ + {"Name": "tag-key", "Values": [tag_key]} for tag_key in tags.keys() + ] + ): + if ( + len(current_instances.get("Reservations", [])) + >= instance_settings.EC2_INSTANCES_MAX_INSTANCES + ) and all( + _is_ec2_instance_running(instance) + for instance in current_instances["Reservations"] + ): + raise Ec2TooManyInstancesError( + num_instances=instance_settings.EC2_INSTANCES_MAX_INSTANCES + ) + + instances = await self.client.run_instances( + ImageId=instance_settings.EC2_INSTANCES_AMI_ID, + MinCount=1, + MaxCount=1, + InstanceType=instance_type, + InstanceInitiatedShutdownBehavior="terminate", + KeyName=instance_settings.EC2_INSTANCES_KEY_NAME, + SubnetId=instance_settings.EC2_INSTANCES_SUBNET_ID, + TagSpecifications=[ + { + "ResourceType": "instance", + "Tags": [ + {"Key": tag_key, "Value": tag_value} + for tag_key, tag_value in tags.items() + ], + } + ], + UserData=compose_user_data(startup_script), + SecurityGroupIds=instance_settings.EC2_INSTANCES_SECURITY_GROUP_IDS, + ) + instance_id = instances["Instances"][0]["InstanceId"] + logger.info( + "New instance launched: %s, waiting for it to start now...", instance_id + ) + # wait for the instance to be in a running state + # NOTE: reference to EC2 states https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-lifecycle.html + waiter = self.client.get_waiter("instance_exists") + await waiter.wait(InstanceIds=[instance_id]) + logger.info( + "instance %s exists now, waiting for running state...", instance_id + ) + + waiter = self.client.get_waiter("instance_running") + await waiter.wait(InstanceIds=[instance_id]) + logger.info("instance %s is now running", instance_id) + + # NOTE: this is currently deactivated as this makes starting an instance + # take between 2-4 minutes more and it seems to be responsive much before + # nevertheless if we get weird errors, this should be activated again! + + # waiter = client.get_waiter("instance_status_ok") + # await waiter.wait(InstanceIds=[instance_id]) + # logger.info("instance %s status is OK...", instance_id) + + # get the private IP + instances = await self.client.describe_instances(InstanceIds=[instance_id]) + private_dns_name: str = instances["Reservations"][0]["Instances"][0][ + "PrivateDnsName" + ] + logger.info( + "instance %s is available on %s, happy computing!!", + instance_id, + private_dns_name, + ) + return private_dns_name + + +def setup(app: FastAPI) -> None: + async def on_startup() -> None: + app.state.ec2_client = None + settings: Optional[EC2Settings] = app.state.settings.AUTOSCALING_EC2_ACCESS + + if not settings: + logger.warning("EC2 client is de-activated in the settings") + return + + app.state.ec2_client = client = await AutoscalingEC2.create(settings) + + async for attempt in AsyncRetrying( + reraise=True, + stop=stop_after_delay(120), + wait=wait_random_exponential(max=30), + before_sleep=before_sleep_log(logger, logging.WARNING), + ): + with attempt: + connected = await client.ping() + if not connected: + raise Ec2NotConnectedError() + + async def on_shutdown() -> None: + if app.state.ec2_client: + await cast(AutoscalingEC2, app.state.ec2_client).close() + + app.add_event_handler("startup", on_startup) + app.add_event_handler("shutdown", on_shutdown) + + +def get_ec2_client(app: FastAPI) -> AutoscalingEC2: + if not app.state.ec2_client: + raise ConfigurationError( + msg="EC2 client is not available. Please check the configuration." + ) + return cast(AutoscalingEC2, app.state.ec2_client) diff --git a/services/autoscaling/src/simcore_service_autoscaling/rabbitmq.py b/services/autoscaling/src/simcore_service_autoscaling/modules/rabbitmq.py similarity index 97% rename from services/autoscaling/src/simcore_service_autoscaling/rabbitmq.py rename to services/autoscaling/src/simcore_service_autoscaling/modules/rabbitmq.py index e033dca70d8..8652aa8e234 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/rabbitmq.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/rabbitmq.py @@ -9,7 +9,7 @@ from servicelib.rabbitmq_utils import wait_till_rabbitmq_responsive from settings_library.rabbit import RabbitSettings -from .core.errors import ConfigurationError +from ..core.errors import ConfigurationError logger = logging.getLogger(__name__) diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/ec2.py b/services/autoscaling/src/simcore_service_autoscaling/utils/ec2.py new file mode 100644 index 00000000000..e9c41c0810c --- /dev/null +++ b/services/autoscaling/src/simcore_service_autoscaling/utils/ec2.py @@ -0,0 +1,60 @@ +""" Free helper functions for AWS API + +""" + +import logging +from collections import OrderedDict +from textwrap import dedent +from typing import Callable + +from ..core.errors import ConfigurationError, Ec2InstanceNotFoundError +from ..models import EC2Instance, Resources + +logger = logging.getLogger(__name__) + + +def compose_user_data(docker_join_bash_command: str) -> str: + return dedent( + f"""\ +#!/bin/bash +{docker_join_bash_command} +""" + ) + + +def closest_instance_policy( + ec2_instance: EC2Instance, + resources: Resources, +) -> float: + if ec2_instance.cpus < resources.cpus or ec2_instance.ram < resources.ram: + return 0 + # compute a score for all the instances that are above expectations + # best is the exact ec2 instance + cpu_ratio = float(ec2_instance.cpus - resources.cpus) / float(ec2_instance.cpus) + ram_ratio = float(ec2_instance.ram - resources.ram) / float(ec2_instance.ram) + return 100 * (1.0 - cpu_ratio) * (1.0 - ram_ratio) + + +def find_best_fitting_ec2_instance( + allowed_ec2_instances: list[EC2Instance], + resources: Resources, + score_type: Callable[[EC2Instance, Resources], float] = closest_instance_policy, +) -> EC2Instance: + if not allowed_ec2_instances: + raise ConfigurationError(msg="allowed ec2 instances is missing!") + score_to_ec2_candidate: dict[float, EC2Instance] = OrderedDict( + sorted( + { + score_type(instance, resources): instance + for instance in allowed_ec2_instances + }.items(), + reverse=True, + ) + ) + + score, instance = next(iter(score_to_ec2_candidate.items())) + if score == 0: + raise Ec2InstanceNotFoundError( + needed_resources=resources, msg="no adequate EC2 instance found!" + ) + return instance diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/rabbitmq.py b/services/autoscaling/src/simcore_service_autoscaling/utils/rabbitmq.py index e2f432a6fcb..1b5536e4f30 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/utils/rabbitmq.py +++ b/services/autoscaling/src/simcore_service_autoscaling/utils/rabbitmq.py @@ -10,7 +10,7 @@ from servicelib.logging_utils import log_catch from ..models import Resources, SimcoreServiceDockerLabelKeys -from ..rabbitmq import post_message +from ..modules.rabbitmq import post_message logger = logging.getLogger(__name__) diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils_aws.py b/services/autoscaling/src/simcore_service_autoscaling/utils_aws.py deleted file mode 100644 index 0e554ca307a..00000000000 --- a/services/autoscaling/src/simcore_service_autoscaling/utils_aws.py +++ /dev/null @@ -1,213 +0,0 @@ -""" Free helper functions for AWS API - -""" - -import contextlib -import logging -from collections import OrderedDict -from textwrap import dedent -from typing import AsyncIterator, Callable, cast - -import aioboto3 -from pydantic import ByteSize, parse_obj_as -from servicelib.logging_utils import log_context -from types_aiobotocore_ec2 import EC2Client -from types_aiobotocore_ec2.literals import InstanceTypeType -from types_aiobotocore_ec2.type_defs import ReservationTypeDef - -from .core.errors import ( - ConfigurationError, - Ec2InstanceNotFoundError, - Ec2TooManyInstancesError, -) -from .core.settings import EC2InstancesSettings, EC2Settings -from .models import EC2Instance, Resources - -logger = logging.getLogger(__name__) - - -@contextlib.asynccontextmanager -async def ec2_client(settings: EC2Settings) -> AsyncIterator[EC2Client]: - ec2 = None - try: - session = aioboto3.Session() - async with session.client( - "ec2", - endpoint_url=settings.EC2_ENDPOINT, - aws_access_key_id=settings.EC2_ACCESS_KEY_ID, - aws_secret_access_key=settings.EC2_SECRET_ACCESS_KEY, - region_name=settings.EC2_REGION_NAME, - ) as ec2: - yield ec2 - finally: - if ec2: - await ec2.close() - - -async def get_ec2_instance_capabilities( - settings: EC2Settings, instance_settings: EC2InstancesSettings -) -> list[EC2Instance]: - async with ec2_client(settings) as ec2: - instance_types = await ec2.describe_instance_types( - InstanceTypes=cast( - list[InstanceTypeType], instance_settings.EC2_INSTANCES_ALLOWED_TYPES - ) - ) - - list_instances: list[EC2Instance] = [] - for instance in instance_types.get("InstanceTypes", []): - with contextlib.suppress(KeyError): - list_instances.append( - EC2Instance( - name=instance["InstanceType"], - cpus=instance["VCpuInfo"]["DefaultVCpus"], - ram=parse_obj_as( - ByteSize, f"{instance['MemoryInfo']['SizeInMiB']}MiB" - ), - ) - ) - return list_instances - - -def closest_instance_policy( - ec2_instance: EC2Instance, - resources: Resources, -) -> float: - if ec2_instance.cpus < resources.cpus or ec2_instance.ram < resources.ram: - return 0 - # compute a score for all the instances that are above expectations - # best is the exact ec2 instance - cpu_ratio = float(ec2_instance.cpus - resources.cpus) / float(ec2_instance.cpus) - ram_ratio = float(ec2_instance.ram - resources.ram) / float(ec2_instance.ram) - return 100 * (1.0 - cpu_ratio) * (1.0 - ram_ratio) - - -def find_best_fitting_ec2_instance( - allowed_ec2_instances: list[EC2Instance], - resources: Resources, - score_type: Callable[[EC2Instance, Resources], float] = closest_instance_policy, -) -> EC2Instance: - if not allowed_ec2_instances: - raise ConfigurationError(msg="allowed ec2 instances is missing!") - score_to_ec2_candidate: dict[float, EC2Instance] = OrderedDict( - sorted( - { - score_type(instance, resources): instance - for instance in allowed_ec2_instances - }.items(), - reverse=True, - ) - ) - - score, instance = next(iter(score_to_ec2_candidate.items())) - if score == 0: - raise Ec2InstanceNotFoundError( - needed_resources=resources, msg="no adequate EC2 instance found!" - ) - return instance - - -def _compose_user_data(docker_join_bash_command: str) -> str: - return dedent( - f"""\ -#!/bin/bash -{docker_join_bash_command} -""" - ) - - -def _is_ec2_instance_running(instance: ReservationTypeDef): - return ( - instance.get("Instances", [{}])[0].get("State", {}).get("Name", "not_running") - == "running" - ) - - -InstancePrivateDNSName = str - - -async def start_aws_instance( - settings: EC2Settings, - instance_settings: EC2InstancesSettings, - instance_type: InstanceTypeType, - tags: dict[str, str], - startup_script: str, -) -> InstancePrivateDNSName: - with log_context( - logger, - logging.INFO, - msg=f"launching AWS instance {instance_type} with {tags=}", - ): - async with ec2_client(settings) as client: - # first check the max amount is not already reached - if current_instances := await client.describe_instances( - Filters=[ - {"Name": "tag-key", "Values": [tag_key]} for tag_key in tags.keys() - ] - ): - if ( - len(current_instances.get("Reservations", [])) - >= instance_settings.EC2_INSTANCES_MAX_INSTANCES - ) and all( - _is_ec2_instance_running(instance) - for instance in current_instances["Reservations"] - ): - raise Ec2TooManyInstancesError( - num_instances=instance_settings.EC2_INSTANCES_MAX_INSTANCES - ) - - instances = await client.run_instances( - ImageId=instance_settings.EC2_INSTANCES_AMI_ID, - MinCount=1, - MaxCount=1, - InstanceType=instance_type, - InstanceInitiatedShutdownBehavior="terminate", - KeyName=instance_settings.EC2_INSTANCES_KEY_NAME, - SubnetId=instance_settings.EC2_INSTANCES_SUBNET_ID, - TagSpecifications=[ - { - "ResourceType": "instance", - "Tags": [ - {"Key": tag_key, "Value": tag_value} - for tag_key, tag_value in tags.items() - ], - } - ], - UserData=_compose_user_data(startup_script), - SecurityGroupIds=instance_settings.EC2_INSTANCES_SECURITY_GROUP_IDS, - ) - instance_id = instances["Instances"][0]["InstanceId"] - logger.info( - "New instance launched: %s, waiting for it to start now...", instance_id - ) - # wait for the instance to be in a running state - # NOTE: reference to EC2 states https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-lifecycle.html - waiter = client.get_waiter("instance_exists") - await waiter.wait(InstanceIds=[instance_id]) - logger.info( - "instance %s exists now, waiting for running state...", instance_id - ) - - waiter = client.get_waiter("instance_running") - await waiter.wait(InstanceIds=[instance_id]) - logger.info("instance %s is now running", instance_id) - - # NOTE: this is currently disactivated as this makes starting an instance - # take between 2-4 minutes more and it seems to be responsive much before - # nevertheless if we get weird errors, this should be activated again! - - # waiter = client.get_waiter("instance_status_ok") - # await waiter.wait(InstanceIds=[instance_id]) - # logger.info("instance %s status is OK...", instance_id) - - # get the private IP - instances = await client.describe_instances(InstanceIds=[instance_id]) - private_dns_name: str = instances["Reservations"][0]["Instances"][0][ - "PrivateDnsName" - ] - logger.info( - "instance %s is available on %s, happy computing!!", - instance_id, - private_dns_name, - ) - return private_dns_name diff --git a/services/autoscaling/tests/unit/conftest.py b/services/autoscaling/tests/unit/conftest.py index fce71fd792b..bb2f7529ec9 100644 --- a/services/autoscaling/tests/unit/conftest.py +++ b/services/autoscaling/tests/unit/conftest.py @@ -39,13 +39,13 @@ from simcore_service_autoscaling.core.application import create_app from simcore_service_autoscaling.core.settings import ApplicationSettings, EC2Settings from simcore_service_autoscaling.models import SimcoreServiceDockerLabelKeys -from simcore_service_autoscaling.utils_aws import EC2Client -from simcore_service_autoscaling.utils_aws import ec2_client as autoscaling_ec2_client +from simcore_service_autoscaling.modules.ec2 import AutoscalingEC2 from tenacity import retry from tenacity._asyncio import AsyncRetrying from tenacity.retry import retry_if_exception_type from tenacity.stop import stop_after_delay from tenacity.wait import wait_fixed +from types_aiobotocore_ec2.client import EC2Client from types_aiobotocore_ec2.literals import InstanceTypeType pytest_plugins = [ @@ -128,6 +128,11 @@ def disabled_rabbitmq(app_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPa monkeypatch.delenv("RABBIT_PASSWORD") +@pytest.fixture +def disabled_ec2(app_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatch): + monkeypatch.delenv("EC2_ACCESS_KEY_ID") + + @pytest.fixture def enabled_rabbitmq( app_environment: EnvVarsDict, rabbit_service: RabbitSettings @@ -512,22 +517,21 @@ async def aws_ami_id( @pytest.fixture -async def ec2_client() -> AsyncIterator[EC2Client]: +async def autoscaling_ec2( + app_environment: EnvVarsDict, +) -> AsyncIterator[AutoscalingEC2]: settings = EC2Settings.create_from_envs() - async with autoscaling_ec2_client(settings) as client: - yield client + ec2 = await AutoscalingEC2.create(settings) + assert ec2 + yield ec2 + await ec2.close() @pytest.fixture -def mocked_ec2_server_with_client( - mocked_aws_server_envs: None, - aws_vpc_id: str, - aws_subnet_id: str, - aws_security_group_id: str, - aws_ami_id: str, - ec2_client: EC2Client, -) -> Iterator[EC2Client]: - yield ec2_client +async def ec2_client( + autoscaling_ec2: AutoscalingEC2, +) -> AsyncIterator[EC2Client]: + yield autoscaling_ec2.client @pytest.fixture diff --git a/services/autoscaling/tests/unit/test_api_health.py b/services/autoscaling/tests/unit/test_api_health.py new file mode 100644 index 00000000000..523d6b927d9 --- /dev/null +++ b/services/autoscaling/tests/unit/test_api_health.py @@ -0,0 +1,95 @@ +# pylint:disable=unused-variable +# pylint:disable=unused-argument +# pylint:disable=redefined-outer-name + +import httpx +import pytest +from moto.server import ThreadedMotoServer +from pytest_simcore.helpers.utils_envs import EnvVarsDict +from starlette import status + +pytest_simcore_core_services_selection = [ + "rabbit", +] + +pytest_simcore_ops_services_selection = [] + + +@pytest.fixture +def app_environment( + app_environment: EnvVarsDict, + enabled_rabbitmq: None, + mocked_aws_server_envs: None, +) -> EnvVarsDict: + return app_environment + + +async def test_healthcheck(async_client: httpx.AsyncClient): + response = await async_client.get("/") + response.raise_for_status() + assert response.status_code == status.HTTP_200_OK + assert "simcore_service_autoscaling" in response.text + + +async def test_status_no_rabbit( + disabled_rabbitmq: None, + async_client: httpx.AsyncClient, +): + response = await async_client.get("/status") + response.raise_for_status() + assert response.status_code == status.HTTP_200_OK + status_response = response.json() + assert "rabbitmq" in status_response + rabbitmq_status = status_response["rabbitmq"] + assert "is_enabled" in rabbitmq_status + assert rabbitmq_status["is_enabled"] is False + assert rabbitmq_status["is_responsive"] is False + + assert "ec2" in status_response + ec2_status = status_response["ec2"] + assert "is_enabled" in ec2_status + assert ec2_status["is_enabled"] is True + assert ec2_status["is_responsive"] is True + + +async def test_status( + mocked_aws_server: ThreadedMotoServer, + async_client: httpx.AsyncClient, +): + # stop the aws server... + mocked_aws_server.stop() + + response = await async_client.get("/status") + response.raise_for_status() + assert response.status_code == status.HTTP_200_OK + status_response = response.json() + assert "rabbitmq" in status_response + rabbitmq_status = status_response["rabbitmq"] + assert "is_enabled" in rabbitmq_status + assert rabbitmq_status["is_enabled"] is True + assert rabbitmq_status["is_responsive"] is True + + assert "ec2" in status_response + ec2_status = status_response["ec2"] + assert "is_enabled" in ec2_status + assert ec2_status["is_enabled"] is True + assert ec2_status["is_responsive"] is False + + # restart the server + mocked_aws_server.start() + + response = await async_client.get("/status") + response.raise_for_status() + assert response.status_code == status.HTTP_200_OK + status_response = response.json() + assert "rabbitmq" in status_response + rabbitmq_status = status_response["rabbitmq"] + assert "is_enabled" in rabbitmq_status + assert rabbitmq_status["is_enabled"] is True + assert rabbitmq_status["is_responsive"] is True + + assert "ec2" in status_response + ec2_status = status_response["ec2"] + assert "is_enabled" in ec2_status + assert ec2_status["is_enabled"] is True + assert ec2_status["is_responsive"] is True diff --git a/services/autoscaling/tests/unit/test_dynamic_scaling_core.py b/services/autoscaling/tests/unit/test_dynamic_scaling_core.py index 305caa81ded..5bd7a5ee43f 100644 --- a/services/autoscaling/tests/unit/test_dynamic_scaling_core.py +++ b/services/autoscaling/tests/unit/test_dynamic_scaling_core.py @@ -15,7 +15,7 @@ from pytest_mock.plugin import MockerFixture from simcore_service_autoscaling.core.settings import ApplicationSettings from simcore_service_autoscaling.dynamic_scaling_core import check_dynamic_resources -from simcore_service_autoscaling.utils_aws import EC2Client +from types_aiobotocore_ec2.client import EC2Client @pytest.fixture @@ -25,10 +25,11 @@ def aws_instance_private_dns() -> str: @pytest.fixture def mock_start_aws_instance( - mocker: MockerFixture, aws_instance_private_dns: str + mocker: MockerFixture, + aws_instance_private_dns: str, ) -> Iterator[mock.Mock]: mocked_start_aws_instance = mocker.patch( - "simcore_service_autoscaling.dynamic_scaling_core.utils_aws.start_aws_instance", + "simcore_service_autoscaling.modules.ec2.AutoscalingEC2.start_aws_instance", autospec=True, return_value=aws_instance_private_dns, ) @@ -133,7 +134,7 @@ async def test_check_dynamic_resources_with_pending_resources_starts_r5n_4xlarge await check_dynamic_resources(initialized_app) mock_start_aws_instance.assert_called_once_with( - app_settings.AUTOSCALING_EC2_ACCESS, + mock.ANY, app_settings.AUTOSCALING_EC2_INSTANCES, instance_type="r5n.4xlarge", tags=mock.ANY, diff --git a/services/autoscaling/tests/unit/test_dynamic_scaling_task.py b/services/autoscaling/tests/unit/test_dynamic_scaling_task.py index d4e16cfd05e..3b7c5df7938 100644 --- a/services/autoscaling/tests/unit/test_dynamic_scaling_task.py +++ b/services/autoscaling/tests/unit/test_dynamic_scaling_task.py @@ -20,6 +20,7 @@ def app_environment( app_environment: EnvVarsDict, disabled_rabbitmq: None, + mocked_aws_server_envs: None, monkeypatch: pytest.MonkeyPatch, ) -> EnvVarsDict: # fast interval diff --git a/services/autoscaling/tests/unit/test_health_route.py b/services/autoscaling/tests/unit/test_health_route.py deleted file mode 100644 index feebb8ddee3..00000000000 --- a/services/autoscaling/tests/unit/test_health_route.py +++ /dev/null @@ -1,56 +0,0 @@ -# pylint:disable=unused-variable -# pylint:disable=unused-argument -# pylint:disable=redefined-outer-name - -import httpx -import pytest -from pytest_simcore.helpers.utils_envs import EnvVarsDict -from starlette import status - -pytest_simcore_core_services_selection = [ - "rabbit", -] - -pytest_simcore_ops_services_selection = [] - - -@pytest.fixture -def app_environment( - app_environment: EnvVarsDict, - enabled_rabbitmq: None, - monkeypatch: pytest.MonkeyPatch, -) -> EnvVarsDict: - return app_environment - - -async def test_healthcheck(async_client: httpx.AsyncClient): - response = await async_client.get("/") - response.raise_for_status() - assert response.status_code == status.HTTP_200_OK - assert "simcore_service_autoscaling" in response.text - - -async def test_status_no_rabbit( - disabled_rabbitmq: None, async_client: httpx.AsyncClient -): - response = await async_client.get("/status") - response.raise_for_status() - assert response.status_code == status.HTTP_200_OK - status_response = response.json() - assert "rabbitmq" in status_response - rabbitmq_status = status_response["rabbitmq"] - assert "initialized" in rabbitmq_status - assert rabbitmq_status["initialized"] is False - assert rabbitmq_status["connection_state"] is False - - -async def test_status(async_client: httpx.AsyncClient): - response = await async_client.get("/status") - response.raise_for_status() - assert response.status_code == status.HTTP_200_OK - status_response = response.json() - assert "rabbitmq" in status_response - rabbitmq_status = status_response["rabbitmq"] - assert "initialized" in rabbitmq_status - assert rabbitmq_status["initialized"] is True - assert rabbitmq_status["connection_state"] is True diff --git a/services/autoscaling/tests/unit/test_modules_ec2.py b/services/autoscaling/tests/unit/test_modules_ec2.py new file mode 100644 index 00000000000..0965e7d60b0 --- /dev/null +++ b/services/autoscaling/tests/unit/test_modules_ec2.py @@ -0,0 +1,187 @@ +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument +# pylint: disable=unused-variable + +import botocore.exceptions +import pytest +from faker import Faker +from fastapi import FastAPI +from moto.server import ThreadedMotoServer +from pytest_simcore.helpers.utils_envs import EnvVarsDict +from simcore_service_autoscaling.core.errors import ( + ConfigurationError, + Ec2TooManyInstancesError, +) +from simcore_service_autoscaling.core.settings import ApplicationSettings, EC2Settings +from simcore_service_autoscaling.modules.ec2 import AutoscalingEC2, get_ec2_client +from types_aiobotocore_ec2 import EC2Client + + +@pytest.fixture +def ec2_settings( + app_environment: EnvVarsDict, +) -> EC2Settings: + return EC2Settings.create_from_envs() + + +@pytest.fixture +def app_settings( + app_environment: EnvVarsDict, +) -> ApplicationSettings: + return ApplicationSettings.create_from_envs() + + +async def test_ec2_client_lifespan(ec2_settings: EC2Settings): + ec2 = await AutoscalingEC2.create(settings=ec2_settings) + assert ec2 + assert ec2.client + assert ec2.exit_stack + assert ec2.session + + await ec2.close() + + +async def test_ec2_client_raises_when_no_connection_available(ec2_client: EC2Client): + with pytest.raises( + botocore.exceptions.ClientError, match=r".+ AWS was not able to validate .+" + ): + await ec2_client.describe_account_attributes(DryRun=True) + + +async def test_ec2_client_with_mock_server( + mocked_aws_server_envs: None, ec2_client: EC2Client +): + # passes without exception + await ec2_client.describe_account_attributes(DryRun=True) + + +async def test_ec2_does_not_initialize_if_deactivated( + disabled_rabbitmq: None, disabled_ec2: None, initialized_app: FastAPI +): + assert hasattr(initialized_app.state, "ec2_client") + assert initialized_app.state.ec2_client == None + with pytest.raises(ConfigurationError): + get_ec2_client(initialized_app) + + +async def test_ec2_client_when_ec2_server_goes_up_and_down( + mocked_aws_server: ThreadedMotoServer, + mocked_aws_server_envs: None, + ec2_client: EC2Client, +): + # passes without exception + await ec2_client.describe_account_attributes(DryRun=True) + mocked_aws_server.stop() + with pytest.raises(botocore.exceptions.EndpointConnectionError): + await ec2_client.describe_account_attributes(DryRun=True) + + # restart + mocked_aws_server.start() + # passes without exception + await ec2_client.describe_account_attributes(DryRun=True) + + +async def test_get_ec2_instance_capabilities( + mocked_aws_server_envs: None, + aws_allowed_ec2_instance_type_names: list[str], + app_settings: ApplicationSettings, + autoscaling_ec2: AutoscalingEC2, +): + assert app_settings.AUTOSCALING_EC2_INSTANCES + instance_types = await autoscaling_ec2.get_ec2_instance_capabilities( + app_settings.AUTOSCALING_EC2_INSTANCES + ) + assert instance_types + assert len(instance_types) == len( + app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES + ) + + # all the instance names are found and valid + assert all( + i.name in app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES + for i in instance_types + ) + for ( + instance_type_name + ) in app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES: + assert any(i.name == instance_type_name for i in instance_types) + + +async def test_start_aws_instance( + mocked_aws_server_envs: None, + aws_vpc_id: str, + aws_subnet_id: str, + aws_security_group_id: str, + aws_ami_id: str, + ec2_client: EC2Client, + autoscaling_ec2: AutoscalingEC2, + app_settings: ApplicationSettings, + faker: Faker, +): + assert app_settings.AUTOSCALING_EC2_ACCESS + assert app_settings.AUTOSCALING_EC2_INSTANCES + # we have nothing running now in ec2 + all_instances = await ec2_client.describe_instances() + assert not all_instances["Reservations"] + + instance_type = faker.pystr() + tags = faker.pydict(allowed_types=(str,)) + startup_script = faker.pystr() + await autoscaling_ec2.start_aws_instance( + app_settings.AUTOSCALING_EC2_INSTANCES, + instance_type, + tags=tags, + startup_script=startup_script, + ) + + # check we have that now in ec2 + all_instances = await ec2_client.describe_instances() + assert len(all_instances["Reservations"]) == 1 + running_instance = all_instances["Reservations"][0] + assert "Instances" in running_instance + assert len(running_instance["Instances"]) == 1 + running_instance = running_instance["Instances"][0] + assert "InstanceType" in running_instance + assert running_instance["InstanceType"] == instance_type + assert "Tags" in running_instance + assert running_instance["Tags"] == [ + {"Key": key, "Value": value} for key, value in tags.items() + ] + + +async def test_start_aws_instance_is_limited_in_number_of_instances( + mocked_aws_server_envs: None, + aws_vpc_id: str, + aws_subnet_id: str, + aws_security_group_id: str, + aws_ami_id: str, + ec2_client: EC2Client, + autoscaling_ec2: AutoscalingEC2, + app_settings: ApplicationSettings, + faker: Faker, +): + assert app_settings.AUTOSCALING_EC2_ACCESS + assert app_settings.AUTOSCALING_EC2_INSTANCES + # we have nothing running now in ec2 + all_instances = await ec2_client.describe_instances() + assert not all_instances["Reservations"] + + # create as many instances as we can + tags = faker.pydict(allowed_types=(str,)) + startup_script = faker.pystr() + for _ in range(app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES): + await autoscaling_ec2.start_aws_instance( + app_settings.AUTOSCALING_EC2_INSTANCES, + faker.pystr(), + tags=tags, + startup_script=startup_script, + ) + + # now creating one more shall fail + with pytest.raises(Ec2TooManyInstancesError): + await autoscaling_ec2.start_aws_instance( + app_settings.AUTOSCALING_EC2_INSTANCES, + faker.pystr(), + tags=tags, + startup_script=startup_script, + ) diff --git a/services/autoscaling/tests/unit/test_rabbitmq.py b/services/autoscaling/tests/unit/test_modules_rabbitmq.py similarity index 94% rename from services/autoscaling/tests/unit/test_rabbitmq.py rename to services/autoscaling/tests/unit/test_modules_rabbitmq.py index b5b21361d58..1628734ab34 100644 --- a/services/autoscaling/tests/unit/test_rabbitmq.py +++ b/services/autoscaling/tests/unit/test_modules_rabbitmq.py @@ -19,7 +19,10 @@ from servicelib.rabbitmq import RabbitMQClient from settings_library.rabbit import RabbitSettings from simcore_service_autoscaling.core.errors import ConfigurationError -from simcore_service_autoscaling.rabbitmq import get_rabbitmq_client, post_message +from simcore_service_autoscaling.modules.rabbitmq import ( + get_rabbitmq_client, + post_message, +) from tenacity import retry from tenacity._asyncio import AsyncRetrying from tenacity.retry import retry_if_exception_type @@ -76,7 +79,7 @@ def rabbit_message( def test_rabbitmq_does_not_initialize_if_deactivated( - disabled_rabbitmq, initialized_app: FastAPI + disabled_rabbitmq: None, disabled_ec2: None, initialized_app: FastAPI ): assert hasattr(initialized_app.state, "rabbitmq_client") assert initialized_app.state.rabbitmq_client == None @@ -85,7 +88,7 @@ def test_rabbitmq_does_not_initialize_if_deactivated( def test_rabbitmq_initializes( - enabled_rabbitmq: RabbitSettings, initialized_app: FastAPI + enabled_rabbitmq: RabbitSettings, disabled_ec2: None, initialized_app: FastAPI ): assert hasattr(initialized_app.state, "rabbitmq_client") assert initialized_app.state.rabbitmq_client is not None @@ -95,6 +98,7 @@ def test_rabbitmq_initializes( async def test_post_message( disable_dynamic_service_background_task, enabled_rabbitmq: RabbitSettings, + disabled_ec2: None, initialized_app: FastAPI, rabbit_message: RabbitMessageBase, rabbit_client: RabbitMQClient, @@ -117,6 +121,7 @@ async def test_post_message( async def test_post_message_with_disabled_rabbit_does_not_raise( disabled_rabbitmq: None, + disabled_ec2: None, initialized_app: FastAPI, rabbit_message: RabbitMessageBase, ): @@ -153,6 +158,7 @@ async def _check_service_task_gone(service: Mapping[str, Any]) -> None: async def test_post_message_when_rabbit_disconnected( enabled_rabbitmq: RabbitSettings, + disabled_ec2: None, initialized_app: FastAPI, rabbit_autoscaling_message: RabbitAutoscalingMessage, async_docker_client: aiodocker.Docker, diff --git a/services/autoscaling/tests/unit/test_utils_aws.py b/services/autoscaling/tests/unit/test_utils_aws.py deleted file mode 100644 index 6f246d1b165..00000000000 --- a/services/autoscaling/tests/unit/test_utils_aws.py +++ /dev/null @@ -1,221 +0,0 @@ -# pylint: disable=redefined-outer-name -# pylint: disable=unused-argument -# pylint: disable=unused-variable - - -import random - -import botocore.exceptions -import pytest -from faker import Faker -from pydantic import ByteSize -from pytest_simcore.helpers.utils_envs import EnvVarsDict -from simcore_service_autoscaling.core.errors import ( - ConfigurationError, - Ec2InstanceNotFoundError, - Ec2TooManyInstancesError, -) -from simcore_service_autoscaling.core.settings import ApplicationSettings -from simcore_service_autoscaling.models import Resources -from simcore_service_autoscaling.utils_aws import ( - EC2Client, - EC2Instance, - _compose_user_data, - closest_instance_policy, - ec2_client, - find_best_fitting_ec2_instance, - get_ec2_instance_capabilities, - start_aws_instance, -) - - -@pytest.fixture -def app_settings( - app_environment: EnvVarsDict, -) -> ApplicationSettings: - return ApplicationSettings.create_from_envs() - - -async def test_ec2_client(app_settings: ApplicationSettings): - assert app_settings.AUTOSCALING_EC2_ACCESS - async with ec2_client(app_settings.AUTOSCALING_EC2_ACCESS) as client: - ... - - with pytest.raises( - botocore.exceptions.ClientError, match=r".+ AWS was not able to validate .+" - ): - async with ec2_client(app_settings.AUTOSCALING_EC2_ACCESS) as client: - await client.describe_account_attributes(DryRun=True) - - -async def test_ec2_client_with_mock_server( - mocked_aws_server_envs: None, app_settings: ApplicationSettings -): - # passes without exception - assert app_settings.AUTOSCALING_EC2_ACCESS - async with ec2_client(app_settings.AUTOSCALING_EC2_ACCESS) as client: - await client.describe_account_attributes(DryRun=True) - - -async def test_get_ec2_instance_capabilities( - mocked_aws_server_envs: None, - aws_allowed_ec2_instance_type_names: list[str], - app_settings: ApplicationSettings, -): - assert app_settings.AUTOSCALING_EC2_ACCESS - assert app_settings.AUTOSCALING_EC2_INSTANCES - instance_types = await get_ec2_instance_capabilities( - app_settings.AUTOSCALING_EC2_ACCESS, app_settings.AUTOSCALING_EC2_INSTANCES - ) - assert instance_types - assert len(instance_types) == len( - app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES - ) - - # all the instance names are found and valid - assert all( - i.name in app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES - for i in instance_types - ) - for ( - instance_type_name - ) in app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES: - assert any(i.name == instance_type_name for i in instance_types) - - -async def test_find_best_fitting_ec2_instance_with_no_instances_raises(): - # this shall raise as there are no available instances - with pytest.raises(ConfigurationError): - find_best_fitting_ec2_instance( - allowed_ec2_instances=[], - resources=Resources(cpus=0, ram=ByteSize(0)), - ) - - -@pytest.fixture -def random_fake_available_instances(faker: Faker) -> list[EC2Instance]: - list_of_instances = [ - EC2Instance( - name=faker.pystr(), - cpus=n, - ram=ByteSize(n), - ) - for n in range(1, 30) - ] - random.shuffle(list_of_instances) - return list_of_instances - - -async def test_find_best_fitting_ec2_instance_closest_instance_policy_with_resource_0_raises( - random_fake_available_instances: list[EC2Instance], -): - with pytest.raises(Ec2InstanceNotFoundError): - find_best_fitting_ec2_instance( - allowed_ec2_instances=random_fake_available_instances, - resources=Resources(cpus=0, ram=ByteSize(0)), - score_type=closest_instance_policy, - ) - - -@pytest.mark.parametrize( - "needed_resources,expected_ec2_instance", - [ - ( - Resources(cpus=n, ram=ByteSize(n)), - EC2Instance(name="fake", cpus=n, ram=ByteSize(n)), - ) - for n in range(1, 30) - ], -) -async def test_find_best_fitting_ec2_instance_closest_instance_policy( - needed_resources: Resources, - expected_ec2_instance: EC2Instance, - random_fake_available_instances: list[EC2Instance], -): - found_instance: EC2Instance = find_best_fitting_ec2_instance( - allowed_ec2_instances=random_fake_available_instances, - resources=needed_resources, - score_type=closest_instance_policy, - ) - - assert found_instance.dict(exclude={"name"}) == expected_ec2_instance.dict( - exclude={"name"} - ) - - -def test_compose_user_data(faker: Faker): - command = faker.text() - user_data = _compose_user_data(command) - assert user_data.startswith("#!/bin/bash") - assert command in user_data - - -async def test_start_aws_instance( - faker: Faker, - mocked_ec2_server_with_client: EC2Client, - app_settings: ApplicationSettings, -): - assert app_settings.AUTOSCALING_EC2_ACCESS - assert app_settings.AUTOSCALING_EC2_INSTANCES - # we have nothing running now in ec2 - all_instances = await mocked_ec2_server_with_client.describe_instances() - assert not all_instances["Reservations"] - - instance_type = faker.pystr() - tags = faker.pydict(allowed_types=(str,)) - startup_script = faker.pystr() - await start_aws_instance( - app_settings.AUTOSCALING_EC2_ACCESS, - app_settings.AUTOSCALING_EC2_INSTANCES, - instance_type, - tags=tags, - startup_script=startup_script, - ) - - # check we have that now in ec2 - all_instances = await mocked_ec2_server_with_client.describe_instances() - assert len(all_instances["Reservations"]) == 1 - running_instance = all_instances["Reservations"][0] - assert "Instances" in running_instance - assert len(running_instance["Instances"]) == 1 - running_instance = running_instance["Instances"][0] - assert "InstanceType" in running_instance - assert running_instance["InstanceType"] == instance_type - assert "Tags" in running_instance - assert running_instance["Tags"] == [ - {"Key": key, "Value": value} for key, value in tags.items() - ] - - -async def test_start_aws_instance_is_limited_in_number_of_instances( - mocked_ec2_server_with_client: EC2Client, - app_settings: ApplicationSettings, - faker: Faker, -): - assert app_settings.AUTOSCALING_EC2_ACCESS - assert app_settings.AUTOSCALING_EC2_INSTANCES - # we have nothing running now in ec2 - all_instances = await mocked_ec2_server_with_client.describe_instances() - assert not all_instances["Reservations"] - - # create as many instances as we can - tags = faker.pydict(allowed_types=(str,)) - startup_script = faker.pystr() - for _ in range(app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES): - await start_aws_instance( - app_settings.AUTOSCALING_EC2_ACCESS, - app_settings.AUTOSCALING_EC2_INSTANCES, - faker.pystr(), - tags=tags, - startup_script=startup_script, - ) - - # now creating one more shall fail - with pytest.raises(Ec2TooManyInstancesError): - await start_aws_instance( - app_settings.AUTOSCALING_EC2_ACCESS, - app_settings.AUTOSCALING_EC2_INSTANCES, - faker.pystr(), - tags=tags, - startup_script=startup_script, - ) diff --git a/services/autoscaling/tests/unit/test_utils_ec2.py b/services/autoscaling/tests/unit/test_utils_ec2.py new file mode 100644 index 00000000000..2e7cace7451 --- /dev/null +++ b/services/autoscaling/tests/unit/test_utils_ec2.py @@ -0,0 +1,88 @@ +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument +# pylint: disable=unused-variable + + +import random + +import pytest +from faker import Faker +from pydantic import ByteSize +from simcore_service_autoscaling.core.errors import ( + ConfigurationError, + Ec2InstanceNotFoundError, +) +from simcore_service_autoscaling.models import Resources +from simcore_service_autoscaling.utils.ec2 import ( + EC2Instance, + closest_instance_policy, + compose_user_data, + find_best_fitting_ec2_instance, +) + + +async def test_find_best_fitting_ec2_instance_with_no_instances_raises(): + # this shall raise as there are no available instances + with pytest.raises(ConfigurationError): + find_best_fitting_ec2_instance( + allowed_ec2_instances=[], + resources=Resources(cpus=0, ram=ByteSize(0)), + ) + + +@pytest.fixture +def random_fake_available_instances(faker: Faker) -> list[EC2Instance]: + list_of_instances = [ + EC2Instance( + name=faker.pystr(), + cpus=n, + ram=ByteSize(n), + ) + for n in range(1, 30) + ] + random.shuffle(list_of_instances) + return list_of_instances + + +async def test_find_best_fitting_ec2_instance_closest_instance_policy_with_resource_0_raises( + random_fake_available_instances: list[EC2Instance], +): + with pytest.raises(Ec2InstanceNotFoundError): + find_best_fitting_ec2_instance( + allowed_ec2_instances=random_fake_available_instances, + resources=Resources(cpus=0, ram=ByteSize(0)), + score_type=closest_instance_policy, + ) + + +@pytest.mark.parametrize( + "needed_resources,expected_ec2_instance", + [ + ( + Resources(cpus=n, ram=ByteSize(n)), + EC2Instance(name="fake", cpus=n, ram=ByteSize(n)), + ) + for n in range(1, 30) + ], +) +async def test_find_best_fitting_ec2_instance_closest_instance_policy( + needed_resources: Resources, + expected_ec2_instance: EC2Instance, + random_fake_available_instances: list[EC2Instance], +): + found_instance: EC2Instance = find_best_fitting_ec2_instance( + allowed_ec2_instances=random_fake_available_instances, + resources=needed_resources, + score_type=closest_instance_policy, + ) + + assert found_instance.dict(exclude={"name"}) == expected_ec2_instance.dict( + exclude={"name"} + ) + + +def test_compose_user_data(faker: Faker): + command = faker.text() + user_data = compose_user_data(command) + assert user_data.startswith("#!/bin/bash") + assert command in user_data diff --git a/services/autoscaling/tests/unit/test_utils_rabbitmq.py b/services/autoscaling/tests/unit/test_utils_rabbitmq.py index f06936a4808..ab49fe1c3b3 100644 --- a/services/autoscaling/tests/unit/test_utils_rabbitmq.py +++ b/services/autoscaling/tests/unit/test_utils_rabbitmq.py @@ -1,6 +1,7 @@ # pylint:disable=unused-variable # pylint:disable=unused-argument # pylint:disable=redefined-outer-name +# pylint:disable=too-many-arguments from typing import Any, Awaitable, Callable, Mapping @@ -40,6 +41,7 @@ async def test_post_log_message( disable_dynamic_service_background_task, enabled_rabbitmq: RabbitSettings, + disabled_ec2: None, initialized_app: FastAPI, rabbit_client: RabbitMQClient, mocker: MockerFixture, @@ -93,6 +95,7 @@ async def test_post_log_message( async def test_post_log_message_does_not_raise_if_service_has_no_labels( disable_dynamic_service_background_task, enabled_rabbitmq: RabbitSettings, + disabled_ec2: None, initialized_app: FastAPI, async_docker_client: aiodocker.Docker, create_service: Callable[[dict[str, Any]], Awaitable[Mapping[str, Any]]],