🐛 Fix infinite Waiting for cluster when dask-scheduler is restarted #5252

@@ -1,4 +1,5 @@
from enum import Enum
from typing import TypeAlias


# NOTE: mypy fails with src/simcore_service_director_v2/modules/dask_client.py:101:5: error: Dict entry 0 has incompatible type "str": "auto"; expected "Any": "DaskClientTaskState" [dict-item]
@@ -11,3 +12,7 @@ class DaskClientTaskState(str, Enum):
ERRED = "ERRED"
ABORTED = "ABORTED"
SUCCESS = "SUCCESS"


DaskJobID: TypeAlias = str
DaskResources: TypeAlias = dict[str, int | float]
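
A quick aside for illustration: a minimal sketch of how the two new aliases might be used. The describe_job helper and the resource names are hypothetical; only DaskJobID and DaskResources themselves come from the change above.

from typing import TypeAlias

DaskJobID: TypeAlias = str
DaskResources: TypeAlias = dict[str, int | float]


def describe_job(job_id: DaskJobID, resources: DaskResources) -> str:
    # hypothetical helper: DaskJobID is a plain str alias, while DaskResources
    # maps resource names (e.g. "CPU", "RAM") to numeric amounts
    listing = ", ".join(f"{name}={amount}" for name, amount in resources.items())
    return f"job {job_id} requests {listing}"


print(describe_job("dask-job-123", {"CPU": 2, "RAM": 4e9}))
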
@@ -13,7 +13,6 @@
import asyncio
import datetime
import logging
import traceback
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Final
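
A brief aside on the dropped traceback import: Logger.exception() already records the active traceback, which is why the manual traceback.format_tb call removed further down is redundant. A standalone sketch:

import logging

logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger(__name__)

try:
    {}["missing"]  # provoke an error
except KeyError:
    # exception() logs at ERROR level and appends the current traceback
    # automatically, so there is no need to format it by hand
    _logger.exception("Unexpected error while scheduling tasks")
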
@@ -551,7 +550,7 @@ async def _start_tasks(
project_id: ProjectID,
scheduled_tasks: dict[NodeID, CompTaskAtDB],
pipeline_params: ScheduledPipelineParams,
) -> list:
) -> None:
...

@abstractmethod
@@ -663,7 +662,7 @@ async def _schedule_pipeline(
user_id, project_id, iteration, RunningState.ABORTED
)
self.scheduled_pipelines.pop((user_id, project_id, iteration), None)
except DaskClientAcquisisitonError:
except (DaskClientAcquisisitonError, ClustersKeeperNotAvailableError):
_logger.exception(
"Unexpected error while connecting with computational backend, aborting pipeline"
)
@@ -692,12 +691,14 @@ async def _schedule_tasks_to_stop(
) -> None:
# get any running task and stop them
comp_tasks_repo = CompTasksRepository.instance(self.db_engine)
await comp_tasks_repo.mark_project_published_tasks_as_aborted(project_id)
await comp_tasks_repo.mark_project_published_waiting_for_cluster_tasks_as_aborted(
project_id
)
# stop any remaining running task, these are already submitted
tasks_to_stop = [t for t in comp_tasks.values() if t.state in PROCESSING_STATES]
await self._stop_tasks(user_id, tasks_to_stop, pipeline_params)

async def _schedule_tasks_to_start(
async def _schedule_tasks_to_start( # noqa: C901
self,
user_id: UserID,
project_id: ProjectID,
@@ -729,77 +730,32 @@ async def _schedule_tasks_to_start(
return comp_tasks

try:
results = await self._start_tasks(
await self._start_tasks(
user_id=user_id,
project_id=project_id,
scheduled_tasks=tasks_ready_to_start,
pipeline_params=pipeline_params,
)
except (
ComputationalBackendNotConnectedError,
ComputationalSchedulerChangedError,
):
_logger.exception(
"Issue with computational backend. Tasks are set back "
"to WAITING_FOR_CLUSTER state until scheduler comes back!",
)
await CompTasksRepository.instance(
self.db_engine
).update_project_tasks_state(
project_id,
list(tasks_ready_to_start.keys()),
RunningState.WAITING_FOR_CLUSTER,
)
for task in tasks_ready_to_start:
comp_tasks[
NodeIDStr(f"{task}")
].state = RunningState.WAITING_FOR_CLUSTER

# Handling errors raised when _start_tasks(...)
for r, t in zip(results, tasks_ready_to_start, strict=True):
if isinstance(r, TaskSchedulingError):
_logger.error(
"Project '%s''s task '%s' could not be scheduled due to the following: %s",
r.project_id,
r.node_id,
f"{r}",
)

await CompTasksRepository.instance(
self.db_engine
).update_project_tasks_state(
project_id,
[r.node_id],
RunningState.FAILED,
r.get_errors(),
optional_progress=1.0,
optional_stopped=arrow.utcnow().datetime,
)
comp_tasks[NodeIDStr(f"{t}")].state = RunningState.FAILED
elif isinstance(
r,
ComputationalBackendNotConnectedError
| ComputationalSchedulerChangedError,
):
_logger.error(
"Issue with computational backend: %s. Tasks are set back "
"to WAITING_FOR_CLUSTER state until scheduler comes back!",
r,
)
# we should try re-connecting.
# in the meantime we cannot schedule tasks on the scheduler,
# let's put these tasks back to WAITING_FOR_CLUSTER, so they might be re-submitted later
await CompTasksRepository.instance(
self.db_engine
).update_project_tasks_state(
project_id,
list(tasks_ready_to_start.keys()),
RunningState.WAITING_FOR_CLUSTER,
)
comp_tasks[
NodeIDStr(f"{t}")
].state = RunningState.WAITING_FOR_CLUSTER
elif isinstance(r, Exception):
_logger.error(
"Unexpected error for %s with %s on %s happened when scheduling %s:\n%s\n%s",
f"{user_id=}",
f"{project_id=}",
f"{pipeline_params.cluster_id=}",
f"{tasks_ready_to_start.keys()=}",
f"{r}",
"".join(traceback.format_tb(r.__traceback__)),
)
await CompTasksRepository.instance(
self.db_engine
).update_project_tasks_state(
project_id,
[t],
RunningState.FAILED,
optional_progress=1.0,
optional_stopped=arrow.utcnow().datetime,
)
comp_tasks[NodeIDStr(f"{t}")].state = RunningState.FAILED
except ComputationalBackendOnDemandNotReadyError as exc:
_logger.info(
"The on demand computational backend is not ready yet: %s", exc
@@ -819,8 +775,10 @@ async def _schedule_tasks_to_start(
list(tasks_ready_to_start.keys()),
RunningState.WAITING_FOR_CLUSTER,
)
for task in comp_tasks.values():
task.state = RunningState.WAITING_FOR_CLUSTER
for task in tasks_ready_to_start:
comp_tasks[
NodeIDStr(f"{task}")
].state = RunningState.WAITING_FOR_CLUSTER
except ClustersKeeperNotAvailableError:
_logger.exception("Unexpected error while starting tasks:")
await publish_project_log(
@@ -840,8 +798,46 @@ async def _schedule_tasks_to_start(
optional_progress=1.0,
optional_stopped=arrow.utcnow().datetime,
)
for task in comp_tasks.values():
task.state = RunningState.FAILED
for task in tasks_ready_to_start:
comp_tasks[NodeIDStr(f"{task}")].state = RunningState.FAILED
raise
except TaskSchedulingError as exc:
_logger.exception(
"Project '%s''s task '%s' could not be scheduled",
exc.project_id,
exc.node_id,
)
await CompTasksRepository.instance(
self.db_engine
).update_project_tasks_state(
project_id,
[exc.node_id],
RunningState.FAILED,
exc.get_errors(),
optional_progress=1.0,
optional_stopped=arrow.utcnow().datetime,
)
comp_tasks[NodeIDStr(f"{exc.node_id}")].state = RunningState.FAILED
except Exception:
_logger.exception(
"Unexpected error for %s with %s on %s happened when scheduling %s:",
f"{user_id=}",
f"{project_id=}",
f"{pipeline_params.cluster_id=}",
f"{tasks_ready_to_start.keys()=}",
)
await CompTasksRepository.instance(
self.db_engine
).update_project_tasks_state(
project_id,
list(tasks_ready_to_start.keys()),
RunningState.FAILED,
optional_progress=1.0,
optional_stopped=arrow.utcnow().datetime,
)
for task in tasks_ready_to_start:
comp_tasks[NodeIDStr(f"{task}")].state = RunningState.FAILED
raise

return comp_tasks
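
To summarise the rewrite above: _start_tasks no longer returns a per-task result list; failures surface as exceptions and are handled per type. A simplified, self-contained sketch of that control flow, using stand-in exception names and a plain dict in place of the repository calls:

class BackendNotReadyError(Exception):
    # stand-in for the backend / clusters-keeper errors handled above
    ...


class TaskSchedulingError(Exception):
    # stand-in: a single task could not be scheduled
    def __init__(self, node_id: str) -> None:
        super().__init__(node_id)
        self.node_id = node_id


def schedule(tasks: dict[str, str], start_tasks) -> dict[str, str]:
    states = {node_id: "PENDING" for node_id in tasks}
    try:
        start_tasks(tasks)
    except BackendNotReadyError:
        # backend not ready yet: park every task until the cluster comes back
        for node_id in tasks:
            states[node_id] = "WAITING_FOR_CLUSTER"
    except TaskSchedulingError as exc:
        # only the offending node fails, the rest keep their state
        states[exc.node_id] = "FAILED"
    except Exception:
        # anything unexpected fails the whole batch and is re-raised
        for node_id in tasks:
            states[node_id] = "FAILED"
        raise
    return states


def _fake_start(tasks: dict[str, str]) -> None:
    raise TaskSchedulingError(next(iter(tasks)))


print(schedule({"node-1": "t", "node-2": "t"}, _fake_start))
# -> {'node-1': 'FAILED', 'node-2': 'PENDING'}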

@@ -30,7 +30,7 @@
from ...models.comp_runs import RunMetadataDict
from ...models.comp_tasks import CompTaskAtDB
from ...models.dask_subsystem import DaskClientTaskState
from ...modules.dask_client import DaskClient
from ...modules.dask_client import DaskClient, PublishedComputationTask
from ...modules.dask_clients_pool import DaskClientsPool
from ...modules.db.repositories.clusters import ClustersRepository
from ...modules.db.repositories.comp_runs import CompRunsRepository
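
The implementation below now works with PublishedComputationTask objects rather than bare (node_id, job_id) tuples. A hedged sketch of the minimal shape this diff relies on: only node_id and job_id are exercised here, and the real class may carry more fields.

from dataclasses import dataclass
from uuid import UUID


@dataclass(frozen=True)
class PublishedComputationTask:  # illustrative shape only
    node_id: UUID  # the project node the computation belongs to
    job_id: str    # the dask job identifier (see DaskJobID above)


task = PublishedComputationTask(node_id=UUID(int=1), job_id="dask-job-123")
print(task.node_id, task.job_id)
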
@@ -105,7 +105,7 @@ async def _start_tasks(
project_id: ProjectID,
scheduled_tasks: dict[NodeID, CompTaskAtDB],
pipeline_params: ScheduledPipelineParams,
) -> list:
) -> None:
# now transfer the pipeline to the dask scheduler
async with _cluster_dask_client(user_id, pipeline_params, self) as client:
# Change the tasks state to PENDING
@@ -116,9 +116,7 @@
RunningState.PENDING,
)
# each task is started independently
results: list[
list[tuple[NodeID, str]] | BaseException
] = await asyncio.gather(
results: list[list[PublishedComputationTask]] = await asyncio.gather(
*(
client.send_computation_tasks(
user_id=user_id,
Expand All @@ -131,20 +129,18 @@ async def _start_tasks(
)
for node_id, task in scheduled_tasks.items()
),
return_exceptions=True,
)

# update the database so we do have the correct job_ids there
await asyncio.gather(
*[
*(
comp_tasks_repo.update_project_task_job_id(
project_id, tasks_sent[0][0], tasks_sent[0][1]
project_id, task.node_id, task.job_id
)
for tasks_sent in results
if not isinstance(tasks_sent, BaseException)
]
for task_sents in results
for task in task_sents
)
)
return results

async def _get_tasks_status(
self,
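
Since return_exceptions=True was dropped from the asyncio.gather call above, the first failing send_computation_tasks call now propagates out of _start_tasks and is handled by the scheduler's except clauses, instead of being filtered out of a mixed result list. A small standalone sketch of the difference (send is a made-up stand-in):

import asyncio


async def send(node: str) -> str:
    if node == "bad":
        raise RuntimeError(f"cannot submit {node}")
    return f"job-for-{node}"


async def main() -> None:
    # with return_exceptions=True the caller must sift exceptions out of the results
    mixed = await asyncio.gather(send("a"), send("bad"), return_exceptions=True)
    print([r for r in mixed if not isinstance(r, BaseException)])  # ['job-for-a']

    # without it, the first failure propagates and an outer except can handle it
    try:
        await asyncio.gather(send("a"), send("bad"))
    except RuntimeError as exc:
        print(f"propagated: {exc}")


asyncio.run(main())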