Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Autoscaling: have only one ec2 client #3643

Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from fastapi.responses import PlainTextResponse
from pydantic import BaseModel

from ..rabbitmq import get_rabbitmq_client
from ..modules.rabbitmq import get_rabbitmq_client
from .dependencies.application import get_app

router = APIRouter()
Expand All @@ -22,23 +22,30 @@ async def health_check():
return f"{__name__}.health_check@{datetime.utcnow().isoformat()}"


class _RabbitMQStatus(BaseModel):
initialized: bool
connection_state: bool
class _ComponentStatus(BaseModel):
is_enabled: bool
is_responsive: bool


class _StatusGet(BaseModel):
rabbitmq: _RabbitMQStatus
rabbitmq: _ComponentStatus
ec2: _ComponentStatus


@router.get("/status", include_in_schema=True, response_model=_StatusGet)
async def get_status(app: FastAPI = Depends(get_app)) -> _StatusGet:

return _StatusGet(
rabbitmq=_RabbitMQStatus(
initialized=bool(app.state.rabbitmq_client),
connection_state=await get_rabbitmq_client(app).ping()
rabbitmq=_ComponentStatus(
is_enabled=bool(app.state.rabbitmq_client),
is_responsive=await get_rabbitmq_client(app).ping()
if app.state.rabbitmq_client
else False,
)
),
ec2=_ComponentStatus(
is_enabled=bool(app.state.ec2_client),
is_responsive=await app.state.ec2_client.ping()
if app.state.ec2_client
else False,
),
)
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
)
from ..api.routes import setup_api_routes
from ..dynamic_scaling import setup as setup_background_task
from ..rabbitmq import setup as setup_rabbitmq
from ..modules.ec2 import setup as setup_ec2
from ..modules.rabbitmq import setup as setup_rabbitmq
from .settings import ApplicationSettings

logger = logging.getLogger(__name__)
Expand All @@ -37,6 +38,7 @@ def create_app(settings: ApplicationSettings) -> FastAPI:
# PLUGINS SETUP
setup_api_routes(app)
setup_rabbitmq(app)
setup_ec2(app)
# autoscaler background task
setup_background_task(app)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,22 @@


class AutoscalingRuntimeError(PydanticErrorMixin, RuntimeError):
...
msg_template: str = "Autoscaling unexpected error"


class ConfigurationError(AutoscalingRuntimeError):
code = "autoscaling.configuration_error"
msg_template: str = "Application misconfiguration: {msg}"


class Ec2NotConnectedError(AutoscalingRuntimeError):
sanderegg marked this conversation as resolved.
Show resolved Hide resolved
msg_template: str = "Cannot connect with ec2 server"


class Ec2InstanceNotFoundError(AutoscalingRuntimeError):
code = "autoscaling.ec2_instance_not_found_error"
msg_template: str = "Needed instance was not found"


class Ec2TooManyInstancesError(AutoscalingRuntimeError):
code = "autoscaling.ec2_too_many_instances_error"
msg_template: str = (
"The maximum amount of instances {num_instances} is already reached!"
)
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
from pydantic import parse_obj_as
from types_aiobotocore_ec2.literals import InstanceTypeType

from . import utils_aws, utils_docker
from . import utils_docker
from ._meta import VERSION
from .core.errors import Ec2InstanceNotFoundError
from .core.settings import ApplicationSettings
from .utils import rabbitmq
from .modules.ec2 import get_ec2_client
from .utils import ec2, rabbitmq

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -61,10 +62,10 @@ async def check_dynamic_resources(app: FastAPI) -> None:

