Skip to content

Commit

Permalink
@pcrespov review: but I prefer dataclass
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg committed Nov 14, 2023
1 parent d20e242 commit 9f1e5c8
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 82 deletions.
14 changes: 12 additions & 2 deletions services/autoscaling/src/simcore_service_autoscaling/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,18 @@
from aws_library.ec2.models import EC2InstanceData, EC2InstanceType, Resources
from models_library.generated_models.docker_rest_api import Node

AssignedTasksToInstance: TypeAlias = tuple[EC2InstanceData, list, Resources]
AssignedTasksToInstanceType: TypeAlias = tuple[EC2InstanceType, list]

# Record pairing one concrete EC2 instance with the tasks planned onto it.
#
# `kw_only=True`: every field must be passed by keyword at construction.
# `frozen=True`: attributes cannot be rebound after construction; the
# `assigned_tasks` list itself is still mutated in place (callers `.append()`
# pending tasks to it while assigning them).
@dataclass(frozen=True, kw_only=True)
class AssignedTasksToInstance:
    # the EC2 instance the tasks are being assigned to
    instance: EC2InstanceData
    # resources usable for assignment; callers pass either the instance's full
    # `resources` or `resources` minus what the node already uses
    available_resources: Resources
    # tasks assigned so far; untyped list, created empty by callers
    # NOTE(review): element type looks like docker `Task` or `DaskTask`
    # depending on the auto-scaling mode — confirm
    assigned_tasks: list


# Record pairing an EC2 instance *type* (not yet a concrete instance) with the
# tasks planned onto a machine of that type.
#
# `kw_only=True`: every field must be passed by keyword at construction.
# `frozen=True`: attributes cannot be rebound after construction; the
# `assigned_tasks` list itself is still mutated in place (callers `.append()`
# pending tasks to it while assigning them).
@dataclass(frozen=True, kw_only=True)
class AssignedTasksToInstanceType:
    # the instance type; callers read its `cpus` and `ram` to build the total
    # resources, and count occurrences of it with `collections.Counter`
    instance_type: EC2InstanceType
    # tasks assigned so far; untyped list, callers seed it with the first task
    # NOTE(review): element type looks like docker `Task` or `DaskTask`
    # depending on the auto-scaling mode — confirm
    assigned_tasks: list


