✨Computational autoscaling: find out which EC2 type is necessary (#4975)
sanderegg authored Nov 8, 2023
1 parent b7f43af commit 0e2a8b9
Showing 33 changed files with 779 additions and 337 deletions.
@@ -0,0 +1,3 @@
from typing import Final

DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY: Final[str] = "EC2-INSTANCE-TYPE"
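Editor's note: this new constant is the entire content of a new file added in dask-task-models-library (the excerpt does not show its path). A minimal sketch of how a consumer might read such a restriction out of a dask task's required resources — the helper and the shape of the resources mapping are assumptions for illustration, not part of this commit:

# Editor's sketch (not part of the commit): reading an EC2 instance type
# restriction out of a task's resources mapping, keyed by the new constant.
from typing import Final

DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY: Final[str] = "EC2-INSTANCE-TYPE"


def get_task_ec2_restriction(task_resources: dict[str, str]) -> str | None:
    """Return the EC2 instance type the task is pinned to, if any."""
    return task_resources.get(DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY)


assert get_task_ec2_restriction({"EC2-INSTANCE-TYPE": "t2.medium"}) == "t2.medium"
assert get_task_ec2_restriction({}) is None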
5 changes: 5 additions & 0 deletions packages/models-library/src/models_library/docker.py
@@ -51,6 +51,11 @@ class DockerGenericTag(ConstrainedStr):
_UNDEFINED_LABEL_VALUE_INT: Final[str] = "0"


DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY: Final[
DockerLabelKey
] = parse_obj_as(DockerLabelKey, "ec2-instance-type")


def to_simcore_runtime_docker_label_key(key: str) -> DockerLabelKey:
return DockerLabelKey(
f"{_SIMCORE_RUNTIME_DOCKER_LABEL_PREFIX}{key.replace('_', '-').lower()}"
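Editor's note: the new DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY gives services a node label to target. A hedged sketch of how a swarm placement constraint could be built from it; the constraint syntax is standard docker swarm (node.labels.<key>==<value>), while its use here is only implied by the key's name:

# Editor's sketch (not part of the commit): a swarm placement constraint
# built from the new node label key.
EC2_TYPE_LABEL_KEY = "ec2-instance-type"  # value of the new constant


def placement_constraint(instance_type: str) -> str:
    return f"node.labels.{EC2_TYPE_LABEL_KEY}=={instance_type}"


assert placement_constraint("t2.medium") == "node.labels.ec2-instance-type==t2.medium"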
4 changes: 2 additions & 2 deletions services/autoscaling/requirements/_tools.txt
@@ -76,9 +76,9 @@ tomli==2.0.1
# pip-tools
# pylint
# pyproject-hooks
tomlkit==0.12.1
tomlkit==0.12.2
# via pylint
typing-extensions==4.5.0
typing-extensions==4.8.0
# via
# -c requirements/_base.txt
# -c requirements/_test.txt
1 change: 1 addition & 0 deletions services/autoscaling/requirements/ci.txt
@@ -15,6 +15,7 @@
../../packages/pytest-simcore
../../packages/service-library[fastapi]
../../packages/settings-library
../../packages/dask-task-models-library

# installs current package
.
1 change: 1 addition & 0 deletions services/autoscaling/requirements/dev.txt
@@ -16,6 +16,7 @@
--editable ../../packages/pytest-simcore
--editable ../../packages/service-library[fastapi]
--editable ../../packages/settings-library
--editable ../../packages/dask-task-models-library

# installs current package
--editable .
1 change: 1 addition & 0 deletions services/autoscaling/requirements/prod.txt
@@ -13,5 +13,6 @@
../../packages/models-library
../../packages/service-library[fastapi]
../../packages/settings-library
../../packages/dask-task-models-library
# installs current package
.
4 changes: 4 additions & 0 deletions services/autoscaling/setup.cfg
@@ -9,3 +9,7 @@ commit_args = --no-verify

[tool:pytest]
asyncio_mode = auto
markers =
slow: marks tests as slow (deselect with '-m "not slow"')
acceptance_test: "marks tests as 'acceptance tests' i.e. does the system do what the user expects? Typically those are workflows."
testit: "marks test to run during development"
@@ -17,6 +17,10 @@ class Ec2InstanceNotFoundError(AutoscalingRuntimeError):
msg_template: str = "EC2 instance was not found"


class Ec2InstanceInvalidError(AutoscalingRuntimeError):
msg_template: str = "Invalid EC2 defined: {msg}"


class Ec2TooManyInstancesError(AutoscalingRuntimeError):
msg_template: str = (
"The maximum amount of instances {num_instances} is already reached!"
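Editor's note: the new Ec2InstanceInvalidError follows the msg_template pattern of its siblings. A minimal sketch of how such templated errors typically behave — the base-class mechanics shown are an assumption, since AutoscalingRuntimeError's definition is not in this excerpt:

# Editor's sketch (not part of the commit): msg_template-style exceptions.
class AutoscalingRuntimeError(Exception):
    msg_template: str = "autoscaling unexpected error"

    def __init__(self, **ctx) -> None:
        # render the class-level template with the keyword context
        super().__init__(self.msg_template.format(**ctx))


class Ec2InstanceInvalidError(AutoscalingRuntimeError):
    msg_template: str = "Invalid EC2 defined: {msg}"


try:
    raise Ec2InstanceInvalidError(msg="t42.bogus is not an allowed type")
except AutoscalingRuntimeError as err:
    assert "t42.bogus" in str(err)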
@@ -105,7 +105,7 @@ class EC2InstancesSettings(BaseCustomSettings):
)
EC2_INSTANCES_TIME_BEFORE_TERMINATION: datetime.timedelta = Field(
default=datetime.timedelta(minutes=1),
description="Time after which an EC2 instance may be terminated (repeat every hour, min 0, max 59 minutes)"
description="Time after which an EC2 instance may be terminated (0<=T<=59 minutes, is automatically capped)"
"(default to seconds, or see https://pydantic-docs.helpmanual.io/usage/types/#datetime-types for string formating)",
)

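Editor's note: for reference, pydantic v1 (which these settings are built on) parses timedelta fields from plain seconds or ISO 8601 duration strings; the minimal model below is an illustration, not the project's settings class:

# Editor's sketch (not part of the commit): how pydantic v1 parses timedelta.
import datetime

from pydantic import BaseModel


class _Example(BaseModel):
    grace: datetime.timedelta


assert _Example(grace=120).grace == datetime.timedelta(minutes=2)       # plain seconds
assert _Example(grace="PT30M").grace == datetime.timedelta(minutes=30)  # ISO 8601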
@@ -18,6 +18,9 @@ def create_as_empty(cls) -> "Resources":
def __ge__(self, other: "Resources") -> bool:
return self.cpus >= other.cpus and self.ram >= other.ram

def __gt__(self, other: "Resources") -> bool:
return self.cpus > other.cpus or self.ram > other.ram

def __add__(self, other: "Resources") -> "Resources":
return Resources.construct(
**{
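Editor's note: mind the asymmetry between the two comparisons — __ge__ requires both cpus and ram to dominate, while the new __gt__ fires as soon as either resource exceeds. The self-contained sketch below reduces the model to the two fields visible in the diff to show the consequence:

# Editor's sketch (not part of the commit): comparison semantics of Resources.
from pydantic import BaseModel


class Resources(BaseModel):
    cpus: float
    ram: int

    def __ge__(self, other: "Resources") -> bool:
        return self.cpus >= other.cpus and self.ram >= other.ram

    def __gt__(self, other: "Resources") -> bool:
        return self.cpus > other.cpus or self.ram > other.ram


a = Resources(cpus=4, ram=1_000)
b = Resources(cpus=2, ram=2_000)
assert a > b           # cpus exceed, even though ram does not
assert not (a >= b)    # yet a does not dominate b on both axes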
@@ -6,6 +6,7 @@
import logging
from typing import cast

import arrow
from fastapi import FastAPI
from models_library.generated_models.docker_rest_api import (
Availability,
@@ -17,6 +18,8 @@
from types_aiobotocore_ec2.literals import InstanceTypeType

from ..core.errors import (
DaskWorkerNotFoundError,
Ec2InstanceInvalidError,
Ec2InstanceNotFoundError,
Ec2InvalidDnsNameError,
Ec2TooManyInstancesError,
@@ -29,10 +32,12 @@
EC2InstanceType,
Resources,
)
from ..utils import ec2, utils_docker
from ..utils import utils_docker, utils_ec2
from ..utils.auto_scaling_core import (
associate_ec2_instances_with_nodes,
ec2_startup_script,
filter_by_task_defined_instance,
find_selected_instance_type_for_task,
node_host_name_from_ec2_private_dns,
)
from ..utils.rabbitmq import post_autoscaling_status_message
@@ -106,7 +111,10 @@ def _node_not_ready(node: Node) -> bool:


async def _cleanup_disconnected_nodes(app: FastAPI, cluster: Cluster) -> Cluster:
await utils_docker.remove_nodes(get_docker_client(app), cluster.disconnected_nodes)
if cluster.disconnected_nodes:
await utils_docker.remove_nodes(
get_docker_client(app), cluster.disconnected_nodes
)
return dataclasses.replace(cluster, disconnected_nodes=[])


@@ -128,13 +136,13 @@ async def _try_attach_pending_ec2s(
new_node = await utils_docker.tag_node(
get_docker_client(app),
new_node,
tags=auto_scaling_mode.get_new_node_docker_tags(app),
tags=auto_scaling_mode.get_new_node_docker_tags(app, instance_data),
available=False,
)
new_found_instances.append(AssociatedInstance(new_node, instance_data))
else:
still_pending_ec2s.append(instance_data)
except Ec2InvalidDnsNameError:
except Ec2InvalidDnsNameError: # noqa: PERF203
_logger.exception("Unexpected EC2 private dns")
# NOTE: first provision the reserve drained nodes if possible
all_drained_nodes = (
@@ -177,6 +185,27 @@ def _sort_according_to_allowed_types(instance_type: EC2InstanceType) -> int:
return allowed_instance_types


async def _activate_and_notify(
app: FastAPI,
auto_scaling_mode: BaseAutoscaling,
drained_node: AssociatedInstance,
tasks: list,
) -> list:
await asyncio.gather(
utils_docker.set_node_availability(
get_docker_client(app), drained_node.node, available=True
),
auto_scaling_mode.log_message_from_tasks(
app,
tasks,
"cluster adjusted, service should start shortly...",
level=logging.INFO,
),
auto_scaling_mode.progress_message_from_tasks(app, tasks, progress=1.0),
)
return tasks
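
Editor's note on the refactoring: _activate_and_notify used to be a closure inside _activate_drained_nodes (its removal follows below); lifting it to module level makes the coroutine reusable and independently testable, at the cost of passing app and auto_scaling_mode explicitly.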


async def _activate_drained_nodes(
app: FastAPI,
cluster: Cluster,
@@ -210,28 +239,12 @@ async def _activate_drained_nodes(
if assigned_tasks
]

async def _activate_and_notify(
drained_node: AssociatedInstance, tasks: list
) -> list:
await asyncio.gather(
*(
utils_docker.set_node_availability(
get_docker_client(app), drained_node.node, available=True
),
auto_scaling_mode.log_message_from_tasks(
app,
tasks,
"cluster adjusted, service should start shortly...",
level=logging.INFO,
),
auto_scaling_mode.progress_message_from_tasks(app, tasks, progress=1.0),
)
)
return tasks

# activate these nodes now
await asyncio.gather(
*(_activate_and_notify(node, tasks) for node, tasks in nodes_to_activate)
*(
_activate_and_notify(app, auto_scaling_mode, node, tasks)
for node, tasks in nodes_to_activate
)
)
new_active_nodes = [node for node, _ in nodes_to_activate]
new_active_node_ids = {node.ec2_instance.id for node in new_active_nodes}
@@ -263,50 +276,74 @@ async def _find_needed_instances(
type_to_instance_map = {t.name: t for t in available_ec2_types}

# 1. check first the pending task needs
active_instance_to_tasks: list[tuple[EC2InstanceData, list]] = [
active_instances_to_tasks: list[tuple[EC2InstanceData, list]] = [
(i.ec2_instance, []) for i in cluster.active_nodes
]
pending_instance_to_tasks: list[tuple[EC2InstanceData, list]] = [
pending_instances_to_tasks: list[tuple[EC2InstanceData, list]] = [
(i, []) for i in cluster.pending_ec2s
]
needed_new_instance_types_for_tasks: list[tuple[EC2InstanceType, list]] = []
for task in pending_tasks:
if await auto_scaling_mode.try_assigning_task_to_pending_instances(
app,
task,
active_instance_to_tasks,
type_to_instance_map,
notify_progress=False,
):
continue
if await auto_scaling_mode.try_assigning_task_to_pending_instances(
app,
task,
pending_instance_to_tasks,
type_to_instance_map,
notify_progress=True,
):
continue
task_defined_ec2_type = await auto_scaling_mode.get_task_defined_instance(
app, task
)
(
filtered_active_instance_to_task,
filtered_pending_instance_to_task,
filtered_needed_new_instance_types_to_task,
) = filter_by_task_defined_instance(
task_defined_ec2_type,
active_instances_to_tasks,
pending_instances_to_tasks,
needed_new_instance_types_for_tasks,
)

if auto_scaling_mode.try_assigning_task_to_instance_types(
task, needed_new_instance_types_for_tasks
# try to assign the task to one of the active, pending or newly created instances
if (
await auto_scaling_mode.try_assigning_task_to_instances(
app,
task,
filtered_active_instance_to_task,
type_to_instance_map,
notify_progress=False,
)
or await auto_scaling_mode.try_assigning_task_to_instances(
app,
task,
filtered_pending_instance_to_task,
type_to_instance_map,
notify_progress=True,
)
or auto_scaling_mode.try_assigning_task_to_instance_types(
task, filtered_needed_new_instance_types_to_task
)
):
continue

# so we need to find what we can create now
try:
# we need a new instance, let's find one
best_ec2_instance = ec2.find_best_fitting_ec2_instance(
available_ec2_types,
auto_scaling_mode.get_max_resources_from_task(task),
score_type=ec2.closest_instance_policy,
)
needed_new_instance_types_for_tasks.append((best_ec2_instance, [task]))
# check if exact instance type is needed first
if task_defined_ec2_type:
defined_ec2 = find_selected_instance_type_for_task(
task_defined_ec2_type, available_ec2_types, auto_scaling_mode, task
)
needed_new_instance_types_for_tasks.append((defined_ec2, [task]))
else:
# we go for best fitting type
best_ec2_instance = utils_ec2.find_best_fitting_ec2_instance(
available_ec2_types,
auto_scaling_mode.get_max_resources_from_task(task),
score_type=utils_ec2.closest_instance_policy,
)
needed_new_instance_types_for_tasks.append((best_ec2_instance, [task]))
except Ec2InstanceNotFoundError:
_logger.exception(
"Task %s needs more resources than any EC2 instance "
"can provide with the current configuration. Please check.",
"can provide with the current configuration. Please check!",
f"{task}",
)
except Ec2InstanceInvalidError:
_logger.exception("Unexpected error:")

num_instances_per_type = collections.defaultdict(
int, collections.Counter(t for t, _ in needed_new_instance_types_for_tasks)
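Editor's note: filter_by_task_defined_instance comes from utils/auto_scaling_core.py and its body is not shown in this excerpt. From the call site alone, a plausible reading is: if the task pins an EC2 type, only candidates of that exact type survive; otherwise everything passes through. A sketch under that assumption (field names on the instance models are guesses, except EC2InstanceType.name which appears in this diff):

# Editor's sketch (not part of the commit): a plausible shape for
# filter_by_task_defined_instance, inferred only from its call site.
from typing import Callable, TypeVar

_T = TypeVar("_T")


def _keep(
    items: list[tuple[_T, list]],
    wanted: str | None,
    name_of: Callable[[_T], str],
) -> list[tuple[_T, list]]:
    # no restriction -> every candidate passes through
    return items if wanted is None else [(i, t) for i, t in items if name_of(i) == wanted]


def filter_by_task_defined_instance(
    task_defined_ec2_type: str | None,
    active: list,
    pending: list,
    needed_new: list,
) -> tuple[list, list, list]:
    return (
        _keep(active, task_defined_ec2_type, lambda i: i.type),      # assumed field name
        _keep(pending, task_defined_ec2_type, lambda i: i.type),     # assumed field name
        _keep(needed_new, task_defined_ec2_type, lambda t: t.name),  # EC2InstanceType.name
    )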
@@ -324,7 +361,7 @@ async def _find_needed_instances(
# check if some are already pending
remaining_pending_instances = [
instance
for instance, assigned_tasks in pending_instance_to_tasks
for instance, assigned_tasks in pending_instances_to_tasks
if not assigned_tasks
]
if len(remaining_pending_instances) < (
Expand Down Expand Up @@ -438,16 +475,19 @@ async def _deactivate_empty_nodes(
active_empty_nodes: list[AssociatedInstance] = []
active_non_empty_nodes: list[AssociatedInstance] = []
for instance in cluster.active_nodes:
if (
await auto_scaling_mode.compute_node_used_resources(
try:
node_used_resources = await auto_scaling_mode.compute_node_used_resources(
app,
instance,
)
== Resources.create_as_empty()
):
active_empty_nodes.append(instance)
else:
active_non_empty_nodes.append(instance)
if node_used_resources == Resources.create_as_empty():
active_empty_nodes.append(instance)
else:
active_non_empty_nodes.append(instance)
except DaskWorkerNotFoundError: # noqa: PERF203
_logger.exception(
"EC2 node instance is not registered to dask-scheduler! TIP: Needs investigation"
)

# drain these empty nodes
await asyncio.gather(
Expand All @@ -462,7 +502,7 @@ async def _deactivate_empty_nodes(
)
if active_empty_nodes:
_logger.info(
"The following nodes set to drain: '%s'",
"following nodes set to drain: '%s'",
f"{[node.node.Description.Hostname for node in active_empty_nodes if node.node.Description]}",
)
return dataclasses.replace(
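Editor's note on the hunk above: compute_node_used_resources is now wrapped in try/except so that a single node unknown to the dask-scheduler (DaskWorkerNotFoundError) is logged and skipped instead of failing the whole deactivation pass; the noqa: PERF203 acknowledges the try/except inside the loop.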
@@ -486,17 +526,14 @@ async def _find_terminateable_instances(
terminateable_nodes: list[AssociatedInstance] = []

for instance in cluster.drained_nodes:
# NOTE: AWS price is hourly based (e.g. same price for a machine used 2 minutes or 1 hour, so we wait until 55 minutes)
elapsed_time_since_launched = (
datetime.datetime.now(datetime.timezone.utc)
- instance.ec2_instance.launch_time
)
elapsed_time_since_full_hour = elapsed_time_since_launched % datetime.timedelta(
hours=1
assert instance.node.UpdatedAt # nosec
node_last_updated = arrow.get(instance.node.UpdatedAt).datetime
elapsed_time_since_drained = (
datetime.datetime.now(datetime.timezone.utc) - node_last_updated
)
if (
elapsed_time_since_full_hour
>= app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION
elapsed_time_since_drained
> app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION
):
# let's terminate that one
terminateable_nodes.append(instance)
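Editor's note: the hunk above replaces the old pay-by-the-hour heuristic (wait until just before the next full hour since launch, reflecting AWS's historical hourly billing) with a simpler rule: terminate once the node has been drained, i.e. last updated in the swarm, for longer than the configured grace period. The new condition in isolation, with a hypothetical timestamp:

# Editor's sketch (not part of the commit): the new termination condition.
import datetime

import arrow

node_updated_at = "2023-11-08T12:00:00Z"  # hypothetical swarm Node.UpdatedAt
elapsed = datetime.datetime.now(datetime.timezone.utc) - arrow.get(node_updated_at).datetime
may_terminate = elapsed > datetime.timedelta(minutes=1)  # default grace period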
@@ -522,6 +559,7 @@ async def _try_scale_down_cluster(app: FastAPI, cluster: Cluster) -> Cluster:
f"{[i.node.Description.Hostname for i in terminateable_instances if i.node.Description]}",
)
# since these nodes are being terminated, remove them from the swarm

await utils_docker.remove_nodes(
get_docker_client(app),
[i.node for i in terminateable_instances],