From 98987f762c3e295efd51181cd3f2cade20d2b6f0 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Tue, 7 Nov 2023 15:10:20 +0100 Subject: [PATCH] refactor and simplify --- .../modules/auto_scaling_mode_base.py | 4 +- .../auto_scaling_mode_computational.py | 11 ++-- .../modules/auto_scaling_mode_dynamic.py | 20 ++++--- .../utils/computational_scaling.py | 56 +++---------------- .../utils/dynamic_scaling.py | 14 +++-- 5 files changed, 35 insertions(+), 70 deletions(-) diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_base.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_base.py index ffb897cd222b..c7ad74bc2b55 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_base.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_base.py @@ -47,7 +47,7 @@ def try_assigning_task_to_node( async def try_assigning_task_to_instances( app: FastAPI, pending_task, - list_of_pending_instance_to_tasks: Iterable[tuple[EC2InstanceData, list]], + instances_to_tasks: Iterable[tuple[EC2InstanceData, list]], type_to_instance_map: dict[str, EC2InstanceType], *, notify_progress: bool @@ -58,7 +58,7 @@ async def try_assigning_task_to_instances( @abstractmethod def try_assigning_task_to_instance_types( pending_task, - list_of_instance_to_tasks: Iterable[tuple[EC2InstanceType, list]], + instance_types_to_tasks: Iterable[tuple[EC2InstanceType, list]], ) -> bool: ... diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py index 7665c28bc8b1..fa375734cb16 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py @@ -1,5 +1,6 @@ import collections import logging +from collections.abc import Iterable from fastapi import FastAPI from models_library.docker import ( @@ -61,7 +62,7 @@ async def list_unrunnable_tasks(app: FastAPI) -> list[DaskTask]: @staticmethod def try_assigning_task_to_node( task: DaskTask, - instance_to_tasks: list[tuple[AssociatedInstance, list[DaskTask]]], + instance_to_tasks: Iterable[tuple[AssociatedInstance, list[DaskTask]]], ) -> bool: return utils.try_assigning_task_to_node(task, instance_to_tasks) @@ -69,7 +70,7 @@ def try_assigning_task_to_node( async def try_assigning_task_to_instances( app: FastAPI, pending_task, - list_of_pending_instance_to_tasks: list[tuple[EC2InstanceData, list]], + instances_to_tasks: Iterable[tuple[EC2InstanceData, list]], type_to_instance_map: dict[str, EC2InstanceType], *, notify_progress: bool @@ -77,7 +78,7 @@ async def try_assigning_task_to_instances( return await utils.try_assigning_task_to_instances( app, pending_task, - list_of_pending_instance_to_tasks, + instances_to_tasks, type_to_instance_map, notify_progress=notify_progress, ) @@ -85,10 +86,10 @@ async def try_assigning_task_to_instances( @staticmethod def try_assigning_task_to_instance_types( pending_task, - list_of_instance_to_tasks: list[tuple[EC2InstanceType, list]], + instance_types_to_tasks: Iterable[tuple[EC2InstanceType, list]], ) -> bool: return utils.try_assigning_task_to_instance_types( - pending_task, list_of_instance_to_tasks + pending_task, instance_types_to_tasks ) @staticmethod diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_dynamic.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_dynamic.py index 531788aa906c..6a2d5814c855 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_dynamic.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_dynamic.py @@ -1,3 +1,5 @@ +from collections.abc import Iterable + from fastapi import FastAPI from models_library.docker import DockerLabelKey from models_library.generated_models.docker_rest_api import Node, Task @@ -46,15 +48,15 @@ async def list_unrunnable_tasks(app: FastAPI) -> list[Task]: @staticmethod def try_assigning_task_to_node( - task, instance_to_tasks: list[tuple[AssociatedInstance, list]] + task, instances_to_tasks: Iterable[tuple[AssociatedInstance, list]] ) -> bool: - return utils.try_assigning_task_to_node(task, instance_to_tasks) + return utils.try_assigning_task_to_node(task, instances_to_tasks) @staticmethod async def try_assigning_task_to_instances( app: FastAPI, pending_task, - list_of_pending_instance_to_tasks: list[tuple[EC2InstanceData, list]], + instances_to_tasks: Iterable[tuple[EC2InstanceData, list]], type_to_instance_map: dict[str, EC2InstanceType], *, notify_progress: bool @@ -62,7 +64,7 @@ async def try_assigning_task_to_instances( return await utils.try_assigning_task_to_pending_instances( app, pending_task, - list_of_pending_instance_to_tasks, + instances_to_tasks, type_to_instance_map, notify_progress=notify_progress, ) @@ -70,10 +72,10 @@ async def try_assigning_task_to_instances( @staticmethod def try_assigning_task_to_instance_types( pending_task, - list_of_instance_to_tasks: list[tuple[EC2InstanceType, list]], + instance_types_to_tasks: Iterable[tuple[EC2InstanceType, list]], ) -> bool: return utils.try_assigning_task_to_instances( - pending_task, list_of_instance_to_tasks + pending_task, instance_types_to_tasks ) @staticmethod @@ -83,8 +85,10 @@ async def log_message_from_tasks( await log_tasks_message(app, tasks, message, level=level) @staticmethod - async def progress_message_from_tasks(app: FastAPI, tasks: list, progress: float): - await progress_tasks_message(app, tasks, progress=1.0) + async def progress_message_from_tasks( + app: FastAPI, tasks: list, progress: float + ) -> None: + await progress_tasks_message(app, tasks, progress=progress) @staticmethod def get_max_resources_from_task(task) -> Resources: diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/computational_scaling.py b/services/autoscaling/src/simcore_service_autoscaling/utils/computational_scaling.py index 745a88a37c7d..cb9a164aeafa 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/utils/computational_scaling.py +++ b/services/autoscaling/src/simcore_service_autoscaling/utils/computational_scaling.py @@ -1,6 +1,6 @@ import datetime import logging -from typing import Final +from typing import Final, Iterable from dask_task_models_library.constants import DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY from fastapi import FastAPI @@ -43,22 +43,9 @@ def _compute_tasks_needed_resources(tasks: list[DaskTask]) -> Resources: def try_assigning_task_to_node( pending_task: DaskTask, - instance_to_tasks: list[tuple[AssociatedInstance, list[DaskTask]]], + instance_to_tasks: Iterable[tuple[AssociatedInstance, list[DaskTask]]], ) -> bool: - filtered_list_of_instance_to_tasks = iter(instance_to_tasks) - task_instance_restriction = get_task_instance_restriction(pending_task) - if task_instance_restriction: - - def _by_instance_type( - instance_to_task: tuple[AssociatedInstance, list[DaskTask]] - ) -> bool: - ass_instance, _ = instance_to_task - return bool(ass_instance.ec2_instance.type == task_instance_restriction) - - filtered_list_of_instance_to_tasks = filter( - _by_instance_type, instance_to_tasks - ) - for instance, node_assigned_tasks in filtered_list_of_instance_to_tasks: + for instance, node_assigned_tasks in instance_to_tasks: instance_total_resource = utils_docker.get_node_total_resources(instance.node) tasks_needed_resources = _compute_tasks_needed_resources(node_assigned_tasks) if ( @@ -72,7 +59,7 @@ def _by_instance_type( async def try_assigning_task_to_instances( app: FastAPI, pending_task: DaskTask, - list_of_pending_instance_to_tasks: list[tuple[EC2InstanceData, list[DaskTask]]], + instances_to_tasks: Iterable[tuple[EC2InstanceData, list[DaskTask]]], type_to_instance_map: dict[str, EC2InstanceType], *, notify_progress: bool, @@ -82,22 +69,7 @@ async def try_assigning_task_to_instances( instance_max_time_to_start = ( app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME ) - - filtered_list_of_instance_to_tasks = iter(list_of_pending_instance_to_tasks) - task_instance_restriction = get_task_instance_restriction(pending_task) - if task_instance_restriction: - - def _by_instance_type( - instance_to_task: tuple[EC2InstanceData, list[DaskTask]] - ) -> bool: - instance_data, _ = instance_to_task - return bool(instance_data.type == task_instance_restriction) - - filtered_list_of_instance_to_tasks = filter( - _by_instance_type, list_of_pending_instance_to_tasks - ) - - for instance, instance_assigned_tasks in filtered_list_of_instance_to_tasks: + for instance, instance_assigned_tasks in instances_to_tasks: instance_type = type_to_instance_map[instance.type] instance_total_resources = Resources( cpus=instance_type.cpus, ram=instance_type.ram @@ -131,23 +103,9 @@ def _by_instance_type( def try_assigning_task_to_instance_types( pending_task: DaskTask, - list_of_instance_to_tasks: list[tuple[EC2InstanceType, list[DaskTask]]], + instance_types_to_tasks: Iterable[tuple[EC2InstanceType, list[DaskTask]]], ) -> bool: - filtered_list_of_instance_to_tasks = iter(list_of_instance_to_tasks) - task_instance_restriction = get_task_instance_restriction(pending_task) - if task_instance_restriction: - - def _by_instance_type( - instance_to_task: tuple[EC2InstanceType, list[DaskTask]] - ) -> bool: - instance_type, _ = instance_to_task - return bool(instance_type.name == task_instance_restriction) - - filtered_list_of_instance_to_tasks = filter( - _by_instance_type, list_of_instance_to_tasks - ) - - for instance, instance_assigned_tasks in filtered_list_of_instance_to_tasks: + for instance, instance_assigned_tasks in instance_types_to_tasks: instance_total_resource = Resources(cpus=instance.cpus, ram=instance.ram) tasks_needed_resources = _compute_tasks_needed_resources( instance_assigned_tasks diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/dynamic_scaling.py b/services/autoscaling/src/simcore_service_autoscaling/utils/dynamic_scaling.py index 5c166e28323f..549b59bb38cf 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/utils/dynamic_scaling.py +++ b/services/autoscaling/src/simcore_service_autoscaling/utils/dynamic_scaling.py @@ -1,5 +1,6 @@ import datetime import logging +from collections.abc import Iterable from fastapi import FastAPI from models_library.generated_models.docker_rest_api import Task @@ -14,9 +15,10 @@ def try_assigning_task_to_node( - pending_task: Task, instance_to_tasks: list[tuple[AssociatedInstance, list[Task]]] + pending_task: Task, + instances_to_tasks: Iterable[tuple[AssociatedInstance, list[Task]]], ) -> bool: - for instance, node_assigned_tasks in instance_to_tasks: + for instance, node_assigned_tasks in instances_to_tasks: instance_total_resource = utils_docker.get_node_total_resources(instance.node) tasks_needed_resources = utils_docker.compute_tasks_needed_resources( node_assigned_tasks @@ -31,9 +33,9 @@ def try_assigning_task_to_node( def try_assigning_task_to_instances( pending_task: Task, - list_of_instance_to_tasks: list[tuple[EC2InstanceType, list[Task]]], + instance_types_to_tasks: Iterable[tuple[EC2InstanceType, list[Task]]], ) -> bool: - for instance, instance_assigned_tasks in list_of_instance_to_tasks: + for instance, instance_assigned_tasks in instance_types_to_tasks: instance_total_resource = Resources(cpus=instance.cpus, ram=instance.ram) tasks_needed_resources = utils_docker.compute_tasks_needed_resources( instance_assigned_tasks @@ -49,7 +51,7 @@ def try_assigning_task_to_instances( async def try_assigning_task_to_pending_instances( app: FastAPI, pending_task: Task, - list_of_instances_to_tasks: list[tuple[EC2InstanceData, list[Task]]], + instances_to_tasks: Iterable[tuple[EC2InstanceData, list[Task]]], type_to_instance_map: dict[str, EC2InstanceType], *, notify_progress: bool, @@ -59,7 +61,7 @@ async def try_assigning_task_to_pending_instances( instance_max_time_to_start = ( app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME ) - for instance, instance_assigned_tasks in list_of_instances_to_tasks: + for instance, instance_assigned_tasks in instances_to_tasks: instance_type = type_to_instance_map[instance.type] instance_total_resources = Resources( cpus=instance_type.cpus, ram=instance_type.ram