assert app_settings.AUTOSCALING_EC2_ACCESS # nosec
assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec
list_of_ec2_instances = await utils_aws.get_ec2_instance_capabilities(
app_settings.AUTOSCALING_EC2_ACCESS, app_settings.AUTOSCALING_EC2_INSTANCES
ec2_client = get_ec2_client(app)
list_of_ec2_instances = await ec2_client.get_ec2_instance_capabilities(
app_settings.AUTOSCALING_EC2_INSTANCES
)

for task in pending_tasks:
await rabbitmq.post_log_message(
app,
Expand All @@ -74,18 +75,17 @@ async def check_dynamic_resources(app: FastAPI) -> None:
)
try:
ec2_instances_needed = [
utils_aws.find_best_fitting_ec2_instance(
ec2.find_best_fitting_ec2_instance(
list_of_ec2_instances,
utils_docker.get_max_resources_from_docker_task(task),
score_type=utils_aws.closest_instance_policy,
score_type=ec2.closest_instance_policy,
)
]
assert app_settings.AUTOSCALING_EC2_ACCESS # nosec
assert app_settings.AUTOSCALING_NODES_MONITORING # nosec

logger.debug("%s", f"{ec2_instances_needed[0]=}")
new_instance_dns_name = await utils_aws.start_aws_instance(
app_settings.AUTOSCALING_EC2_ACCESS,
new_instance_dns_name = await ec2_client.start_aws_instance(
app_settings.AUTOSCALING_EC2_INSTANCES,
instance_type=parse_obj_as(
InstanceTypeType, ec2_instances_needed[0].name
Expand Down
218 changes: 218 additions & 0 deletions services/autoscaling/src/simcore_service_autoscaling/modules/ec2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
import contextlib
import logging
from dataclasses import dataclass
from typing import Optional, cast

import aioboto3
from aiobotocore.session import ClientCreatorContext
from fastapi import FastAPI
from pydantic import ByteSize, parse_obj_as
from servicelib.logging_utils import log_context
from tenacity._asyncio import AsyncRetrying
from tenacity.before_sleep import before_sleep_log
from tenacity.stop import stop_after_delay
from tenacity.wait import wait_random_exponential
from types_aiobotocore_ec2 import EC2Client
from types_aiobotocore_ec2.literals import InstanceTypeType
from types_aiobotocore_ec2.type_defs import ReservationTypeDef

from ..core.errors import (
ConfigurationError,
Ec2NotConnectedError,
Ec2TooManyInstancesError,
)
from ..core.settings import EC2InstancesSettings, EC2Settings
from ..models import EC2Instance
from ..utils.ec2 import compose_user_data

# Private DNS name of an EC2 instance, as returned by AutoscalingEC2.start_aws_instance
InstancePrivateDNSName = str

# Module-level logger, named after this module
logger = logging.getLogger(__name__)


def _is_ec2_instance_running(instance: ReservationTypeDef):
return (
instance.get("Instances", [{}])[0].get("State", {}).get("Name", "not_running")
== "running"
)


@dataclass
class AutoscalingEC2:
    """Wrapper around one EC2 client that is opened once and reused.

    Build instances with :meth:`create`; the underlying client context is held
    open by ``exit_stack`` until :meth:`close` is awaited.
    """

    client: EC2Client
    session: aioboto3.Session
    exit_stack: contextlib.AsyncExitStack

    @classmethod
    async def create(cls, settings: EC2Settings) -> "AutoscalingEC2":
        """Open an EC2 client configured from ``settings`` and wrap it.

        The client's async context is entered through an ``AsyncExitStack`` so
        that it stays open after this call returns; :meth:`close` exits it.
        """
        session = aioboto3.Session()
        session_client = session.client(
            "ec2",
            endpoint_url=settings.EC2_ENDPOINT,
            aws_access_key_id=settings.EC2_ACCESS_KEY_ID,
            aws_secret_access_key=settings.EC2_SECRET_ACCESS_KEY,
            region_name=settings.EC2_REGION_NAME,
        )
        assert isinstance(session_client, ClientCreatorContext)  # nosec
        exit_stack = contextlib.AsyncExitStack()
        ec2_client = cast(
            EC2Client, await exit_stack.enter_async_context(session_client)
        )
        return cls(ec2_client, session, exit_stack)

    async def close(self) -> None:
        """Exit the client context that was entered in :meth:`create`."""
        await self.exit_stack.aclose()

    async def ping(self) -> bool:
        """Return ``True`` when the EC2 endpoint answers a read-only request.

        Any exception raised by the request is reported as ``False``.

        NOTE: the request is deliberately *not* sent with ``DryRun=True``. A dry
        run never returns normally: when the caller has the required
        permissions EC2 answers with a ``DryRunOperation`` error, which the
        client raises, so a perfectly healthy endpoint would be reported as
        unresponsive.
        """
        try:
            await self.client.describe_account_attributes()
        except Exception:  # pylint: disable=broad-except
            return False
        return True

    async def get_ec2_instance_capabilities(
        self,
        instance_settings: EC2InstancesSettings,
    ) -> list[EC2Instance]:
        """Describe the allowed instance types as ``EC2Instance`` (name, cpus, ram).

        The types queried are ``instance_settings.EC2_INSTANCES_ALLOWED_TYPES``.
        """
        instance_types = await self.client.describe_instance_types(
            InstanceTypes=cast(
                list[InstanceTypeType], instance_settings.EC2_INSTANCES_ALLOWED_TYPES
            )
        )

        list_instances: list[EC2Instance] = []
        for instance in instance_types.get("InstanceTypes", []):
            # entries lacking one of the expected keys are skipped on purpose
            with contextlib.suppress(KeyError):
                list_instances.append(
                    EC2Instance(
                        name=instance["InstanceType"],
                        cpus=instance["VCpuInfo"]["DefaultVCpus"],
                        ram=parse_obj_as(
                            ByteSize, f"{instance['MemoryInfo']['SizeInMiB']}MiB"
                        ),
                    )
                )
        return list_instances

    async def start_aws_instance(
        self,
        instance_settings: EC2InstancesSettings,
        instance_type: InstanceTypeType,
        tags: dict[str, str],
        startup_script: str,
    ) -> InstancePrivateDNSName:
        """Launch one EC2 instance and wait until it is running.

        Args:
            instance_settings: AMI, key name, subnet, security groups and the
                maximum number of instances.
            instance_type: EC2 instance type to launch.
            tags: tags applied to the new instance; their keys are also used to
                look up the instances that count towards the maximum.
            startup_script: script passed to ``compose_user_data`` to build the
                instance user data.

        Returns:
            The private DNS name of the new instance.

        Raises:
            Ec2TooManyInstancesError: when the number of running instances
                carrying the tag keys already reaches
                ``EC2_INSTANCES_MAX_INSTANCES``.
        """
        with log_context(
            logger,
            logging.INFO,
            msg=f"launching AWS instance {instance_type} with {tags=}",
        ):
            # first check the max amount is not already reached
            current_instances = await self.client.describe_instances(
                Filters=[{"Name": "tag-key", "Values": [tag_key]} for tag_key in tags]
            )
            # Count the running ones instead of requiring *all* of them to be
            # running: a single leftover non-running (e.g. terminated) instance
            # must not switch the limit off.
            num_running_instances = sum(
                1
                for reservation in current_instances.get("Reservations", [])
                if _is_ec2_instance_running(reservation)
            )
            if num_running_instances >= instance_settings.EC2_INSTANCES_MAX_INSTANCES:
                raise Ec2TooManyInstancesError(
                    num_instances=instance_settings.EC2_INSTANCES_MAX_INSTANCES
                )

            instances = await self.client.run_instances(
                ImageId=instance_settings.EC2_INSTANCES_AMI_ID,
                MinCount=1,
                MaxCount=1,
                InstanceType=instance_type,
                InstanceInitiatedShutdownBehavior="terminate",
                KeyName=instance_settings.EC2_INSTANCES_KEY_NAME,
                SubnetId=instance_settings.EC2_INSTANCES_SUBNET_ID,
                TagSpecifications=[
                    {
                        "ResourceType": "instance",
                        "Tags": [
                            {"Key": tag_key, "Value": tag_value}
                            for tag_key, tag_value in tags.items()
                        ],
                    }
                ],
                UserData=compose_user_data(startup_script),
                SecurityGroupIds=instance_settings.EC2_INSTANCES_SECURITY_GROUP_IDS,
            )
            instance_id = instances["Instances"][0]["InstanceId"]
            logger.info(
                "New instance launched: %s, waiting for it to start now...", instance_id
            )
            # wait for the instance to be in a running state
            # NOTE: reference to EC2 states https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-lifecycle.html
            waiter = self.client.get_waiter("instance_exists")
            await waiter.wait(InstanceIds=[instance_id])
            logger.info(
                "instance %s exists now, waiting for running state...", instance_id
            )

            waiter = self.client.get_waiter("instance_running")
            await waiter.wait(InstanceIds=[instance_id])
            logger.info("instance %s is now running", instance_id)

            # NOTE: this is currently deactivated as this makes starting an instance
            # take between 2-4 minutes more and it seems to be responsive much before
            # nevertheless if we get weird errors, this should be activated again!

            # waiter = self.client.get_waiter("instance_status_ok")
            # await waiter.wait(InstanceIds=[instance_id])
            # logger.info("instance %s status is OK...", instance_id)

            # get the private DNS name
            instances = await self.client.describe_instances(InstanceIds=[instance_id])
            private_dns_name: str = instances["Reservations"][0]["Instances"][0][
                "PrivateDnsName"
            ]
            logger.info(
                "instance %s is available on %s, happy computing!!",
                instance_id,
                private_dns_name,
            )
            return private_dns_name


def setup(app: FastAPI) -> None:
async def on_startup() -> None:
app.state.ec2_client = None
settings: Optional[EC2Settings] = app.state.settings.AUTOSCALING_EC2_ACCESS

if not settings:
logger.warning("EC2 client is de-activated in the settings")
return

app.state.ec2_client = client = await AutoscalingEC2.create(settings)

async for attempt in AsyncRetrying(
reraise=True,
stop=stop_after_delay(120),
wait=wait_random_exponential(max=30),
before_sleep=before_sleep_log(logger, logging.WARNING),
):
with attempt:
connected = await client.ping()
if not connected:
raise Ec2NotConnectedError()

async def on_shutdown() -> None:
if app.state.ec2_client:
await cast(AutoscalingEC2, app.state.ec2_client).close()

app.add_event_handler("startup", on_startup)
app.add_event_handler("shutdown", on_shutdown)


def get_ec2_client(app: FastAPI) -> AutoscalingEC2:
    """Return the EC2 client stored in ``app.state.ec2_client``.

    Raises:
        ConfigurationError: when no client is stored on the application state.
    """
    if app.state.ec2_client:
        return cast(AutoscalingEC2, app.state.ec2_client)

    raise ConfigurationError(
        msg="EC2 client is not available. Please check the configuration."
    )
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from servicelib.rabbitmq_utils import wait_till_rabbitmq_responsive
from settings_library.rabbit import RabbitSettings

from .core.errors import ConfigurationError
from ..core.errors import ConfigurationError

logger = logging.getLogger(__name__)

Expand Down
Loading