Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨Computational autoscaling: find out which EC2 type is necessary #4975

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
bcb6110
ensure an issue while taking down one node does not prevent others from going down
sanderegg Nov 3, 2023
31faa78
adding acceptance test for computational autoscaling, up and down
sanderegg Nov 3, 2023
7a65157
missing update
sanderegg Nov 3, 2023
d9804c8
only call node removal if necessary
sanderegg Nov 3, 2023
5493066
test now checks scaling down as well
sanderegg Nov 3, 2023
9235068
correctly check scaling down
sanderegg Nov 3, 2023
0900e75
add pytest marker
sanderegg Nov 3, 2023
7145b7a
rename
sanderegg Nov 3, 2023
a6a56f8
added dependency to dask-task-models-lib
sanderegg Nov 3, 2023
b1f071c
added constant for EC2 instance type restriction
sanderegg Nov 3, 2023
50e6a00
parametrizing
sanderegg Nov 3, 2023
12be2f2
adding instance type restrictions
sanderegg Nov 3, 2023
d2b4cb9
rename file
sanderegg Nov 6, 2023
57e6ecf
check for exact type name
sanderegg Nov 6, 2023
106525f
fix path
sanderegg Nov 6, 2023
ea9df87
test disallowed types are not allowed
sanderegg Nov 6, 2023
67cdcb2
find out actual ec2 types
sanderegg Nov 6, 2023
22a1aa8
renamed fixture
sanderegg Nov 6, 2023
aeddfa9
list types
sanderegg Nov 6, 2023
0582862
ensure best fitting is disabled if explicit is on
sanderegg Nov 6, 2023
58c6e35
renaming
sanderegg Nov 6, 2023
dd50670
refactor
sanderegg Nov 6, 2023
8c21d3e
functional test
sanderegg Nov 6, 2023
3f27876
added function to find EC2 assignment for dynamic services
sanderegg Nov 6, 2023
29756bd
mypy
sanderegg Nov 6, 2023
81afe61
added label to restrict ec2 type
sanderegg Nov 7, 2023
999d833
fixed test
sanderegg Nov 7, 2023
aa83141
clean
sanderegg Nov 7, 2023
bb20787
now receives app and can check dynamic services for defined ec2
sanderegg Nov 7, 2023
1d926c3
adapted tests
sanderegg Nov 7, 2023
0125a11
rename
sanderegg Nov 7, 2023
f62817b
ensure labels for ec2 type are set on new machines
sanderegg Nov 7, 2023
d749671
previous filter of the types
sanderegg Nov 7, 2023
80f51b6
refactor and simplify
sanderegg Nov 7, 2023
1fd6244
clean
sanderegg Nov 7, 2023
05ba129
refactor
sanderegg Nov 7, 2023
89400f2
adapt test
sanderegg Nov 7, 2023
2110ea4
refactor
sanderegg Nov 7, 2023
ac4e5ed
@pcrespov review: no need to unpack
sanderegg Nov 8, 2023
8a75ee5
added assertion to make it clearer
sanderegg Nov 8, 2023
9d0a3ed
mypy
sanderegg Nov 8, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from typing import Final

# Dask task resource key used to restrict on which EC2 instance type a task
# may be scheduled.
# NOTE(review): the value attached under this key presumably is the EC2
# instance type name (e.g. "t2.micro") — confirm against the consumers.
DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY: Final[str] = "EC2-INSTANCE-TYPE"
5 changes: 5 additions & 0 deletions packages/models-library/src/models_library/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ class DockerGenericTag(ConstrainedStr):
_UNDEFINED_LABEL_VALUE_INT: Final[str] = "0"


# Docker label key used as a placement constraint to pin a task/service to a
# specific EC2 instance type.
DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY: Final[
    DockerLabelKey
] = parse_obj_as(DockerLabelKey, "ec2-instance-type")


