Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Dynamic autoscaling: not scaling beyond 1 machine #5026

15 changes: 14 additions & 1 deletion services/autoscaling/src/simcore_service_autoscaling/models.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,23 @@
from dataclasses import dataclass, field
from typing import Any, TypeAlias

from aws_library.ec2.models import EC2InstanceData
from aws_library.ec2.models import EC2InstanceData, EC2InstanceType, Resources
from models_library.generated_models.docker_rest_api import Node


@dataclass(frozen=True, kw_only=True)
class AssignedTasksToInstance:
    """Bookkeeping record tying one EC2 instance to the tasks placed on it.

    The dataclass is frozen and keyword-only: fields cannot be rebound after
    construction, but ``assigned_tasks`` is a plain list object and therefore
    remains mutable in place.
    """

    # the EC2 instance this record refers to
    instance: EC2InstanceData
    # resources regarded as still available on the instance
    # NOTE(review): presumably the instance's total resources minus what is
    # already in use -- confirm against the code that builds these records
    available_resources: Resources
    # tasks assigned to the instance so far (element type is not constrained)
    assigned_tasks: list


@dataclass(frozen=True, kw_only=True)
class AssignedTasksToInstanceType:
    """Bookkeeping record tying one EC2 instance *type* to a list of tasks.

    The dataclass is frozen and keyword-only: fields cannot be rebound after
    construction, but ``assigned_tasks`` is a plain list object and therefore
    remains mutable in place.
    """

    # the EC2 instance type this record refers to
    # NOTE(review): presumably a type for an instance that is not started
    # yet -- confirm against the code that builds these records
    instance_type: EC2InstanceType
    # tasks assigned to that instance type so far (element type is not constrained)
    assigned_tasks: list


