Skip to content

Commit

Permalink
refactor and simplify
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg committed Nov 7, 2023
1 parent 01a35c3 commit 98987f7
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def try_assigning_task_to_node(
async def try_assigning_task_to_instances(
app: FastAPI,
pending_task,
list_of_pending_instance_to_tasks: Iterable[tuple[EC2InstanceData, list]],
instances_to_tasks: Iterable[tuple[EC2InstanceData, list]],
type_to_instance_map: dict[str, EC2InstanceType],
*,
notify_progress: bool
Expand All @@ -58,7 +58,7 @@ async def try_assigning_task_to_instances(
@abstractmethod
def try_assigning_task_to_instance_types(
pending_task,
list_of_instance_to_tasks: Iterable[tuple[EC2InstanceType, list]],
instance_types_to_tasks: Iterable[tuple[EC2InstanceType, list]],
) -> bool:
...

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import collections
import logging
from collections.abc import Iterable

from fastapi import FastAPI
from models_library.docker import (
Expand Down Expand Up @@ -61,34 +62,34 @@ async def list_unrunnable_tasks(app: FastAPI) -> list[DaskTask]:
@staticmethod
def try_assigning_task_to_node(
task: DaskTask,
instance_to_tasks: list[tuple[AssociatedInstance, list[DaskTask]]],
instance_to_tasks: Iterable[tuple[AssociatedInstance, list[DaskTask]]],
) -> bool:
return utils.try_assigning_task_to_node(task, instance_to_tasks)

@staticmethod
async def try_assigning_task_to_instances(
app: FastAPI,
pending_task,
list_of_pending_instance_to_tasks: list[tuple[EC2InstanceData, list]],
instances_to_tasks: Iterable[tuple[EC2InstanceData, list]],
type_to_instance_map: dict[str, EC2InstanceType],
*,
notify_progress: bool
) -> bool:
return await utils.try_assigning_task_to_instances(
app,
pending_task,
list_of_pending_instance_to_tasks,
instances_to_tasks,
type_to_instance_map,
notify_progress=notify_progress,
)

@staticmethod
def try_assigning_task_to_instance_types(
pending_task,
list_of_instance_to_tasks: list[tuple[EC2InstanceType, list]],
instance_types_to_tasks: Iterable[tuple[EC2InstanceType, list]],
) -> bool:
return utils.try_assigning_task_to_instance_types(
pending_task, list_of_instance_to_tasks
pending_task, instance_types_to_tasks
)

@staticmethod
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from collections.abc import Iterable

from fastapi import FastAPI
from models_library.docker import DockerLabelKey
from models_library.generated_models.docker_rest_api import Node, Task
Expand Down Expand Up @@ -46,34 +48,34 @@ async def list_unrunnable_tasks(app: FastAPI) -> list[Task]:

@staticmethod
def try_assigning_task_to_node(
task, instance_to_tasks: list[tuple[AssociatedInstance, list]]
task, instances_to_tasks: Iterable[tuple[AssociatedInstance, list]]
) -> bool:
return utils.try_assigning_task_to_node(task, instance_to_tasks)
return utils.try_assigning_task_to_node(task, instances_to_tasks)

@staticmethod
async def try_assigning_task_to_instances(
app: FastAPI,
pending_task,
list_of_pending_instance_to_tasks: list[tuple[EC2InstanceData, list]],
instances_to_tasks: Iterable[tuple[EC2InstanceData, list]],
type_to_instance_map: dict[str, EC2InstanceType],
*,
notify_progress: bool
) -> bool:
return await utils.try_assigning_task_to_pending_instances(
app,
pending_task,
list_of_pending_instance_to_tasks,
instances_to_tasks,
type_to_instance_map,
notify_progress=notify_progress,
)

@staticmethod
def try_assigning_task_to_instance_types(
pending_task,
list_of_instance_to_tasks: list[tuple[EC2InstanceType, list]],
instance_types_to_tasks: Iterable[tuple[EC2InstanceType, list]],
) -> bool:
return utils.try_assigning_task_to_instances(
pending_task, list_of_instance_to_tasks
pending_task, instance_types_to_tasks
)

@staticmethod
Expand All @@ -83,8 +85,10 @@ async def log_message_from_tasks(
await log_tasks_message(app, tasks, message, level=level)

@staticmethod
async def progress_message_from_tasks(app: FastAPI, tasks: list, progress: float):
await progress_tasks_message(app, tasks, progress=1.0)
async def progress_message_from_tasks(
app: FastAPI, tasks: list, progress: float
) -> None:
await progress_tasks_message(app, tasks, progress=progress)

@staticmethod
def get_max_resources_from_task(task) -> Resources:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import datetime
import logging
from typing import Final
from typing import Final, Iterable

from dask_task_models_library.constants import DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY
from fastapi import FastAPI
Expand Down Expand Up @@ -43,22 +43,9 @@ def _compute_tasks_needed_resources(tasks: list[DaskTask]) -> Resources:

def try_assigning_task_to_node(
pending_task: DaskTask,
instance_to_tasks: list[tuple[AssociatedInstance, list[DaskTask]]],
instance_to_tasks: Iterable[tuple[AssociatedInstance, list[DaskTask]]],
) -> bool:
filtered_list_of_instance_to_tasks = iter(instance_to_tasks)
task_instance_restriction = get_task_instance_restriction(pending_task)
if task_instance_restriction:

def _by_instance_type(
instance_to_task: tuple[AssociatedInstance, list[DaskTask]]
) -> bool:
ass_instance, _ = instance_to_task
return bool(ass_instance.ec2_instance.type == task_instance_restriction)

filtered_list_of_instance_to_tasks = filter(
_by_instance_type, instance_to_tasks
)
for instance, node_assigned_tasks in filtered_list_of_instance_to_tasks:
for instance, node_assigned_tasks in instance_to_tasks:
instance_total_resource = utils_docker.get_node_total_resources(instance.node)
tasks_needed_resources = _compute_tasks_needed_resources(node_assigned_tasks)
if (
Expand All @@ -72,7 +59,7 @@ def _by_instance_type(
async def try_assigning_task_to_instances(
app: FastAPI,
pending_task: DaskTask,
list_of_pending_instance_to_tasks: list[tuple[EC2InstanceData, list[DaskTask]]],
instances_to_tasks: Iterable[tuple[EC2InstanceData, list[DaskTask]]],
type_to_instance_map: dict[str, EC2InstanceType],
*,
notify_progress: bool,
Expand All @@ -82,22 +69,7 @@ async def try_assigning_task_to_instances(
instance_max_time_to_start = (
app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME
)

filtered_list_of_instance_to_tasks = iter(list_of_pending_instance_to_tasks)
task_instance_restriction = get_task_instance_restriction(pending_task)
if task_instance_restriction:

def _by_instance_type(
instance_to_task: tuple[EC2InstanceData, list[DaskTask]]
) -> bool:
instance_data, _ = instance_to_task
return bool(instance_data.type == task_instance_restriction)

filtered_list_of_instance_to_tasks = filter(
_by_instance_type, list_of_pending_instance_to_tasks
)

for instance, instance_assigned_tasks in filtered_list_of_instance_to_tasks:
for instance, instance_assigned_tasks in instances_to_tasks:
instance_type = type_to_instance_map[instance.type]
instance_total_resources = Resources(
cpus=instance_type.cpus, ram=instance_type.ram
Expand Down Expand Up @@ -131,23 +103,9 @@ def _by_instance_type(

def try_assigning_task_to_instance_types(
pending_task: DaskTask,
list_of_instance_to_tasks: list[tuple[EC2InstanceType, list[DaskTask]]],
instance_types_to_tasks: Iterable[tuple[EC2InstanceType, list[DaskTask]]],
) -> bool:
filtered_list_of_instance_to_tasks = iter(list_of_instance_to_tasks)
task_instance_restriction = get_task_instance_restriction(pending_task)
if task_instance_restriction:

def _by_instance_type(
instance_to_task: tuple[EC2InstanceType, list[DaskTask]]
) -> bool:
instance_type, _ = instance_to_task
return bool(instance_type.name == task_instance_restriction)

filtered_list_of_instance_to_tasks = filter(
_by_instance_type, list_of_instance_to_tasks
)

for instance, instance_assigned_tasks in filtered_list_of_instance_to_tasks:
for instance, instance_assigned_tasks in instance_types_to_tasks:
instance_total_resource = Resources(cpus=instance.cpus, ram=instance.ram)
tasks_needed_resources = _compute_tasks_needed_resources(
instance_assigned_tasks
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import datetime
import logging
from collections.abc import Iterable

from fastapi import FastAPI
from models_library.generated_models.docker_rest_api import Task
Expand All @@ -14,9 +15,10 @@


def try_assigning_task_to_node(
pending_task: Task, instance_to_tasks: list[tuple[AssociatedInstance, list[Task]]]
pending_task: Task,
instances_to_tasks: Iterable[tuple[AssociatedInstance, list[Task]]],
) -> bool:
for instance, node_assigned_tasks in instance_to_tasks:
for instance, node_assigned_tasks in instances_to_tasks:
instance_total_resource = utils_docker.get_node_total_resources(instance.node)
tasks_needed_resources = utils_docker.compute_tasks_needed_resources(
node_assigned_tasks
Expand All @@ -31,9 +33,9 @@ def try_assigning_task_to_node(

def try_assigning_task_to_instances(
pending_task: Task,
list_of_instance_to_tasks: list[tuple[EC2InstanceType, list[Task]]],
instance_types_to_tasks: Iterable[tuple[EC2InstanceType, list[Task]]],
) -> bool:
for instance, instance_assigned_tasks in list_of_instance_to_tasks:
for instance, instance_assigned_tasks in instance_types_to_tasks:
instance_total_resource = Resources(cpus=instance.cpus, ram=instance.ram)
tasks_needed_resources = utils_docker.compute_tasks_needed_resources(
instance_assigned_tasks
Expand All @@ -49,7 +51,7 @@ def try_assigning_task_to_instances(
async def try_assigning_task_to_pending_instances(
app: FastAPI,
pending_task: Task,
list_of_instances_to_tasks: list[tuple[EC2InstanceData, list[Task]]],
instances_to_tasks: Iterable[tuple[EC2InstanceData, list[Task]]],
type_to_instance_map: dict[str, EC2InstanceType],
*,
notify_progress: bool,
Expand All @@ -59,7 +61,7 @@ async def try_assigning_task_to_pending_instances(
instance_max_time_to_start = (
app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME
)
for instance, instance_assigned_tasks in list_of_instances_to_tasks:
for instance, instance_assigned_tasks in instances_to_tasks:
instance_type = type_to_instance_map[instance.type]
instance_total_resources = Resources(
cpus=instance_type.cpus, ram=instance_type.ram
Expand Down

0 comments on commit 98987f7

Please sign in to comment.