Skip to content

Commit

Permalink
Merge branch 'master' into is3318/resend-sms-handler
Browse files Browse the repository at this point in the history
  • Loading branch information
pcrespov authored Dec 8, 2022
2 parents 00ad66a + 0f04727 commit c08c261
Show file tree
Hide file tree
Showing 20 changed files with 719 additions and 536 deletions.
25 changes: 16 additions & 9 deletions services/autoscaling/src/simcore_service_autoscaling/api/health.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from fastapi.responses import PlainTextResponse
from pydantic import BaseModel

from ..rabbitmq import get_rabbitmq_client
from ..modules.rabbitmq import get_rabbitmq_client
from .dependencies.application import get_app

router = APIRouter()
Expand All @@ -22,23 +22,30 @@ async def health_check():
return f"{__name__}.health_check@{datetime.utcnow().isoformat()}"


class _RabbitMQStatus(BaseModel):
initialized: bool
connection_state: bool
class _ComponentStatus(BaseModel):
is_enabled: bool
is_responsive: bool


class _StatusGet(BaseModel):
rabbitmq: _RabbitMQStatus
rabbitmq: _ComponentStatus
ec2: _ComponentStatus


@router.get("/status", include_in_schema=True, response_model=_StatusGet)
async def get_status(app: FastAPI = Depends(get_app)) -> _StatusGet:

return _StatusGet(
rabbitmq=_RabbitMQStatus(
initialized=bool(app.state.rabbitmq_client),
connection_state=await get_rabbitmq_client(app).ping()
rabbitmq=_ComponentStatus(
is_enabled=bool(app.state.rabbitmq_client),
is_responsive=await get_rabbitmq_client(app).ping()
if app.state.rabbitmq_client
else False,
)
),
ec2=_ComponentStatus(
is_enabled=bool(app.state.ec2_client),
is_responsive=await app.state.ec2_client.ping()
if app.state.ec2_client
else False,
),
)
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
)
from ..api.routes import setup_api_routes
from ..dynamic_scaling import setup as setup_background_task
from ..rabbitmq import setup as setup_rabbitmq
from ..modules.ec2 import setup as setup_ec2
from ..modules.rabbitmq import setup as setup_rabbitmq
from .settings import ApplicationSettings

logger = logging.getLogger(__name__)
Expand All @@ -37,6 +38,7 @@ def create_app(settings: ApplicationSettings) -> FastAPI:
# PLUGINS SETUP
setup_api_routes(app)
setup_rabbitmq(app)
setup_ec2(app)
# autoscaler background task
setup_background_task(app)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,22 @@


class AutoscalingRuntimeError(PydanticErrorMixin, RuntimeError):
...
msg_template: str = "Autoscaling unexpected error"


class ConfigurationError(AutoscalingRuntimeError):
code = "autoscaling.configuration_error"
msg_template: str = "Application misconfiguration: {msg}"


class Ec2NotConnectedError(AutoscalingRuntimeError):
msg_template: str = "Cannot connect with ec2 server"


class Ec2InstanceNotFoundError(AutoscalingRuntimeError):
code = "autoscaling.ec2_instance_not_found_error"
msg_template: str = "Needed instance was not found"


class Ec2TooManyInstancesError(AutoscalingRuntimeError):
code = "autoscaling.ec2_too_many_instances_error"
msg_template: str = (
"The maximum amount of instances {num_instances} is already reached!"
)
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
from pydantic import parse_obj_as
from types_aiobotocore_ec2.literals import InstanceTypeType

from . import utils_aws, utils_docker
from . import utils_docker
from ._meta import VERSION
from .core.errors import Ec2InstanceNotFoundError
from .core.settings import ApplicationSettings
from .utils import rabbitmq
from .modules.ec2 import get_ec2_client
from .utils import ec2, rabbitmq

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -61,10 +62,10 @@ async def check_dynamic_resources(app: FastAPI) -> None:

assert app_settings.AUTOSCALING_EC2_ACCESS # nosec
assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec
list_of_ec2_instances = await utils_aws.get_ec2_instance_capabilities(
app_settings.AUTOSCALING_EC2_ACCESS, app_settings.AUTOSCALING_EC2_INSTANCES
ec2_client = get_ec2_client(app)
list_of_ec2_instances = await ec2_client.get_ec2_instance_capabilities(
app_settings.AUTOSCALING_EC2_INSTANCES
)

for task in pending_tasks:
await rabbitmq.post_log_message(
app,
Expand All @@ -74,18 +75,17 @@ async def check_dynamic_resources(app: FastAPI) -> None:
)
try:
ec2_instances_needed = [
utils_aws.find_best_fitting_ec2_instance(
ec2.find_best_fitting_ec2_instance(
list_of_ec2_instances,
utils_docker.get_max_resources_from_docker_task(task),
score_type=utils_aws.closest_instance_policy,
score_type=ec2.closest_instance_policy,
)
]
assert app_settings.AUTOSCALING_EC2_ACCESS # nosec
assert app_settings.AUTOSCALING_NODES_MONITORING # nosec

logger.debug("%s", f"{ec2_instances_needed[0]=}")
new_instance_dns_name = await utils_aws.start_aws_instance(
app_settings.AUTOSCALING_EC2_ACCESS,
new_instance_dns_name = await ec2_client.start_aws_instance(
app_settings.AUTOSCALING_EC2_INSTANCES,
instance_type=parse_obj_as(
InstanceTypeType, ec2_instances_needed[0].name
Expand Down
Empty file.
218 changes: 218 additions & 0 deletions services/autoscaling/src/simcore_service_autoscaling/modules/ec2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
import contextlib
import logging
from dataclasses import dataclass
from typing import Optional, cast

import aioboto3
from aiobotocore.session import ClientCreatorContext
from fastapi import FastAPI
from pydantic import ByteSize, parse_obj_as
from servicelib.logging_utils import log_context
from tenacity._asyncio import AsyncRetrying
from tenacity.before_sleep import before_sleep_log
from tenacity.stop import stop_after_delay
from tenacity.wait import wait_random_exponential
from types_aiobotocore_ec2 import EC2Client
from types_aiobotocore_ec2.literals import InstanceTypeType
from types_aiobotocore_ec2.type_defs import ReservationTypeDef

from ..core.errors import (
ConfigurationError,
Ec2NotConnectedError,
Ec2TooManyInstancesError,
)
from ..core.settings import EC2InstancesSettings, EC2Settings
from ..models import EC2Instance
from ..utils.ec2 import compose_user_data

InstancePrivateDNSName = str

logger = logging.getLogger(__name__)


def _is_ec2_instance_running(instance: ReservationTypeDef):
return (
instance.get("Instances", [{}])[0].get("State", {}).get("Name", "not_running")
== "running"
)


@dataclass
class AutoscalingEC2:
client: EC2Client
session: aioboto3.Session
exit_stack: contextlib.AsyncExitStack

@classmethod
async def create(cls, settings: EC2Settings) -> "AutoscalingEC2":
session = aioboto3.Session()
session_client = session.client(
"ec2",
endpoint_url=settings.EC2_ENDPOINT,
aws_access_key_id=settings.EC2_ACCESS_KEY_ID,
aws_secret_access_key=settings.EC2_SECRET_ACCESS_KEY,
region_name=settings.EC2_REGION_NAME,
)
assert isinstance(session_client, ClientCreatorContext) # nosec
exit_stack = contextlib.AsyncExitStack()
ec2_client = cast(
EC2Client, await exit_stack.enter_async_context(session_client)
)
return cls(ec2_client, session, exit_stack)

async def close(self) -> None:
await self.exit_stack.aclose()

async def ping(self) -> bool:
try:
await self.client.describe_account_attributes(DryRun=True)
return True
except Exception: # pylint: disable=broad-except
return False

async def get_ec2_instance_capabilities(
self,
instance_settings: EC2InstancesSettings,
) -> list[EC2Instance]:
instance_types = await self.client.describe_instance_types(
InstanceTypes=cast(
list[InstanceTypeType], instance_settings.EC2_INSTANCES_ALLOWED_TYPES
)
)

list_instances: list[EC2Instance] = []
for instance in instance_types.get("InstanceTypes", []):
with contextlib.suppress(KeyError):
list_instances.append(
EC2Instance(
name=instance["InstanceType"],
cpus=instance["VCpuInfo"]["DefaultVCpus"],
ram=parse_obj_as(
ByteSize, f"{instance['MemoryInfo']['SizeInMiB']}MiB"
),
)
)
return list_instances

async def start_aws_instance(
self,
instance_settings: EC2InstancesSettings,
instance_type: InstanceTypeType,
tags: dict[str, str],
startup_script: str,
) -> InstancePrivateDNSName:
with log_context(
logger,
logging.INFO,
msg=f"launching AWS instance {instance_type} with {tags=}",
):
# first check the max amount is not already reached
if current_instances := await self.client.describe_instances(
Filters=[
{"Name": "tag-key", "Values": [tag_key]} for tag_key in tags.keys()
]
):
if (
len(current_instances.get("Reservations", []))
>= instance_settings.EC2_INSTANCES_MAX_INSTANCES
) and all(
_is_ec2_instance_running(instance)
for instance in current_instances["Reservations"]
):
raise Ec2TooManyInstancesError(
num_instances=instance_settings.EC2_INSTANCES_MAX_INSTANCES
)

instances = await self.client.run_instances(
ImageId=instance_settings.EC2_INSTANCES_AMI_ID,
MinCount=1,
MaxCount=1,
InstanceType=instance_type,
InstanceInitiatedShutdownBehavior="terminate",
KeyName=instance_settings.EC2_INSTANCES_KEY_NAME,
SubnetId=instance_settings.EC2_INSTANCES_SUBNET_ID,
TagSpecifications=[
{
"ResourceType": "instance",
"Tags": [
{"Key": tag_key, "Value": tag_value}
for tag_key, tag_value in tags.items()
],
}
],
UserData=compose_user_data(startup_script),
SecurityGroupIds=instance_settings.EC2_INSTANCES_SECURITY_GROUP_IDS,
)
instance_id = instances["Instances"][0]["InstanceId"]
logger.info(
"New instance launched: %s, waiting for it to start now...", instance_id
)
# wait for the instance to be in a running state
# NOTE: reference to EC2 states https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-lifecycle.html
waiter = self.client.get_waiter("instance_exists")
await waiter.wait(InstanceIds=[instance_id])
logger.info(
"instance %s exists now, waiting for running state...", instance_id
)

waiter = self.client.get_waiter("instance_running")
await waiter.wait(InstanceIds=[instance_id])
logger.info("instance %s is now running", instance_id)

# NOTE: this is currently deactivated as this makes starting an instance
# take between 2-4 minutes more and it seems to be responsive much before
# nevertheless if we get weird errors, this should be activated again!

# waiter = client.get_waiter("instance_status_ok")
# await waiter.wait(InstanceIds=[instance_id])
# logger.info("instance %s status is OK...", instance_id)

# get the private IP
instances = await self.client.describe_instances(InstanceIds=[instance_id])
private_dns_name: str = instances["Reservations"][0]["Instances"][0][
"PrivateDnsName"
]
logger.info(
"instance %s is available on %s, happy computing!!",
instance_id,
private_dns_name,
)
return private_dns_name


def setup(app: FastAPI) -> None:
async def on_startup() -> None:
app.state.ec2_client = None
settings: Optional[EC2Settings] = app.state.settings.AUTOSCALING_EC2_ACCESS

if not settings:
logger.warning("EC2 client is de-activated in the settings")
return

app.state.ec2_client = client = await AutoscalingEC2.create(settings)

async for attempt in AsyncRetrying(
reraise=True,
stop=stop_after_delay(120),
wait=wait_random_exponential(max=30),
before_sleep=before_sleep_log(logger, logging.WARNING),
):
with attempt:
connected = await client.ping()
if not connected:
raise Ec2NotConnectedError()

async def on_shutdown() -> None:
if app.state.ec2_client:
await cast(AutoscalingEC2, app.state.ec2_client).close()

app.add_event_handler("startup", on_startup)
app.add_event_handler("shutdown", on_shutdown)


def get_ec2_client(app: FastAPI) -> AutoscalingEC2:
if not app.state.ec2_client:
raise ConfigurationError(
msg="EC2 client is not available. Please check the configuration."
)
return cast(AutoscalingEC2, app.state.ec2_client)
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from servicelib.rabbitmq_utils import wait_till_rabbitmq_responsive
from settings_library.rabbit import RabbitSettings

from .core.errors import ConfigurationError
from ..core.errors import ConfigurationError

logger = logging.getLogger(__name__)

Expand Down
Loading

0 comments on commit c08c261

Please sign in to comment.