@dataclass(frozen=True)
class AssociatedInstance:
node: Node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@
Ec2TooManyInstancesError,
)
from ..core.settings import ApplicationSettings, get_application_settings
from ..models import AssociatedInstance, Cluster
from ..models import (
AssignedTasksToInstance,
AssignedTasksToInstanceType,
AssociatedInstance,
Cluster,
)
from ..utils import utils_docker, utils_ec2
from ..utils.auto_scaling_core import (
associate_ec2_instances_with_nodes,
Expand Down Expand Up @@ -268,74 +273,108 @@ async def _activate_drained_nodes(
)


async def _try_assign_tasks_to_instances(
    app: FastAPI,
    task,
    auto_scaling_mode: BaseAutoscaling,
    task_defined_ec2_type: InstanceTypeType | None,
    active_instances_to_tasks: list[AssignedTasksToInstance],
    pending_instances_to_tasks: list[AssignedTasksToInstance],
    drained_instances_to_tasks: list[AssignedTasksToInstance],
    needed_new_instance_types_for_tasks: list[AssignedTasksToInstanceType],
) -> bool:
    """Try to place ``task`` on an already known instance or instance type.

    The candidate collections are first narrowed down with
    ``filter_by_task_defined_instance`` using ``task_defined_ec2_type``.
    The task is then offered, in this order, to the active instances, the
    pending instances, the drained instances and finally the instance types
    already scheduled for creation. The search stops at the first success.

    Returns:
        True if any of the attempts reported success, False otherwise.
    """
    (
        active_candidates,
        pending_candidates,
        drained_candidates,
        new_type_candidates,
    ) = filter_by_task_defined_instance(
        task_defined_ec2_type,
        active_instances_to_tasks,
        pending_instances_to_tasks,
        drained_instances_to_tasks,
        needed_new_instance_types_for_tasks,
    )

    # order matters: active first, then pending, then drained instances;
    # progress is only notified for the pending instances
    ordered_attempts = (
        (active_candidates, False),
        (pending_candidates, True),
        (drained_candidates, False),
    )
    for candidates, with_progress in ordered_attempts:
        if await auto_scaling_mode.try_assigning_task_to_instances(
            app,
            task,
            candidates,
            notify_progress=with_progress,
        ):
            return True

    # last resort: an instance type that is already planned to be created
    return bool(
        auto_scaling_mode.try_assigning_task_to_instance_types(
            task, new_type_candidates
        )
    )


async def _find_needed_instances(
app: FastAPI,
pending_tasks: list,
available_ec2_types: list[EC2InstanceType],
cluster: Cluster,
auto_scaling_mode: BaseAutoscaling,
) -> dict[EC2InstanceType, int]:
type_to_instance_map = {t.name: t for t in available_ec2_types}

# 1. check first the pending task needs
active_instances_to_tasks: list[tuple[EC2InstanceData, list]] = [
(i.ec2_instance, []) for i in cluster.active_nodes
active_instances_to_tasks: list[AssignedTasksToInstance] = [
AssignedTasksToInstance(
instance=i.ec2_instance,
assigned_tasks=[],
available_resources=i.ec2_instance.resources
- await auto_scaling_mode.compute_node_used_resources(app, i),
)
for i in cluster.active_nodes
]
pending_instances_to_tasks: list[tuple[EC2InstanceData, list]] = [
(i, []) for i in cluster.pending_ec2s
pending_instances_to_tasks: list[AssignedTasksToInstance] = [
AssignedTasksToInstance(
instance=i, assigned_tasks=[], available_resources=i.resources
)
for i in cluster.pending_ec2s
]
drained_instances_to_tasks: list[tuple[EC2InstanceData, list]] = [
(i.ec2_instance, []) for i in cluster.drained_nodes
drained_instances_to_tasks: list[AssignedTasksToInstance] = [
AssignedTasksToInstance(
instance=i.ec2_instance,
assigned_tasks=[],
available_resources=i.ec2_instance.resources,
)
for i in cluster.drained_nodes
]
needed_new_instance_types_for_tasks: list[tuple[EC2InstanceType, list]] = []
needed_new_instance_types_for_tasks: list[AssignedTasksToInstanceType] = []
for task in pending_tasks:
task_defined_ec2_type = await auto_scaling_mode.get_task_defined_instance(
app, task
)
(
filtered_active_instance_to_task,
filtered_pending_instance_to_task,
filtered_drained_instances_to_task,
filtered_needed_new_instance_types_to_task,
) = filter_by_task_defined_instance(
_logger.info(
"task %s %s",
task,
f"defines ec2 type as {task_defined_ec2_type}"
if task_defined_ec2_type
else "does NOT define ec2 type",
)
if await _try_assign_tasks_to_instances(
app,
task,
auto_scaling_mode,
task_defined_ec2_type,
active_instances_to_tasks,
pending_instances_to_tasks,
drained_instances_to_tasks,
needed_new_instance_types_for_tasks,
)

# try to assign the task to one of the active, pending or newly created instances
_logger.debug(
"Try to assign %s to any active/pending/created instance in the %s",
f"{task}",
f"{cluster=}",
)
if (
await auto_scaling_mode.try_assigning_task_to_instances(
app,
task,
filtered_active_instance_to_task,
type_to_instance_map,
notify_progress=False,
)
or await auto_scaling_mode.try_assigning_task_to_instances(
app,
task,
filtered_pending_instance_to_task,
type_to_instance_map,
notify_progress=True,
)
or await auto_scaling_mode.try_assigning_task_to_instances(
app,
task,
filtered_drained_instances_to_task,
type_to_instance_map,
notify_progress=False,
)
or auto_scaling_mode.try_assigning_task_to_instance_types(
task, filtered_needed_new_instance_types_to_task
)
):
continue

Expand All @@ -346,15 +385,23 @@ async def _find_needed_instances(
defined_ec2 = find_selected_instance_type_for_task(
task_defined_ec2_type, available_ec2_types, auto_scaling_mode, task
)
needed_new_instance_types_for_tasks.append((defined_ec2, [task]))
needed_new_instance_types_for_tasks.append(
AssignedTasksToInstanceType(
instance_type=defined_ec2, assigned_tasks=[task]
)
)
else:
# we go for best fitting type
best_ec2_instance = utils_ec2.find_best_fitting_ec2_instance(
available_ec2_types,
auto_scaling_mode.get_max_resources_from_task(task),
score_type=utils_ec2.closest_instance_policy,
)
needed_new_instance_types_for_tasks.append((best_ec2_instance, [task]))
needed_new_instance_types_for_tasks.append(
AssignedTasksToInstanceType(
instance_type=best_ec2_instance, assigned_tasks=[task]
)
)
except Ec2InstanceNotFoundError:
_logger.exception(
"Task %s needs more resources than any EC2 instance "
Expand All @@ -365,7 +412,10 @@ async def _find_needed_instances(
_logger.exception("Unexpected error:")

num_instances_per_type = collections.defaultdict(
int, collections.Counter(t for t, _ in needed_new_instance_types_for_tasks)
int,
collections.Counter(
t.instance_type for t in needed_new_instance_types_for_tasks
),
)

# 2. check the buffer needs
Expand All @@ -379,9 +429,7 @@ async def _find_needed_instances(
) > 0:
# check if some are already pending
remaining_pending_instances = [
instance
for instance, assigned_tasks in pending_instances_to_tasks
if not assigned_tasks
i.instance for i in pending_instances_to_tasks if not i.assigned_tasks
]
if len(remaining_pending_instances) < (
app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER
Expand Down Expand Up @@ -622,10 +670,14 @@ async def _autoscale_cluster(
) -> Cluster:
# 1. check if we have pending tasks and resolve them by activating some drained nodes
unrunnable_tasks = await auto_scaling_mode.list_unrunnable_tasks(app)
_logger.info("found %s unrunnable tasks", len(unrunnable_tasks))
# 2. try to activate drained nodes to cover some of the tasks
still_unrunnable_tasks, cluster = await _activate_drained_nodes(
app, cluster, unrunnable_tasks, auto_scaling_mode
)
_logger.info(
"still %s unrunnable tasks after node activation", len(still_unrunnable_tasks)
)

# let's check if there are still pending tasks or if the reserve was used
app_settings = get_application_settings(app)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
from abc import ABC, abstractmethod
from collections.abc import Iterable
from dataclasses import dataclass

from aws_library.ec2.models import EC2InstanceData, EC2InstanceType, Resources
from aws_library.ec2.models import EC2InstanceData, Resources
from fastapi import FastAPI
from models_library.docker import DockerLabelKey
from models_library.generated_models.docker_rest_api import Node as DockerNode
from servicelib.logging_utils import LogLevelInt
from types_aiobotocore_ec2.literals import InstanceTypeType

from ..models import AssociatedInstance
from ..models import (
AssignedTasksToInstance,
AssignedTasksToInstanceType,
AssociatedInstance,
)


@dataclass
Expand Down Expand Up @@ -48,8 +51,7 @@ def try_assigning_task_to_node(
async def try_assigning_task_to_instances(
app: FastAPI,
pending_task,
instances_to_tasks: Iterable[tuple[EC2InstanceData, list]],
type_to_instance_map: dict[str, EC2InstanceType],
instances_to_tasks: list[AssignedTasksToInstance],
*,
notify_progress: bool
) -> bool:
Expand All @@ -59,7 +61,7 @@ async def try_assigning_task_to_instances(
@abstractmethod
def try_assigning_task_to_instance_types(
pending_task,
instance_types_to_tasks: Iterable[tuple[EC2InstanceType, list]],
instance_types_to_tasks: list[AssignedTasksToInstanceType],
) -> bool:
...

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
from collections.abc import Iterable

from aws_library.ec2.models import EC2InstanceData, EC2InstanceType, Resources
from aws_library.ec2.models import EC2InstanceData, Resources
from fastapi import FastAPI
from models_library.docker import (
DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY,
Expand All @@ -15,7 +15,12 @@
from types_aiobotocore_ec2.literals import InstanceTypeType

from ..core.settings import get_application_settings
from ..models import AssociatedInstance, DaskTask
from ..models import (
AssignedTasksToInstance,
AssignedTasksToInstanceType,
AssociatedInstance,
DaskTask,
)
from ..utils import computational_scaling as utils
from ..utils import utils_docker, utils_ec2
from . import dask
Expand Down Expand Up @@ -65,12 +70,10 @@ def try_assigning_task_to_node(
async def try_assigning_task_to_instances(
app: FastAPI,
pending_task,
instances_to_tasks: Iterable[tuple[EC2InstanceData, list]],
type_to_instance_map: dict[str, EC2InstanceType],
instances_to_tasks: list[AssignedTasksToInstance],
*,
notify_progress: bool
) -> bool:
assert type_to_instance_map # nosec
return await utils.try_assigning_task_to_instances(
app,
pending_task,
Expand All @@ -81,7 +84,7 @@ async def try_assigning_task_to_instances(
@staticmethod
def try_assigning_task_to_instance_types(
pending_task,
instance_types_to_tasks: Iterable[tuple[EC2InstanceType, list]],
instance_types_to_tasks: list[AssignedTasksToInstanceType],
) -> bool:
return utils.try_assigning_task_to_instance_types(
pending_task, instance_types_to_tasks
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
from collections.abc import Iterable

from aws_library.ec2.models import EC2InstanceData, EC2InstanceType, Resources
from aws_library.ec2.models import EC2InstanceData, Resources
from fastapi import FastAPI
from models_library.docker import DockerLabelKey
from models_library.generated_models.docker_rest_api import Node, Task
from servicelib.logging_utils import LogLevelInt
from types_aiobotocore_ec2.literals import InstanceTypeType

from ..core.settings import get_application_settings
from ..models import AssociatedInstance
from ..models import (
AssignedTasksToInstance,
AssignedTasksToInstanceType,
AssociatedInstance,
)
from ..utils import dynamic_scaling as utils
from ..utils import utils_docker, utils_ec2
from ..utils.rabbitmq import log_tasks_message, progress_tasks_message
Expand Down Expand Up @@ -57,25 +61,23 @@ def try_assigning_task_to_node(
async def try_assigning_task_to_instances(
app: FastAPI,
pending_task,
instances_to_tasks: Iterable[tuple[EC2InstanceData, list]],
type_to_instance_map: dict[str, EC2InstanceType],
instances_to_tasks: list[AssignedTasksToInstance],
*,
notify_progress: bool
) -> bool:
return await utils.try_assigning_task_to_pending_instances(
return await utils.try_assigning_task_to_instances(
app,
pending_task,
instances_to_tasks,
type_to_instance_map,
notify_progress=notify_progress,
)

@staticmethod
def try_assigning_task_to_instance_types(
pending_task,
instance_types_to_tasks: Iterable[tuple[EC2InstanceType, list]],
instance_types_to_tasks: list[AssignedTasksToInstanceType],
) -> bool:
return utils.try_assigning_task_to_instances(
return utils.try_assigning_task_to_instance_types(
pending_task, instance_types_to_tasks
)

Expand Down
Loading