diff --git a/services/autoscaling/src/simcore_service_autoscaling/models.py b/services/autoscaling/src/simcore_service_autoscaling/models.py index 63bcccb633c..d9c3bf3978d 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/models.py +++ b/services/autoscaling/src/simcore_service_autoscaling/models.py @@ -4,8 +4,18 @@ from aws_library.ec2.models import EC2InstanceData, EC2InstanceType, Resources from models_library.generated_models.docker_rest_api import Node -AssignedTasksToInstance: TypeAlias = tuple[EC2InstanceData, list, Resources] -AssignedTasksToInstanceType: TypeAlias = tuple[EC2InstanceType, list] + +@dataclass(frozen=True, kw_only=True) +class AssignedTasksToInstance: + instance: EC2InstanceData + available_resources: Resources + assigned_tasks: list + + +@dataclass(frozen=True, kw_only=True) +class AssignedTasksToInstanceType: + instance_type: EC2InstanceType + assigned_tasks: list @dataclass(frozen=True) diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py index be6bfe3ee05..08cd815c5ad 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py @@ -332,19 +332,27 @@ async def _find_needed_instances( ) -> dict[EC2InstanceType, int]: # 1. check first the pending task needs active_instances_to_tasks: list[AssignedTasksToInstance] = [ - ( - i.ec2_instance, - [], - i.ec2_instance.resources + AssignedTasksToInstance( + instance=i.ec2_instance, + assigned_tasks=[], + available_resources=i.ec2_instance.resources - await auto_scaling_mode.compute_node_used_resources(app, i), ) for i in cluster.active_nodes ] pending_instances_to_tasks: list[AssignedTasksToInstance] = [ - (i, [], i.resources) for i in cluster.pending_ec2s + AssignedTasksToInstance( + instance=i, assigned_tasks=[], available_resources=i.resources + ) + for i in cluster.pending_ec2s ] drained_instances_to_tasks: list[AssignedTasksToInstance] = [ - (i.ec2_instance, [], i.ec2_instance.resources) for i in cluster.drained_nodes + AssignedTasksToInstance( + instance=i.ec2_instance, + assigned_tasks=[], + available_resources=i.ec2_instance.resources, + ) + for i in cluster.drained_nodes ] needed_new_instance_types_for_tasks: list[AssignedTasksToInstanceType] = [] for task in pending_tasks: @@ -377,7 +385,11 @@ async def _find_needed_instances( defined_ec2 = find_selected_instance_type_for_task( task_defined_ec2_type, available_ec2_types, auto_scaling_mode, task ) - needed_new_instance_types_for_tasks.append((defined_ec2, [task])) + needed_new_instance_types_for_tasks.append( + AssignedTasksToInstanceType( + instance_type=defined_ec2, assigned_tasks=[task] + ) + ) else: # we go for best fitting type best_ec2_instance = utils_ec2.find_best_fitting_ec2_instance( @@ -385,7 +397,11 @@ async def _find_needed_instances( auto_scaling_mode.get_max_resources_from_task(task), score_type=utils_ec2.closest_instance_policy, ) - needed_new_instance_types_for_tasks.append((best_ec2_instance, [task])) + needed_new_instance_types_for_tasks.append( + AssignedTasksToInstanceType( + instance_type=best_ec2_instance, assigned_tasks=[task] + ) + ) except Ec2InstanceNotFoundError: _logger.exception( "Task %s needs more resources than any EC2 instance " @@ -396,7 +412,10 @@ async def _find_needed_instances( _logger.exception("Unexpected error:") num_instances_per_type = collections.defaultdict( - int, collections.Counter(t for t, _ in needed_new_instance_types_for_tasks) + int, + collections.Counter( + t.instance_type for t in needed_new_instance_types_for_tasks + ), ) # 2. check the buffer needs @@ -410,9 +429,7 @@ async def _find_needed_instances( ) > 0: # check if some are already pending remaining_pending_instances = [ - instance - for instance, assigned_tasks, _ in pending_instances_to_tasks - if not assigned_tasks + i.instance for i in pending_instances_to_tasks if not i.assigned_tasks ] if len(remaining_pending_instances) < ( app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py index 25cfc7e5122..9ddb7df5c14 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py @@ -2,7 +2,7 @@ import logging from collections.abc import Iterable -from aws_library.ec2.models import EC2InstanceData, EC2InstanceType, Resources +from aws_library.ec2.models import EC2InstanceData, Resources from fastapi import FastAPI from models_library.docker import ( DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY, @@ -15,7 +15,12 @@ from types_aiobotocore_ec2.literals import InstanceTypeType from ..core.settings import get_application_settings -from ..models import AssignedTasksToInstance, AssociatedInstance, DaskTask +from ..models import ( + AssignedTasksToInstance, + AssignedTasksToInstanceType, + AssociatedInstance, + DaskTask, +) from ..utils import computational_scaling as utils from ..utils import utils_docker, utils_ec2 from . import dask @@ -79,7 +84,7 @@ async def try_assigning_task_to_instances( @staticmethod def try_assigning_task_to_instance_types( pending_task, - instance_types_to_tasks: list[tuple[EC2InstanceType, list]], + instance_types_to_tasks: list[AssignedTasksToInstanceType], ) -> bool: return utils.try_assigning_task_to_instance_types( pending_task, instance_types_to_tasks diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/auto_scaling_core.py b/services/autoscaling/src/simcore_service_autoscaling/utils/auto_scaling_core.py index 59e4836695d..106b68ea352 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/utils/auto_scaling_core.py +++ b/services/autoscaling/src/simcore_service_autoscaling/utils/auto_scaling_core.py @@ -114,8 +114,7 @@ def _instance_type_by_type_name( def _instance_type_map_by_type_name( mapping: AssignedTasksToInstanceType, *, type_name: InstanceTypeType | None ) -> bool: - ec2_type, _ = mapping - return _instance_type_by_type_name(ec2_type, type_name=type_name) + return _instance_type_by_type_name(mapping.instance_type, type_name=type_name) def _instance_data_map_by_type_name( @@ -123,8 +122,7 @@ def _instance_data_map_by_type_name( ) -> bool: if type_name is None: return True - ec2_data, *_ = mapping - return bool(ec2_data.type == type_name) + return bool(mapping.instance.type == type_name) def filter_by_task_defined_instance( diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/computational_scaling.py b/services/autoscaling/src/simcore_service_autoscaling/utils/computational_scaling.py index 66413816811..f7ff8253716 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/utils/computational_scaling.py +++ b/services/autoscaling/src/simcore_service_autoscaling/utils/computational_scaling.py @@ -1,16 +1,21 @@ import datetime import logging from collections.abc import Iterable -from typing import Final, TypeAlias +from typing import Final -from aws_library.ec2.models import EC2InstanceData, EC2InstanceType, Resources +from aws_library.ec2.models import Resources from dask_task_models_library.constants import DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY from fastapi import FastAPI from servicelib.utils_formatting import timedelta_as_minute_second from types_aiobotocore_ec2.literals import InstanceTypeType from ..core.settings import get_application_settings -from ..models import AssociatedInstance, DaskTask +from ..models import ( + AssignedTasksToInstance, + AssignedTasksToInstanceType, + AssociatedInstance, + DaskTask, +) _logger = logging.getLogger(__name__) @@ -51,15 +56,10 @@ def try_assigning_task_to_node( return False -AssignedDaskTasksToInstance: TypeAlias = tuple[ - EC2InstanceData, list[DaskTask], Resources -] - - async def try_assigning_task_to_instances( app: FastAPI, pending_task: DaskTask, - instances_to_tasks: list[AssignedDaskTasksToInstance], + instances_to_tasks: list[AssignedTasksToInstance], *, notify_progress: bool, ) -> bool: @@ -68,23 +68,23 @@ async def try_assigning_task_to_instances( instance_max_time_to_start = ( app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME ) - for ( - instance, - instance_assigned_tasks, - instance_available_resources, - ) in instances_to_tasks: + for assigned_tasks_to_instance in instances_to_tasks: tasks_needed_resources = _compute_tasks_needed_resources( - instance_assigned_tasks + assigned_tasks_to_instance.assigned_tasks ) if ( - instance_available_resources - tasks_needed_resources + assigned_tasks_to_instance.available_resources - tasks_needed_resources ) >= get_max_resources_from_dask_task(pending_task): - instance_assigned_tasks.append(pending_task) + assigned_tasks_to_instance.assigned_tasks.append(pending_task) if notify_progress: now = datetime.datetime.now(datetime.timezone.utc) - time_since_launch = now - instance.launch_time + time_since_launch = ( + now - assigned_tasks_to_instance.instance.launch_time + ) estimated_time_to_completion = ( - instance.launch_time + instance_max_time_to_start - now + assigned_tasks_to_instance.instance.launch_time + + instance_max_time_to_start + - now ) _logger.info( "LOG: %s", @@ -102,16 +102,19 @@ async def try_assigning_task_to_instances( def try_assigning_task_to_instance_types( pending_task: DaskTask, - instance_types_to_tasks: list[tuple[EC2InstanceType, list[DaskTask]]], + instance_types_to_tasks: list[AssignedTasksToInstanceType], ) -> bool: - for instance, instance_assigned_tasks in instance_types_to_tasks: - instance_total_resource = Resources(cpus=instance.cpus, ram=instance.ram) + for assigned_tasks_to_instance_type in instance_types_to_tasks: + instance_total_resource = Resources( + cpus=assigned_tasks_to_instance_type.instance_type.cpus, + ram=assigned_tasks_to_instance_type.instance_type.ram, + ) tasks_needed_resources = _compute_tasks_needed_resources( - instance_assigned_tasks + assigned_tasks_to_instance_type.assigned_tasks ) if ( instance_total_resource - tasks_needed_resources ) >= get_max_resources_from_dask_task(pending_task): - instance_assigned_tasks.append(pending_task) + assigned_tasks_to_instance_type.assigned_tasks.append(pending_task) return True return False diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/dynamic_scaling.py b/services/autoscaling/src/simcore_service_autoscaling/utils/dynamic_scaling.py index e43176da44d..ab6f6756d43 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/utils/dynamic_scaling.py +++ b/services/autoscaling/src/simcore_service_autoscaling/utils/dynamic_scaling.py @@ -1,15 +1,18 @@ import datetime import logging from collections.abc import Iterable -from typing import TypeAlias -from aws_library.ec2.models import EC2InstanceData, EC2InstanceType, Resources +from aws_library.ec2.models import Resources from fastapi import FastAPI from models_library.generated_models.docker_rest_api import Task from servicelib.utils_formatting import timedelta_as_minute_second from ..core.settings import get_application_settings -from ..models import AssociatedInstance +from ..models import ( + AssignedTasksToInstance, + AssignedTasksToInstanceType, + AssociatedInstance, +) from . import utils_docker from .rabbitmq import log_tasks_message, progress_tasks_message @@ -35,28 +38,28 @@ def try_assigning_task_to_node( def try_assigning_task_to_instance_types( pending_task: Task, - instance_types_to_tasks: list[tuple[EC2InstanceType, list[Task]]], + instance_types_to_tasks: list[AssignedTasksToInstanceType], ) -> bool: - for instance, instance_assigned_tasks in instance_types_to_tasks: - instance_total_resource = Resources(cpus=instance.cpus, ram=instance.ram) + for assigned_tasks_to_instance_type in instance_types_to_tasks: + instance_total_resource = Resources( + cpus=assigned_tasks_to_instance_type.instance_type.cpus, + ram=assigned_tasks_to_instance_type.instance_type.ram, + ) tasks_needed_resources = utils_docker.compute_tasks_needed_resources( - instance_assigned_tasks + assigned_tasks_to_instance_type.assigned_tasks ) if ( instance_total_resource - tasks_needed_resources ) >= utils_docker.get_max_resources_from_docker_task(pending_task): - instance_assigned_tasks.append(pending_task) + assigned_tasks_to_instance_type.assigned_tasks.append(pending_task) return True return False -AssignedDockerTasksToInstance: TypeAlias = tuple[EC2InstanceData, list[Task], Resources] - - async def try_assigning_task_to_instances( app: FastAPI, pending_task: Task, - instances_to_tasks: list[AssignedDockerTasksToInstance], + instances_to_tasks: list[AssignedTasksToInstance], *, notify_progress: bool, ) -> bool: @@ -65,23 +68,23 @@ async def try_assigning_task_to_instances( instance_max_time_to_start = ( app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME ) - for ( - instance, - instance_assigned_tasks, - instance_available_resources, - ) in instances_to_tasks: + for assigned_tasks_to_instance in instances_to_tasks: tasks_needed_resources = utils_docker.compute_tasks_needed_resources( - instance_assigned_tasks + assigned_tasks_to_instance.assigned_tasks ) if ( - instance_available_resources - tasks_needed_resources + assigned_tasks_to_instance.available_resources - tasks_needed_resources ) >= utils_docker.get_max_resources_from_docker_task(pending_task): - instance_assigned_tasks.append(pending_task) + assigned_tasks_to_instance.assigned_tasks.append(pending_task) if notify_progress: now = datetime.datetime.now(datetime.timezone.utc) - time_since_launch = now - instance.launch_time + time_since_launch = ( + now - assigned_tasks_to_instance.instance.launch_time + ) estimated_time_to_completion = ( - instance.launch_time + instance_max_time_to_start - now + assigned_tasks_to_instance.instance.launch_time + + instance_max_time_to_start + - now ) await log_tasks_message( diff --git a/services/autoscaling/tests/unit/test_utils_computational_scaling.py b/services/autoscaling/tests/unit/test_utils_computational_scaling.py index 1d9c50757fa..fa06a5a18b8 100644 --- a/services/autoscaling/tests/unit/test_utils_computational_scaling.py +++ b/services/autoscaling/tests/unit/test_utils_computational_scaling.py @@ -14,6 +14,8 @@ from pydantic import ByteSize, parse_obj_as from pytest_mock import MockerFixture from simcore_service_autoscaling.models import ( + AssignedTasksToInstance, + AssignedTasksToInstanceType, AssociatedInstance, DaskTask, DaskTaskResources, @@ -22,7 +24,6 @@ from simcore_service_autoscaling.utils.computational_scaling import ( _DEFAULT_MAX_CPU, _DEFAULT_MAX_RAM, - AssignedDaskTasksToInstance, get_max_resources_from_dask_task, try_assigning_task_to_instance_types, try_assigning_task_to_instances, @@ -152,8 +153,12 @@ async def test_try_assigning_task_to_instances( ): task = fake_task(required_resources={"CPU": 2}) ec2_instance = fake_ec2_instance_data() - pending_instance_to_tasks: list[AssignedDaskTasksToInstance] = [ - (ec2_instance, [], Resources(cpus=4, ram=ByteSize(1024**2))) + pending_instance_to_tasks: list[AssignedTasksToInstance] = [ + AssignedTasksToInstance( + instance=ec2_instance, + assigned_tasks=[], + available_resources=Resources(cpus=4, ram=ByteSize(1024**2)), + ) ] # calling once should allow to add that task to the instance @@ -166,7 +171,7 @@ async def test_try_assigning_task_to_instances( ) is True ) - assert pending_instance_to_tasks[0][1] == [task] + assert pending_instance_to_tasks[0].assigned_tasks == [task] # calling a second time as well should allow to add that task to the instance assert ( await try_assigning_task_to_instances( @@ -177,7 +182,7 @@ async def test_try_assigning_task_to_instances( ) is True ) - assert pending_instance_to_tasks[0][1] == [task, task] + assert pending_instance_to_tasks[0].assigned_tasks == [task, task] # calling a third time should fail assert ( await try_assigning_task_to_instances( @@ -188,7 +193,7 @@ async def test_try_assigning_task_to_instances( ) is False ) - assert pending_instance_to_tasks[0][1] == [task, task] + assert pending_instance_to_tasks[0].assigned_tasks == [task, task] def test_try_assigning_task_to_instance_types_with_empty_types( @@ -206,16 +211,16 @@ def test_try_assigning_task_to_instance_types( fake_instance_type = EC2InstanceType( name=faker.name(), cpus=6, ram=parse_obj_as(ByteSize, "2GiB") ) - instance_type_to_tasks: list[tuple[EC2InstanceType, list[DaskTask]]] = [ - (fake_instance_type, []) + instance_type_to_tasks: list[AssignedTasksToInstanceType] = [ + AssignedTasksToInstanceType(instance_type=fake_instance_type, assigned_tasks=[]) ] # now this should work 3 times assert try_assigning_task_to_instance_types(task, instance_type_to_tasks) is True - assert instance_type_to_tasks[0][1] == [task] + assert instance_type_to_tasks[0].assigned_tasks == [task] assert try_assigning_task_to_instance_types(task, instance_type_to_tasks) is True - assert instance_type_to_tasks[0][1] == [task, task] + assert instance_type_to_tasks[0].assigned_tasks == [task, task] assert try_assigning_task_to_instance_types(task, instance_type_to_tasks) is True - assert instance_type_to_tasks[0][1] == [task, task, task] + assert instance_type_to_tasks[0].assigned_tasks == [task, task, task] # now it should fail assert try_assigning_task_to_instance_types(task, instance_type_to_tasks) is False - assert instance_type_to_tasks[0][1] == [task, task, task] + assert instance_type_to_tasks[0].assigned_tasks == [task, task, task] diff --git a/services/autoscaling/tests/unit/test_utils_dynamic_scaling.py b/services/autoscaling/tests/unit/test_utils_dynamic_scaling.py index 5712d4aa7b7..ab3b3c55e3f 100644 --- a/services/autoscaling/tests/unit/test_utils_dynamic_scaling.py +++ b/services/autoscaling/tests/unit/test_utils_dynamic_scaling.py @@ -13,8 +13,8 @@ from models_library.generated_models.docker_rest_api import Task from pydantic import ByteSize from pytest_mock import MockerFixture +from simcore_service_autoscaling.models import AssignedTasksToInstance from simcore_service_autoscaling.utils.dynamic_scaling import ( - AssignedDockerTasksToInstance, try_assigning_task_to_instances, ) @@ -56,8 +56,12 @@ async def test_try_assigning_task_to_instances( Spec={"Resources": {"Reservations": {"NanoCPUs": 2 * 1e9}}} ) fake_instance = fake_ec2_instance_data() - pending_instance_to_tasks: list[AssignedDockerTasksToInstance] = [ - (fake_instance, [], Resources(cpus=4, ram=ByteSize(1024**3))) + pending_instance_to_tasks: list[AssignedTasksToInstance] = [ + AssignedTasksToInstance( + instance=fake_instance, + assigned_tasks=[], + available_resources=Resources(cpus=4, ram=ByteSize(1024**3)), + ) ] # calling once should allow to add that task to the instance