def to_simcore_runtime_docker_label_key(key: str) -> DockerLabelKey:
return DockerLabelKey(
f"{_SIMCORE_RUNTIME_DOCKER_LABEL_PREFIX}{key.replace('_', '-').lower()}"
Expand Down
4 changes: 2 additions & 2 deletions services/autoscaling/requirements/_tools.txt
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ tomli==2.0.1
# pip-tools
# pylint
# pyproject-hooks
tomlkit==0.12.1
tomlkit==0.12.2
# via pylint
typing-extensions==4.5.0
typing-extensions==4.8.0
# via
# -c requirements/_base.txt
# -c requirements/_test.txt
Expand Down
1 change: 1 addition & 0 deletions services/autoscaling/requirements/ci.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
../../packages/pytest-simcore
../../packages/service-library[fastapi]
../../packages/settings-library
../../packages/dask-task-models-library

# installs current package
.
1 change: 1 addition & 0 deletions services/autoscaling/requirements/dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
--editable ../../packages/pytest-simcore
--editable ../../packages/service-library[fastapi]
--editable ../../packages/settings-library
--editable ../../packages/dask-task-models-library

# installs current package
--editable .
1 change: 1 addition & 0 deletions services/autoscaling/requirements/prod.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@
../../packages/models-library
../../packages/service-library[fastapi]
../../packages/settings-library
../../packages/dask-task-models-library
# installs current package
.
4 changes: 4 additions & 0 deletions services/autoscaling/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,7 @@ commit_args = --no-verify

[tool:pytest]
asyncio_mode = auto
markers =
slow: marks tests as slow (deselect with '-m "not slow"')
acceptance_test: "marks tests as 'acceptance tests' i.e. does the system do what the user expects? Typically those are workflows."
testit: "marks test to run during development"
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ class Ec2InstanceNotFoundError(AutoscalingRuntimeError):
msg_template: str = "EC2 instance was not found"


class Ec2InstanceInvalidError(AutoscalingRuntimeError):
    """Raised when an invalid EC2 instance type is specified (details in `msg`)."""

    msg_template: str = "Invalid EC2 defined: {msg}"

class Ec2TooManyInstancesError(AutoscalingRuntimeError):
msg_template: str = (
"The maximum amount of instances {num_instances} is already reached!"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class EC2InstancesSettings(BaseCustomSettings):
)
EC2_INSTANCES_TIME_BEFORE_TERMINATION: datetime.timedelta = Field(
default=datetime.timedelta(minutes=1),
description="Time after which an EC2 instance may be terminated (repeat every hour, min 0, max 59 minutes)"
description="Time after which an EC2 instance may be terminated (0<=T<=59 minutes, is automatically capped)"
"(default to seconds, or see https://pydantic-docs.helpmanual.io/usage/types/#datetime-types for string formating)",
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ def create_as_empty(cls) -> "Resources":
def __ge__(self, other: "Resources") -> bool:
return self.cpus >= other.cpus and self.ram >= other.ram

def __gt__(self, other: "Resources") -> bool:
sanderegg marked this conversation as resolved.
Show resolved Hide resolved
return self.cpus > other.cpus or self.ram > other.ram

def __add__(self, other: "Resources") -> "Resources":
return Resources.construct(
**{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import logging
from typing import cast

import arrow
from fastapi import FastAPI
from models_library.generated_models.docker_rest_api import (
Availability,
Expand All @@ -17,6 +18,8 @@
from types_aiobotocore_ec2.literals import InstanceTypeType

from ..core.errors import (
DaskWorkerNotFoundError,
Ec2InstanceInvalidError,
Ec2InstanceNotFoundError,
Ec2InvalidDnsNameError,
Ec2TooManyInstancesError,
Expand All @@ -29,10 +32,12 @@
EC2InstanceType,
Resources,
)
from ..utils import ec2, utils_docker
from ..utils import utils_docker, utils_ec2
from ..utils.auto_scaling_core import (
associate_ec2_instances_with_nodes,
ec2_startup_script,
filter_by_task_defined_instance,
find_selected_instance_type_for_task,
node_host_name_from_ec2_private_dns,
)
from ..utils.rabbitmq import post_autoscaling_status_message
Expand Down Expand Up @@ -106,7 +111,10 @@ def _node_not_ready(node: Node) -> bool:


async def _cleanup_disconnected_nodes(app: FastAPI, cluster: Cluster) -> Cluster:
    """Remove any disconnected nodes from the docker swarm and return the
    cluster state with the disconnected list cleared."""
    disconnected = cluster.disconnected_nodes
    # only hit the docker API when there is actually something to remove
    if disconnected:
        await utils_docker.remove_nodes(get_docker_client(app), disconnected)
    return dataclasses.replace(cluster, disconnected_nodes=[])


Expand All @@ -128,13 +136,13 @@ async def _try_attach_pending_ec2s(
new_node = await utils_docker.tag_node(
get_docker_client(app),
new_node,
tags=auto_scaling_mode.get_new_node_docker_tags(app),
tags=auto_scaling_mode.get_new_node_docker_tags(app, instance_data),
available=False,
)
new_found_instances.append(AssociatedInstance(new_node, instance_data))
else:
still_pending_ec2s.append(instance_data)
except Ec2InvalidDnsNameError:
except Ec2InvalidDnsNameError: # noqa: PERF203
_logger.exception("Unexpected EC2 private dns")
# NOTE: first provision the reserve drained nodes if possible
all_drained_nodes = (
Expand Down Expand Up @@ -177,6 +185,27 @@ def _sort_according_to_allowed_types(instance_type: EC2InstanceType) -> int:
return allowed_instance_types


async def _activate_and_notify(
    app: FastAPI,
    auto_scaling_mode: BaseAutoscaling,
    drained_node: AssociatedInstance,
    tasks: list,
) -> list:
    """Set the drained node back to available and notify the given tasks
    (log message + 100% progress) concurrently; returns `tasks` unchanged."""
    set_available = utils_docker.set_node_availability(
        get_docker_client(app), drained_node.node, available=True
    )
    notify_log = auto_scaling_mode.log_message_from_tasks(
        app,
        tasks,
        "cluster adjusted, service should start shortly...",
        level=logging.INFO,
    )
    notify_progress = auto_scaling_mode.progress_message_from_tasks(
        app, tasks, progress=1.0
    )
    await asyncio.gather(set_available, notify_log, notify_progress)
    return tasks


async def _activate_drained_nodes(
app: FastAPI,
cluster: Cluster,
Expand Down Expand Up @@ -210,28 +239,12 @@ async def _activate_drained_nodes(
if assigned_tasks
]

async def _activate_and_notify(
drained_node: AssociatedInstance, tasks: list
) -> list:
await asyncio.gather(
*(
utils_docker.set_node_availability(
get_docker_client(app), drained_node.node, available=True
),
auto_scaling_mode.log_message_from_tasks(
app,
tasks,
"cluster adjusted, service should start shortly...",
level=logging.INFO,
),
auto_scaling_mode.progress_message_from_tasks(app, tasks, progress=1.0),
)
)
return tasks

# activate these nodes now
await asyncio.gather(
*(_activate_and_notify(node, tasks) for node, tasks in nodes_to_activate)
*(
_activate_and_notify(app, auto_scaling_mode, node, tasks)
for node, tasks in nodes_to_activate
)
)
new_active_nodes = [node for node, _ in nodes_to_activate]
new_active_node_ids = {node.ec2_instance.id for node in new_active_nodes}
Expand Down Expand Up @@ -263,50 +276,74 @@ async def _find_needed_instances(
type_to_instance_map = {t.name: t for t in available_ec2_types}

# 1. check first the pending task needs
active_instance_to_tasks: list[tuple[EC2InstanceData, list]] = [
active_instances_to_tasks: list[tuple[EC2InstanceData, list]] = [
(i.ec2_instance, []) for i in cluster.active_nodes
]
pending_instance_to_tasks: list[tuple[EC2InstanceData, list]] = [
pending_instances_to_tasks: list[tuple[EC2InstanceData, list]] = [
(i, []) for i in cluster.pending_ec2s
]
needed_new_instance_types_for_tasks: list[tuple[EC2InstanceType, list]] = []
for task in pending_tasks:
if await auto_scaling_mode.try_assigning_task_to_pending_instances(
app,
task,
active_instance_to_tasks,
type_to_instance_map,
notify_progress=False,
):
continue
if await auto_scaling_mode.try_assigning_task_to_pending_instances(
app,
task,
pending_instance_to_tasks,
type_to_instance_map,
notify_progress=True,
):
continue
task_defined_ec2_type = await auto_scaling_mode.get_task_defined_instance(
app, task
)
(
filtered_active_instance_to_task,
filtered_pending_instance_to_task,
filtered_needed_new_instance_types_to_task,
) = filter_by_task_defined_instance(
task_defined_ec2_type,
active_instances_to_tasks,
pending_instances_to_tasks,
needed_new_instance_types_for_tasks,
)

if auto_scaling_mode.try_assigning_task_to_instance_types(
task, needed_new_instance_types_for_tasks
# try to assign the task to one of the active, pending or newly created instances
if (
await auto_scaling_mode.try_assigning_task_to_instances(
app,
task,
filtered_active_instance_to_task,
type_to_instance_map,
notify_progress=False,
)
or await auto_scaling_mode.try_assigning_task_to_instances(
app,
task,
filtered_pending_instance_to_task,
type_to_instance_map,
notify_progress=True,
)
or auto_scaling_mode.try_assigning_task_to_instance_types(
sanderegg marked this conversation as resolved.
Show resolved Hide resolved
task, filtered_needed_new_instance_types_to_task
)
):
continue

# so we need to find what we can create now
try:
# we need a new instance, let's find one
best_ec2_instance = ec2.find_best_fitting_ec2_instance(
available_ec2_types,
auto_scaling_mode.get_max_resources_from_task(task),
score_type=ec2.closest_instance_policy,
)
needed_new_instance_types_for_tasks.append((best_ec2_instance, [task]))
# check if exact instance type is needed first
if task_defined_ec2_type:
defined_ec2 = find_selected_instance_type_for_task(
task_defined_ec2_type, available_ec2_types, auto_scaling_mode, task
)
needed_new_instance_types_for_tasks.append((defined_ec2, [task]))
else:
# we go for best fitting type
best_ec2_instance = utils_ec2.find_best_fitting_ec2_instance(
available_ec2_types,
auto_scaling_mode.get_max_resources_from_task(task),
score_type=utils_ec2.closest_instance_policy,
)
needed_new_instance_types_for_tasks.append((best_ec2_instance, [task]))
except Ec2InstanceNotFoundError:
_logger.exception(
"Task %s needs more resources than any EC2 instance "
"can provide with the current configuration. Please check.",
"can provide with the current configuration. Please check!",
f"{task}",
)
except Ec2InstanceInvalidError:
_logger.exception("Unexpected error:")

num_instances_per_type = collections.defaultdict(
int, collections.Counter(t for t, _ in needed_new_instance_types_for_tasks)
Expand All @@ -324,7 +361,7 @@ async def _find_needed_instances(
# check if some are already pending
remaining_pending_instances = [
instance
for instance, assigned_tasks in pending_instance_to_tasks
for instance, assigned_tasks in pending_instances_to_tasks
if not assigned_tasks
]
if len(remaining_pending_instances) < (
Expand Down Expand Up @@ -438,16 +475,19 @@ async def _deactivate_empty_nodes(
active_empty_nodes: list[AssociatedInstance] = []
active_non_empty_nodes: list[AssociatedInstance] = []
for instance in cluster.active_nodes:
if (
await auto_scaling_mode.compute_node_used_resources(
try:
node_used_resources = await auto_scaling_mode.compute_node_used_resources(
app,
instance,
)
== Resources.create_as_empty()
):
active_empty_nodes.append(instance)
else:
active_non_empty_nodes.append(instance)
if node_used_resources == Resources.create_as_empty():
active_empty_nodes.append(instance)
else:
active_non_empty_nodes.append(instance)
except DaskWorkerNotFoundError: # noqa: PERF203
_logger.exception(
"EC2 node instance is not registered to dask-scheduler! TIP: Needs investigation"
)

# drain this empty nodes
await asyncio.gather(
Expand All @@ -462,7 +502,7 @@ async def _deactivate_empty_nodes(
)
if active_empty_nodes:
_logger.info(
"The following nodes set to drain: '%s'",
"following nodes set to drain: '%s'",
f"{[node.node.Description.Hostname for node in active_empty_nodes if node.node.Description]}",
)
return dataclasses.replace(
Expand All @@ -486,17 +526,14 @@ async def _find_terminateable_instances(
terminateable_nodes: list[AssociatedInstance] = []

for instance in cluster.drained_nodes:
# NOTE: AWS price is hourly based (e.g. same price for a machine used 2 minutes or 1 hour, so we wait until 55 minutes)
elapsed_time_since_launched = (
datetime.datetime.now(datetime.timezone.utc)
- instance.ec2_instance.launch_time
)
elapsed_time_since_full_hour = elapsed_time_since_launched % datetime.timedelta(
hours=1
assert instance.node.UpdatedAt # nosec
node_last_updated = arrow.get(instance.node.UpdatedAt).datetime
elapsed_time_since_drained = (
datetime.datetime.now(datetime.timezone.utc) - node_last_updated
)
if (
elapsed_time_since_full_hour
>= app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION
elapsed_time_since_drained
> app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION
):
# let's terminate that one
terminateable_nodes.append(instance)
Expand All @@ -522,6 +559,7 @@ async def _try_scale_down_cluster(app: FastAPI, cluster: Cluster) -> Cluster:
f"{[i.node.Description.Hostname for i in terminateable_instances if i.node.Description]}",
)
# since these nodes are being terminated, remove them from the swarm

await utils_docker.remove_nodes(
get_docker_client(app),
[i.node for i in terminateable_instances],
Expand Down
Loading
Loading