Skip to content

Commit

Permalink
improve error message
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg committed Jun 27, 2023
1 parent 678e249 commit 761bf9c
Showing 1 changed file with 46 additions and 28 deletions.
74 changes: 46 additions & 28 deletions services/director-v2/src/simcore_service_director_v2/utils/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID, NodeIDStr
from models_library.users import UserID
from pydantic import AnyUrl, ByteSize, ValidationError
from pydantic import AnyUrl, ByteSize, ValidationError, parse_obj_as
from servicelib.json_serialization import json_dumps
from servicelib.logging_utils import log_catch, log_context
from simcore_sdk import node_ports_v2
Expand Down Expand Up @@ -462,6 +462,46 @@ async def check_maximize_workers(cluster: dask_gateway.GatewayCluster | None) ->
await cluster.scale(_LARGE_NUMBER_OF_WORKERS)


def _can_task_run_on_worker(
task_resources: dict[str, Any], worker_resources: dict[str, Any]
) -> bool:
def gen_check(
task_resources: dict[str, Any], worker_resources: dict[str, Any]
) -> Iterable[bool]:
for name, required_value in task_resources.items():
if required_value is None:
yield True
elif worker_has := worker_resources.get(name):
yield worker_has >= required_value
else:
yield False

return all(gen_check(task_resources, worker_resources))


def _cluster_missing_resources(
task_resources: dict[str, Any], cluster_resources: dict[str, Any]
) -> list[str]:
return [r for r in task_resources if r not in cluster_resources]


def _human_readable_resources(resources: dict[str, Any]) -> dict[str, Any]:
human_readable_resources = {}

for res_name, res_value in resources.items():
if "RAM" in res_name:
try:
human_readable_resources[res_name] = parse_obj_as(
ByteSize, res_value
).human_readable()
except ValidationError:
logger.warning("could not parse %s:", f"{res_name=}", res_value)
human_readable_resources[res_name] = res_value
else:
human_readable_resources[res_name] = res_value
return human_readable_resources


def check_if_cluster_is_able_to_run_pipeline(
project_id: ProjectID,
node_id: NodeID,
Expand All @@ -473,33 +513,12 @@ def check_if_cluster_is_able_to_run_pipeline(
logger.debug("Dask scheduler infos: %s", json_dumps(scheduler_info, indent=2))
workers = scheduler_info.get("workers", {})

def can_task_run_on_worker(
task_resources: dict[str, Any], worker_resources: dict[str, Any]
) -> bool:
def gen_check(
task_resources: dict[str, Any], worker_resources: dict[str, Any]
) -> Iterable[bool]:
for name, required_value in task_resources.items():
if required_value is None:
yield True
elif worker_has := worker_resources.get(name):
yield worker_has >= required_value
else:
yield False

return all(gen_check(task_resources, worker_resources))

def cluster_missing_resources(
task_resources: dict[str, Any], cluster_resources: dict[str, Any]
) -> list[str]:
return [r for r in task_resources if r not in cluster_resources]

cluster_resources_counter: collections.Counter = collections.Counter()
can_a_worker_run_task = False
for worker in workers:
worker_resources = workers[worker].get("resources", {})
cluster_resources_counter.update(worker_resources)
if can_task_run_on_worker(task_resources, worker_resources):
if _can_task_run_on_worker(task_resources, worker_resources):
can_a_worker_run_task = True
all_available_resources_in_cluster = dict(cluster_resources_counter)

Expand All @@ -514,7 +533,7 @@ def cluster_missing_resources(
return

# check if we have missing resources
if missing_resources := cluster_missing_resources(
if missing_resources := _cluster_missing_resources(
task_resources, all_available_resources_in_cluster
):
cluster_resources = (
Expand All @@ -535,10 +554,9 @@ def cluster_missing_resources(
raise InsuficientComputationalResourcesError(
project_id=project_id,
node_id=node_id,
msg=f"Service {node_image.name}:{node_image.tag} cannot be scheduled "
f"on cluster {cluster_id}: insuficient resources"
f"cluster has '{all_available_resources_in_cluster}', cluster has no worker with the"
" necessary computational resources for running the service! TIP: contact oSparc support",
msg=f"Insufficient computational resources to run {node_image.name}:{node_image.tag} with {_human_readable_resources( task_resources)} on cluster {cluster_id}."
f"Cluster available workers: {[_human_readable_resources( worker.get('resources', None)) for worker in workers.values()]}"
"TIP: Reduce service required resources or contact oSparc support",
)


Expand Down

0 comments on commit 761bf9c

Please sign in to comment.