@dataclass(frozen=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,19 +332,27 @@ async def _find_needed_instances(
) -> dict[EC2InstanceType, int]:
# 1. check first the pending task needs
active_instances_to_tasks: list[AssignedTasksToInstance] = [
(
i.ec2_instance,
[],
i.ec2_instance.resources
AssignedTasksToInstance(
instance=i.ec2_instance,
assigned_tasks=[],
available_resources=i.ec2_instance.resources
- await auto_scaling_mode.compute_node_used_resources(app, i),
)
for i in cluster.active_nodes
]
pending_instances_to_tasks: list[AssignedTasksToInstance] = [
(i, [], i.resources) for i in cluster.pending_ec2s
AssignedTasksToInstance(
instance=i, assigned_tasks=[], available_resources=i.resources
)
for i in cluster.pending_ec2s
]
drained_instances_to_tasks: list[AssignedTasksToInstance] = [
(i.ec2_instance, [], i.ec2_instance.resources) for i in cluster.drained_nodes
AssignedTasksToInstance(
instance=i.ec2_instance,
assigned_tasks=[],
available_resources=i.ec2_instance.resources,
)
for i in cluster.drained_nodes
]
needed_new_instance_types_for_tasks: list[AssignedTasksToInstanceType] = []
for task in pending_tasks:
Expand Down Expand Up @@ -377,15 +385,23 @@ async def _find_needed_instances(
defined_ec2 = find_selected_instance_type_for_task(
task_defined_ec2_type, available_ec2_types, auto_scaling_mode, task
)
needed_new_instance_types_for_tasks.append((defined_ec2, [task]))
needed_new_instance_types_for_tasks.append(
AssignedTasksToInstanceType(
instance_type=defined_ec2, assigned_tasks=[task]
)
)
else:
# we go for best fitting type
best_ec2_instance = utils_ec2.find_best_fitting_ec2_instance(
available_ec2_types,
auto_scaling_mode.get_max_resources_from_task(task),
score_type=utils_ec2.closest_instance_policy,
)
needed_new_instance_types_for_tasks.append((best_ec2_instance, [task]))
needed_new_instance_types_for_tasks.append(
AssignedTasksToInstanceType(
instance_type=best_ec2_instance, assigned_tasks=[task]
)
)
except Ec2InstanceNotFoundError:
_logger.exception(
"Task %s needs more resources than any EC2 instance "
Expand All @@ -396,7 +412,10 @@ async def _find_needed_instances(
_logger.exception("Unexpected error:")

num_instances_per_type = collections.defaultdict(
int, collections.Counter(t for t, _ in needed_new_instance_types_for_tasks)
int,
collections.Counter(
t.instance_type for t in needed_new_instance_types_for_tasks
),
)

# 2. check the buffer needs
Expand All @@ -410,9 +429,7 @@ async def _find_needed_instances(
) > 0:
# check if some are already pending
remaining_pending_instances = [
instance
for instance, assigned_tasks, _ in pending_instances_to_tasks
if not assigned_tasks
i.instance for i in pending_instances_to_tasks if not i.assigned_tasks
]
if len(remaining_pending_instances) < (
app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
from collections.abc import Iterable

from aws_library.ec2.models import EC2InstanceData, EC2InstanceType, Resources
from aws_library.ec2.models import EC2InstanceData, Resources
from fastapi import FastAPI
from models_library.docker import (
DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY,
Expand All @@ -15,7 +15,12 @@
from types_aiobotocore_ec2.literals import InstanceTypeType

from ..core.settings import get_application_settings
from ..models import AssignedTasksToInstance, AssociatedInstance, DaskTask
from ..models import (
AssignedTasksToInstance,
AssignedTasksToInstanceType,
AssociatedInstance,
DaskTask,
)
from ..utils import computational_scaling as utils
from ..utils import utils_docker, utils_ec2
from . import dask
Expand Down Expand Up @@ -79,7 +84,7 @@ async def try_assigning_task_to_instances(
@staticmethod
def try_assigning_task_to_instance_types(
pending_task,
instance_types_to_tasks: list[tuple[EC2InstanceType, list]],
instance_types_to_tasks: list[AssignedTasksToInstanceType],
) -> bool:
return utils.try_assigning_task_to_instance_types(
pending_task, instance_types_to_tasks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,17 +114,15 @@ def _instance_type_by_type_name(
def _instance_type_map_by_type_name(
mapping: AssignedTasksToInstanceType, *, type_name: InstanceTypeType | None
) -> bool:
ec2_type, _ = mapping
return _instance_type_by_type_name(ec2_type, type_name=type_name)
return _instance_type_by_type_name(mapping.instance_type, type_name=type_name)


def _instance_data_map_by_type_name(
mapping: AssignedTasksToInstance, *, type_name: InstanceTypeType | None
) -> bool:
if type_name is None:
return True
ec2_data, *_ = mapping
return bool(ec2_data.type == type_name)
return bool(mapping.instance.type == type_name)


def filter_by_task_defined_instance(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
import datetime
import logging
from collections.abc import Iterable
from typing import Final, TypeAlias
from typing import Final

from aws_library.ec2.models import EC2InstanceData, EC2InstanceType, Resources
from aws_library.ec2.models import Resources
from dask_task_models_library.constants import DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY
from fastapi import FastAPI
from servicelib.utils_formatting import timedelta_as_minute_second
from types_aiobotocore_ec2.literals import InstanceTypeType

from ..core.settings import get_application_settings
from ..models import AssociatedInstance, DaskTask
from ..models import (
AssignedTasksToInstance,
AssignedTasksToInstanceType,
AssociatedInstance,
DaskTask,
)

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -51,15 +56,10 @@ def try_assigning_task_to_node(
return False


AssignedDaskTasksToInstance: TypeAlias = tuple[
EC2InstanceData, list[DaskTask], Resources
]


async def try_assigning_task_to_instances(
app: FastAPI,
pending_task: DaskTask,
instances_to_tasks: list[AssignedDaskTasksToInstance],
instances_to_tasks: list[AssignedTasksToInstance],
*,
notify_progress: bool,
) -> bool:
Expand All @@ -68,23 +68,23 @@ async def try_assigning_task_to_instances(
instance_max_time_to_start = (
app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME
)
for (
instance,
instance_assigned_tasks,
instance_available_resources,
) in instances_to_tasks:
for assigned_tasks_to_instance in instances_to_tasks:
tasks_needed_resources = _compute_tasks_needed_resources(
instance_assigned_tasks
assigned_tasks_to_instance.assigned_tasks
)
if (
instance_available_resources - tasks_needed_resources
assigned_tasks_to_instance.available_resources - tasks_needed_resources
) >= get_max_resources_from_dask_task(pending_task):
instance_assigned_tasks.append(pending_task)
assigned_tasks_to_instance.assigned_tasks.append(pending_task)
if notify_progress:
now = datetime.datetime.now(datetime.timezone.utc)
time_since_launch = now - instance.launch_time
time_since_launch = (
now - assigned_tasks_to_instance.instance.launch_time
)
estimated_time_to_completion = (
instance.launch_time + instance_max_time_to_start - now
assigned_tasks_to_instance.instance.launch_time
+ instance_max_time_to_start
- now
)
_logger.info(
"LOG: %s",
Expand All @@ -102,16 +102,19 @@ async def try_assigning_task_to_instances(

def try_assigning_task_to_instance_types(
pending_task: DaskTask,
instance_types_to_tasks: list[tuple[EC2InstanceType, list[DaskTask]]],
instance_types_to_tasks: list[AssignedTasksToInstanceType],
) -> bool:
for instance, instance_assigned_tasks in instance_types_to_tasks:
instance_total_resource = Resources(cpus=instance.cpus, ram=instance.ram)
for assigned_tasks_to_instance_type in instance_types_to_tasks:
instance_total_resource = Resources(
cpus=assigned_tasks_to_instance_type.instance_type.cpus,
ram=assigned_tasks_to_instance_type.instance_type.ram,
)
tasks_needed_resources = _compute_tasks_needed_resources(
instance_assigned_tasks
assigned_tasks_to_instance_type.assigned_tasks
)
if (
instance_total_resource - tasks_needed_resources
) >= get_max_resources_from_dask_task(pending_task):
instance_assigned_tasks.append(pending_task)
assigned_tasks_to_instance_type.assigned_tasks.append(pending_task)
return True
return False
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
import datetime
import logging
from collections.abc import Iterable
from typing import TypeAlias

from aws_library.ec2.models import EC2InstanceData, EC2InstanceType, Resources
from aws_library.ec2.models import Resources
from fastapi import FastAPI
from models_library.generated_models.docker_rest_api import Task
from servicelib.utils_formatting import timedelta_as_minute_second

from ..core.settings import get_application_settings
from ..models import AssociatedInstance
from ..models import (
AssignedTasksToInstance,
AssignedTasksToInstanceType,
AssociatedInstance,
)
from . import utils_docker
from .rabbitmq import log_tasks_message, progress_tasks_message

Expand All @@ -35,28 +38,28 @@ def try_assigning_task_to_node(

def try_assigning_task_to_instance_types(
pending_task: Task,
instance_types_to_tasks: list[tuple[EC2InstanceType, list[Task]]],
instance_types_to_tasks: list[AssignedTasksToInstanceType],
) -> bool:
for instance, instance_assigned_tasks in instance_types_to_tasks:
instance_total_resource = Resources(cpus=instance.cpus, ram=instance.ram)
for assigned_tasks_to_instance_type in instance_types_to_tasks:
instance_total_resource = Resources(
cpus=assigned_tasks_to_instance_type.instance_type.cpus,
ram=assigned_tasks_to_instance_type.instance_type.ram,
)
tasks_needed_resources = utils_docker.compute_tasks_needed_resources(
instance_assigned_tasks
assigned_tasks_to_instance_type.assigned_tasks
)
if (
instance_total_resource - tasks_needed_resources
) >= utils_docker.get_max_resources_from_docker_task(pending_task):
instance_assigned_tasks.append(pending_task)
assigned_tasks_to_instance_type.assigned_tasks.append(pending_task)
return True
return False


AssignedDockerTasksToInstance: TypeAlias = tuple[EC2InstanceData, list[Task], Resources]


async def try_assigning_task_to_instances(
app: FastAPI,
pending_task: Task,
instances_to_tasks: list[AssignedDockerTasksToInstance],
instances_to_tasks: list[AssignedTasksToInstance],
*,
notify_progress: bool,
) -> bool:
Expand All @@ -65,23 +68,23 @@ async def try_assigning_task_to_instances(
instance_max_time_to_start = (
app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME
)
for (
instance,
instance_assigned_tasks,
instance_available_resources,
) in instances_to_tasks:
for assigned_tasks_to_instance in instances_to_tasks:
tasks_needed_resources = utils_docker.compute_tasks_needed_resources(
instance_assigned_tasks
assigned_tasks_to_instance.assigned_tasks
)
if (
instance_available_resources - tasks_needed_resources
assigned_tasks_to_instance.available_resources - tasks_needed_resources
) >= utils_docker.get_max_resources_from_docker_task(pending_task):
instance_assigned_tasks.append(pending_task)
assigned_tasks_to_instance.assigned_tasks.append(pending_task)
if notify_progress:
now = datetime.datetime.now(datetime.timezone.utc)
time_since_launch = now - instance.launch_time
time_since_launch = (
now - assigned_tasks_to_instance.instance.launch_time
)
estimated_time_to_completion = (
instance.launch_time + instance_max_time_to_start - now
assigned_tasks_to_instance.instance.launch_time
+ instance_max_time_to_start
- now
)

await log_tasks_message(
Expand Down
Loading

0 comments on commit 9f1e5c8

Please sign in to comment.