Skip to content

Commit

Permalink
renaming
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg committed Dec 7, 2022
1 parent 4e97f4a commit 23a7365
Show file tree
Hide file tree
Showing 12 changed files with 375 additions and 376 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
from pydantic import parse_obj_as
from types_aiobotocore_ec2.literals import InstanceTypeType

from . import utils_aws, utils_docker
from . import utils_docker
from ._meta import VERSION
from .core.errors import Ec2InstanceNotFoundError
from .core.settings import ApplicationSettings
from .modules.ec2 import get_ec2_client
from .utils import rabbitmq
from .utils import ec2, rabbitmq

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -63,10 +63,9 @@ async def check_dynamic_resources(app: FastAPI) -> None:
assert app_settings.AUTOSCALING_EC2_ACCESS # nosec
assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec
ec2_client = get_ec2_client(app)
list_of_ec2_instances = await utils_aws.get_ec2_instance_capabilities(
ec2_client, app_settings.AUTOSCALING_EC2_INSTANCES
list_of_ec2_instances = await ec2_client.get_ec2_instance_capabilities(
app_settings.AUTOSCALING_EC2_INSTANCES
)

for task in pending_tasks:
await rabbitmq.post_log_message(
app,
Expand All @@ -76,18 +75,17 @@ async def check_dynamic_resources(app: FastAPI) -> None:
)
try:
ec2_instances_needed = [
utils_aws.find_best_fitting_ec2_instance(
ec2.find_best_fitting_ec2_instance(
list_of_ec2_instances,
utils_docker.get_max_resources_from_docker_task(task),
score_type=utils_aws.closest_instance_policy,
score_type=ec2.closest_instance_policy,
)
]
assert app_settings.AUTOSCALING_EC2_ACCESS # nosec
assert app_settings.AUTOSCALING_NODES_MONITORING # nosec

logger.debug("%s", f"{ec2_instances_needed[0]=}")
new_instance_dns_name = await utils_aws.start_aws_instance(
ec2_client,
new_instance_dns_name = await ec2_client.start_aws_instance(
app_settings.AUTOSCALING_EC2_INSTANCES,
instance_type=parse_obj_as(
InstanceTypeType, ec2_instances_needed[0].name
Expand Down
136 changes: 132 additions & 4 deletions services/autoscaling/src/simcore_service_autoscaling/modules/ec2.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,37 @@
import aioboto3
from aiobotocore.session import ClientCreatorContext
from fastapi import FastAPI
from pydantic import ByteSize, parse_obj_as
from servicelib.logging_utils import log_context
from tenacity._asyncio import AsyncRetrying
from tenacity.before_sleep import before_sleep_log
from tenacity.stop import stop_after_delay
from tenacity.wait import wait_random_exponential
from types_aiobotocore_ec2 import EC2Client
from types_aiobotocore_ec2.literals import InstanceTypeType
from types_aiobotocore_ec2.type_defs import ReservationTypeDef

from ..core.errors import ConfigurationError, Ec2NotConnectedError
from ..core.settings import EC2Settings
from ..core.errors import (
ConfigurationError,
Ec2NotConnectedError,
Ec2TooManyInstancesError,
)
from ..core.settings import EC2InstancesSettings, EC2Settings
from ..models import EC2Instance
from ..utils.ec2 import compose_user_data

InstancePrivateDNSName = str

logger = logging.getLogger(__name__)


def _is_ec2_instance_running(instance: ReservationTypeDef):
return (
instance.get("Instances", [{}])[0].get("State", {}).get("Name", "not_running")
== "running"
)


@dataclass
class AutoscalingEC2:
client: EC2Client
Expand Down Expand Up @@ -51,6 +70,115 @@ async def ping(self) -> bool:
except Exception: # pylint: disable=broad-except
return False

async def get_ec2_instance_capabilities(
    self,
    instance_settings: EC2InstancesSettings,
) -> list[EC2Instance]:
    """Describe the allowed EC2 instance types as ``EC2Instance`` models.

    Asks EC2 (``describe_instance_types``) about the types listed in
    ``instance_settings.EC2_INSTANCES_ALLOWED_TYPES`` and converts every
    returned entry into an ``EC2Instance`` (name, default vCPU count, RAM).

    Entries of the response that lack one of the expected keys are skipped
    silently instead of failing the whole call.
    """
    allowed_types = cast(
        list[InstanceTypeType], instance_settings.EC2_INSTANCES_ALLOWED_TYPES
    )
    response = await self.client.describe_instance_types(InstanceTypes=allowed_types)

    capabilities: list[EC2Instance] = []
    for type_info in response.get("InstanceTypes", []):
        try:
            type_name = type_info["InstanceType"]
            default_vcpus = type_info["VCpuInfo"]["DefaultVCpus"]
            # EC2 reports the memory in MiB; let pydantic turn it into a ByteSize
            memory = parse_obj_as(
                ByteSize, f"{type_info['MemoryInfo']['SizeInMiB']}MiB"
            )
            capabilities.append(
                EC2Instance(name=type_name, cpus=default_vcpus, ram=memory)
            )
        except KeyError:
            # incomplete description: ignore this instance type
            continue
    return capabilities

async def start_aws_instance(
    self,
    instance_settings: EC2InstancesSettings,
    instance_type: InstanceTypeType,
    tags: dict[str, str],
    startup_script: str,
) -> InstancePrivateDNSName:
    """Launch a single EC2 instance and return its private DNS name.

    The instance is created from the AMI, key pair, subnet and security groups
    found in ``instance_settings``, tagged with ``tags`` and given
    ``startup_script`` (wrapped by ``compose_user_data``) as user data. The call
    only returns after the "instance_exists" and "instance_running" waiters
    have completed.

    Raises:
        Ec2TooManyInstancesError: when the reservations carrying the given tag
            keys already number ``EC2_INSTANCES_MAX_INSTANCES`` or more and all
            of them are running.
    """
    with log_context(
        logger,
        logging.INFO,
        msg=f"launching AWS instance {instance_type} with {tags=}",
    ):
        # first make sure the maximum number of instances is not reached yet
        tag_key_filters = [
            {"Name": "tag-key", "Values": [tag_key]} for tag_key in tags
        ]
        current_instances = await self.client.describe_instances(
            Filters=tag_key_filters
        )
        if current_instances:
            limit_reached = (
                len(current_instances.get("Reservations", []))
                >= instance_settings.EC2_INSTANCES_MAX_INSTANCES
            )
            if limit_reached and all(
                _is_ec2_instance_running(reservation)
                for reservation in current_instances["Reservations"]
            ):
                raise Ec2TooManyInstancesError(
                    num_instances=instance_settings.EC2_INSTANCES_MAX_INSTANCES
                )

        instance_tags = [
            {"Key": tag_key, "Value": tag_value}
            for tag_key, tag_value in tags.items()
        ]
        launched = await self.client.run_instances(
            ImageId=instance_settings.EC2_INSTANCES_AMI_ID,
            MinCount=1,
            MaxCount=1,
            InstanceType=instance_type,
            InstanceInitiatedShutdownBehavior="terminate",
            KeyName=instance_settings.EC2_INSTANCES_KEY_NAME,
            SubnetId=instance_settings.EC2_INSTANCES_SUBNET_ID,
            TagSpecifications=[
                {"ResourceType": "instance", "Tags": instance_tags}
            ],
            UserData=compose_user_data(startup_script),
            SecurityGroupIds=instance_settings.EC2_INSTANCES_SECURITY_GROUP_IDS,
        )
        instance_id = launched["Instances"][0]["InstanceId"]
        logger.info(
            "New instance launched: %s, waiting for it to start now...", instance_id
        )

        # wait until the instance exists, then until it reaches the running state
        # NOTE: reference to EC2 states https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-lifecycle.html
        exists_waiter = self.client.get_waiter("instance_exists")
        await exists_waiter.wait(InstanceIds=[instance_id])
        logger.info(
            "instance %s exists now, waiting for running state...", instance_id
        )

        running_waiter = self.client.get_waiter("instance_running")
        await running_waiter.wait(InstanceIds=[instance_id])
        logger.info("instance %s is now running", instance_id)

        # NOTE: waiting for "instance_status_ok" is deliberately deactivated: it
        # makes starting an instance take 2-4 minutes longer, while the instance
        # seems to be responsive much earlier. If weird errors show up, it should
        # be activated again:
        #
        # waiter = self.client.get_waiter("instance_status_ok")
        # await waiter.wait(InstanceIds=[instance_id])
        # logger.info("instance %s status is OK...", instance_id)

        # look up the private DNS name of the freshly started instance
        description = await self.client.describe_instances(
            InstanceIds=[instance_id]
        )
        first_instance = description["Reservations"][0]["Instances"][0]
        private_dns_name: str = first_instance["PrivateDnsName"]
        logger.info(
            "instance %s is available on %s, happy computing!!",
            instance_id,
            private_dns_name,
        )
        return private_dns_name


def setup(app: FastAPI) -> None:
async def on_startup() -> None:
Expand Down Expand Up @@ -82,9 +210,9 @@ async def on_shutdown() -> None:
app.add_event_handler("shutdown", on_shutdown)


def get_ec2_client(app: FastAPI) -> EC2Client:
def get_ec2_client(app: FastAPI) -> AutoscalingEC2:
if not app.state.ec2_client:
raise ConfigurationError(
msg="EC2 client is not available. Please check the configuration."
)
return cast(AutoscalingEC2, app.state.ec2_client).client
return cast(AutoscalingEC2, app.state.ec2_client)
60 changes: 60 additions & 0 deletions services/autoscaling/src/simcore_service_autoscaling/utils/ec2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
""" Free helper functions for AWS API
"""

import logging
from collections import OrderedDict
from textwrap import dedent
from typing import Callable

from ..core.errors import ConfigurationError, Ec2InstanceNotFoundError
from ..models import EC2Instance, Resources

logger = logging.getLogger(__name__)


def compose_user_data(docker_join_bash_command: str) -> str:
    """Wrap *docker_join_bash_command* into a bash script.

    The result is a shebang line, followed by the command, followed by a
    trailing newline.
    """
    script_lines = ("#!/bin/bash", docker_join_bash_command, "")
    return dedent("\n".join(script_lines))


def closest_instance_policy(
    ec2_instance: EC2Instance,
    resources: Resources,
) -> float:
    """Score how closely *ec2_instance* matches the requested *resources*.

    Returns:
        0 when the instance has fewer CPUs or less RAM than requested.
        Otherwise a score that shrinks as the instance gets more oversized;
        an instance matching the request exactly scores 100.
    """
    too_small = (
        ec2_instance.cpus < resources.cpus or ec2_instance.ram < resources.ram
    )
    if too_small:
        return 0

    # share of each resource that would stay unused on this instance
    unused_cpu_share = float(ec2_instance.cpus - resources.cpus) / float(
        ec2_instance.cpus
    )
    unused_ram_share = float(ec2_instance.ram - resources.ram) / float(
        ec2_instance.ram
    )
    used_cpu_share = 1.0 - unused_cpu_share
    used_ram_share = 1.0 - unused_ram_share
    return 100 * used_cpu_share * used_ram_share


def find_best_fitting_ec2_instance(
    allowed_ec2_instances: list[EC2Instance],
    resources: Resources,
    score_type: Callable[[EC2Instance, Resources], float] = closest_instance_policy,
) -> EC2Instance:
    """Pick the allowed EC2 instance with the highest score for *resources*.

    Every instance of ``allowed_ec2_instances`` is scored with ``score_type``;
    the best scoring one is returned. When several instances share the best
    score, the one listed last wins.

    Raises:
        ConfigurationError: ``allowed_ec2_instances`` is empty.
        Ec2InstanceNotFoundError: the best score is 0, i.e. no instance is
            considered adequate by the scoring policy.
    """
    if not allowed_ec2_instances:
        raise ConfigurationError(msg="allowed ec2 instances is missing!")

    scored_candidates = [
        (score_type(candidate, resources), candidate)
        for candidate in allowed_ec2_instances
    ]
    # max() keeps the first maximal element it meets; scanning from the end
    # therefore lets the last of equally scored instances win
    best_score, best_candidate = max(
        reversed(scored_candidates), key=lambda scored: scored[0]
    )
    if best_score == 0:
        raise Ec2InstanceNotFoundError(
            needed_resources=resources, msg="no adequate EC2 instance found!"
        )
    return best_candidate
Loading

0 comments on commit 23a7365

Please sign in to comment.