From 1df9630008e37e1b86cc812cb66b55d4af8aec64 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Thu, 18 Jan 2024 21:14:17 +0100 Subject: [PATCH 01/17] missing types --- .../modules/comp_scheduler/base_scheduler.py | 2 +- .../modules/comp_scheduler/dask_scheduler.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/base_scheduler.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/base_scheduler.py index b3208c7db6d..f80b45ffab9 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/base_scheduler.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/base_scheduler.py @@ -551,7 +551,7 @@ async def _start_tasks( project_id: ProjectID, scheduled_tasks: dict[NodeID, CompTaskAtDB], pipeline_params: ScheduledPipelineParams, - ) -> list: + ) -> list[list[tuple[NodeID, str]] | BaseException]: ... @abstractmethod diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/dask_scheduler.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/dask_scheduler.py index 7750822c0bb..1b8dfa03026 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/dask_scheduler.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/dask_scheduler.py @@ -105,7 +105,7 @@ async def _start_tasks( project_id: ProjectID, scheduled_tasks: dict[NodeID, CompTaskAtDB], pipeline_params: ScheduledPipelineParams, - ) -> list: + ) -> list[list[tuple[NodeID, str]] | BaseException]: # now transfer the pipeline to the dask scheduler async with _cluster_dask_client(user_id, pipeline_params, self) as client: # Change the tasks state to PENDING From ded8690bc084ed70aba392e9317f7db9cecc8364 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Thu, 18 Jan 2024 21:17:02 +0100 Subject: [PATCH 02/17] describe bug --- .../modules/comp_scheduler/dask_scheduler.py | 1 + 1 file changed, 1 insertion(+) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/dask_scheduler.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/dask_scheduler.py index 1b8dfa03026..cdd6839d2c1 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/dask_scheduler.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/dask_scheduler.py @@ -132,6 +132,7 @@ async def _start_tasks( for node_id, task in scheduled_tasks.items() ), return_exceptions=True, + # TODO: fix this. 
the problem comes from this swallowing of exceptions ) # update the database so we do have the correct job_ids there From 70f74a1487866dd8f7fd5366999e15150125d7a3 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 19 Jan 2024 08:28:19 +0100 Subject: [PATCH 03/17] doc --- .../director-v2/src/simcore_service_director_v2/utils/dask.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/services/director-v2/src/simcore_service_director_v2/utils/dask.py b/services/director-v2/src/simcore_service_director_v2/utils/dask.py index e96bd146364..839d97286b3 100644 --- a/services/director-v2/src/simcore_service_director_v2/utils/dask.py +++ b/services/director-v2/src/simcore_service_director_v2/utils/dask.py @@ -305,6 +305,10 @@ def compute_task_labels( run_metadata: RunMetadataDict, node_requirements: NodeRequirements, ) -> ContainerLabelsDict: + """ + Raises: + ValidationError + """ product_name = run_metadata.get("product_name", UNDEFINED_DOCKER_LABEL) standard_simcore_labels = StandardSimcoreDockerLabels.construct( user_id=user_id, From fd4e633e5fb8ab5df1c0250032b0069b29a38241 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 19 Jan 2024 08:41:54 +0100 Subject: [PATCH 04/17] refactor and add doc --- .../modules/dask_client.py | 35 ++++++++++++++++--- 1 file changed, 30 insertions(+), 5 deletions(-) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py b/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py index a2a3ed99780..24aec880daf 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py @@ -16,7 +16,7 @@ from copy import deepcopy from dataclasses import dataclass, field from http.client import HTTPException -from typing import Any +from typing import Any, TypeAlias import distributed from dask_task_models_library.container_tasks.docker import DockerBasicAuth @@ -36,6 +36,7 @@ from distributed.scheduler import TaskStateState as DaskSchedulerTaskState from fastapi import FastAPI from models_library.api_schemas_directorv2.clusters import ClusterDetails, Scheduler +from models_library.basic_types import IDStr from models_library.clusters import ClusterAuthentication, ClusterID, ClusterTypeInModel from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID @@ -95,6 +96,14 @@ _UserCallbackInSepThread = Callable[[], None] +DaskJobID: TypeAlias = IDStr + + +@dataclass(frozen=True, kw_only=True, slots=True) +class PublishedComputationTask: + node_id: NodeID + job_id: DaskJobID + @dataclass class DaskClient: @@ -191,9 +200,23 @@ async def send_computation_tasks( remote_fct: ContainerRemoteFct | None = None, metadata: RunMetadataDict, hardware_info: HardwareInfo, - ) -> list[tuple[NodeID, str]]: + ) -> list[PublishedComputationTask]: """actually sends the function remote_fct to be remotely executed. if None is kept then the default - function that runs container will be started.""" + function that runs container will be started. 
+ + Raises: + - ComputationalBackendNoS3AccessError when storage is not accessible + - ComputationalSchedulerChangedError when expected scheduler changed + - ComputationalBackendNotConnectedError when scheduler is not connected/running + - MissingComputationalResourcesError (only for internal cluster) + - InsuficientComputationalResourcesError (only for internal cluster) + - PortsValidationError (not sure when this happens - node ports) + - S3InvalidStore (node ports, invalid S3 store) + - NodeportsException (node ports, generic exception) + - StorageServerIssue storage server is not responding + - ClientResponseError + - ValidationError + """ def _comp_sidecar_fct( *, @@ -215,7 +238,7 @@ def _comp_sidecar_fct( if remote_fct is None: remote_fct = _comp_sidecar_fct - list_of_node_id_to_job_id: list[tuple[NodeID, str]] = [] + list_of_node_id_to_job_id: list[PublishedComputationTask] = [] for node_id, node_image in tasks.items(): job_id = dask_utils.generate_dask_job_id( service_key=node_image.name, @@ -345,7 +368,9 @@ def _comp_sidecar_fct( # NOTE: the callback is running in a secondary thread, and takes a future as arg task_future.add_done_callback(lambda _: callback()) - list_of_node_id_to_job_id.append((node_id, job_id)) + list_of_node_id_to_job_id.append( + PublishedComputationTask(node_id=node_id, job_id=DaskJobID(job_id)) + ) await dask_utils.wrap_client_async_routine( self.backend.client.publish_dataset(task_future, name=job_id) ) From 00f0a011f6baba5d011f327167b26924f2006251 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 19 Jan 2024 08:50:18 +0100 Subject: [PATCH 05/17] adapt test --- .../tests/unit/test_modules_dask_client.py | 149 ++++++++++-------- 1 file changed, 87 insertions(+), 62 deletions(-) diff --git a/services/director-v2/tests/unit/test_modules_dask_client.py b/services/director-v2/tests/unit/test_modules_dask_client.py index 99022798fd2..702f803d147 100644 --- a/services/director-v2/tests/unit/test_modules_dask_client.py +++ b/services/director-v2/tests/unit/test_modules_dask_client.py @@ -549,6 +549,7 @@ def fake_sidecar_fct( assert node_params.node_requirements.ram assert "product_name" in comp_run_metadata assert "simcore_user_agent" in comp_run_metadata + assert image_params.fake_tasks[node_id].node_requirements is not None node_id_to_job_ids = await dask_client.send_computation_tasks( user_id=user_id, project_id=project_id, @@ -576,12 +577,14 @@ def fake_sidecar_fct( ) assert node_id_to_job_ids assert len(node_id_to_job_ids) == 1 - node_id, job_id = node_id_to_job_ids[0] - assert node_id in image_params.fake_tasks + published_computation_task = node_id_to_job_ids[0] + assert published_computation_task.node_id in image_params.fake_tasks # check status goes to PENDING/STARTED await _assert_wait_for_task_status( - job_id, dask_client, expected_status=DaskClientTaskState.PENDING_OR_STARTED + published_computation_task.job_id, + dask_client, + expected_status=DaskClientTaskState.PENDING_OR_STARTED, ) # using the event we let the remote fct continue @@ -593,23 +596,28 @@ def fake_sidecar_fct( # check the task status await _assert_wait_for_task_status( - job_id, dask_client, expected_status=DaskClientTaskState.SUCCESS + published_computation_task.job_id, + dask_client, + expected_status=DaskClientTaskState.SUCCESS, ) # check the results - task_result = await dask_client.get_task_result(job_id) + task_result = await dask_client.get_task_result(published_computation_task.job_id) assert isinstance(task_result, 
TaskOutputData) assert task_result.get("some_output_key") == 123 # now release the results - await dask_client.release_task_result(job_id) + await dask_client.release_task_result(published_computation_task.job_id) # check the status now await _assert_wait_for_task_status( - job_id, dask_client, expected_status=DaskClientTaskState.LOST, timeout=60 + published_computation_task.job_id, + dask_client, + expected_status=DaskClientTaskState.LOST, + timeout=60, ) with pytest.raises(ComputationalBackendTaskNotFoundError): - await dask_client.get_task_result(job_id) + await dask_client.get_task_result(published_computation_task.job_id) async def test_computation_task_is_persisted_on_dask_scheduler( @@ -650,7 +658,7 @@ def fake_sidecar_fct( return TaskOutputData.parse_obj({"some_output_key": 123}) # NOTE: We pass another fct so it can run in our localy created dask cluster - node_id_to_job_ids = await dask_client.send_computation_tasks( + published_computation_task = await dask_client.send_computation_tasks( user_id=user_id, project_id=project_id, cluster_id=cluster_id, @@ -660,32 +668,33 @@ def fake_sidecar_fct( metadata=comp_run_metadata, hardware_info=empty_hardware_info, ) - assert node_id_to_job_ids - assert len(node_id_to_job_ids) == 1 - node_id, job_id = node_id_to_job_ids[0] + assert published_computation_task + assert len(published_computation_task) == 1 await _assert_wait_for_cb_call( mocked_user_completed_cb, timeout=_ALLOW_TIME_FOR_GATEWAY_TO_CREATE_WORKERS ) # check the task status await _assert_wait_for_task_status( - job_id, dask_client, expected_status=DaskClientTaskState.SUCCESS + published_computation_task[0].job_id, + dask_client, + expected_status=DaskClientTaskState.SUCCESS, ) assert node_id in image_params.fake_tasks # creating a new future shows that it is not done???? - assert not distributed.Future(job_id).done() + assert not distributed.Future(published_computation_task[0].job_id).done() # as the task is published on the dask-scheduler when sending, it shall still be published on the dask scheduler list_of_persisted_datasets = await dask_client.backend.client.list_datasets() # type: ignore assert list_of_persisted_datasets assert isinstance(list_of_persisted_datasets, tuple) assert len(list_of_persisted_datasets) == 1 - assert job_id in list_of_persisted_datasets - assert list_of_persisted_datasets[0] == job_id + assert published_computation_task[0].job_id in list_of_persisted_datasets + assert list_of_persisted_datasets[0] == published_computation_task[0].job_id # get the persisted future from the scheduler back - task_future = await dask_client.backend.client.get_dataset(name=job_id) # type: ignore + task_future = await dask_client.backend.client.get_dataset(name=published_computation_task.job_id) # type: ignore assert task_future assert isinstance(task_future, distributed.Future) - assert task_future.key == job_id + assert task_future.key == published_computation_task[0].job_id # NOTE: the future was persisted BEFORE the computation was completed.. therefore it is not updated # this is a bit weird, but it is so, this assertion demonstrates it. we need to await the results. 
assert not task_future.done() @@ -695,7 +704,7 @@ def fake_sidecar_fct( assert isinstance(task_result, TaskOutputData) assert task_result.get("some_output_key") == 123 # try to create another future and this one is already done - assert distributed.Future(job_id).done() + assert distributed.Future(published_computation_task[0].job_id).done() async def test_abort_computation_tasks( @@ -740,7 +749,7 @@ def fake_remote_fct( return TaskOutputData.parse_obj({"some_output_key": 123}) - node_id_to_job_ids = await dask_client.send_computation_tasks( + published_computation_task = await dask_client.send_computation_tasks( user_id=user_id, project_id=project_id, cluster_id=cluster_id, @@ -750,12 +759,14 @@ def fake_remote_fct( metadata=comp_run_metadata, hardware_info=empty_hardware_info, ) - assert node_id_to_job_ids - assert len(node_id_to_job_ids) == 1 - node_id, job_id = node_id_to_job_ids[0] - assert node_id in image_params.fake_tasks + assert published_computation_task + assert len(published_computation_task) == 1 + + assert published_computation_task[0].node_id in image_params.fake_tasks await _assert_wait_for_task_status( - job_id, dask_client, DaskClientTaskState.PENDING_OR_STARTED + published_computation_task[0].job_id, + dask_client, + DaskClientTaskState.PENDING_OR_STARTED, ) # we wait to be sure the remote fct is started @@ -764,20 +775,23 @@ def fake_remote_fct( # now let's abort the computation cancel_event = await distributed.Event( - name=TaskCancelEventName.format(job_id), client=dask_client.backend.client + name=TaskCancelEventName.format(published_computation_task[0].job_id), + client=dask_client.backend.client, ) - await dask_client.abort_computation_task(job_id) + await dask_client.abort_computation_task(published_computation_task[0].job_id) assert await cancel_event.is_set() # type: ignore await _assert_wait_for_cb_call(mocked_user_completed_cb) - await _assert_wait_for_task_status(job_id, dask_client, DaskClientTaskState.ABORTED) + await _assert_wait_for_task_status( + published_computation_task[0].job_id, dask_client, DaskClientTaskState.ABORTED + ) # getting the results should throw the cancellation error with pytest.raises(TaskCancelledError): - await dask_client.get_task_result(job_id) + await dask_client.get_task_result(published_computation_task[0].job_id) # after releasing the results, the task shall be UNKNOWN - await dask_client.release_task_result(job_id) + await dask_client.release_task_result(published_computation_task[0].job_id) # NOTE: this change of status takes a very long time to happen and is not relied upon so we skip it since it # makes the test fail a lot for no gain (it's kept here in case it ever becomes an issue) # await _assert_wait_for_task_status( @@ -808,7 +822,7 @@ def fake_failing_sidecar_fct( err_msg = "sadly we are failing to execute anything cause we are dumb..." 
raise ValueError(err_msg) - node_id_to_job_ids = await dask_client.send_computation_tasks( + published_computation_task = await dask_client.send_computation_tasks( user_id=user_id, project_id=project_id, cluster_id=cluster_id, @@ -818,10 +832,10 @@ def fake_failing_sidecar_fct( metadata=comp_run_metadata, hardware_info=empty_hardware_info, ) - assert node_id_to_job_ids - assert len(node_id_to_job_ids) == 1 - node_id, job_id = node_id_to_job_ids[0] - assert node_id in gpu_image.fake_tasks + assert published_computation_task + assert len(published_computation_task) == 1 + + assert published_computation_task[0].node_id in gpu_image.fake_tasks # this waits for the computation to run await _assert_wait_for_cb_call( @@ -830,15 +844,17 @@ def fake_failing_sidecar_fct( # the computation status is FAILED await _assert_wait_for_task_status( - job_id, dask_client, expected_status=DaskClientTaskState.ERRED + published_computation_task[0].job_id, + dask_client, + expected_status=DaskClientTaskState.ERRED, ) with pytest.raises( ValueError, match="sadly we are failing to execute anything cause we are dumb...", ): - await dask_client.get_task_result(job_id) + await dask_client.get_task_result(published_computation_task[0].job_id) assert len(await dask_client.backend.client.list_datasets()) > 0 # type: ignore - await dask_client.release_task_result(job_id) + await dask_client.release_task_result(published_computation_task[0].job_id) assert len(await dask_client.backend.client.list_datasets()) == 0 # type: ignore @@ -1079,7 +1095,7 @@ def fake_remote_fct( raise ValueError(err_msg) return TaskOutputData.parse_obj({"some_output_key": 123}) - node_id_to_job_ids = await dask_client.send_computation_tasks( + published_computation_task = await dask_client.send_computation_tasks( user_id=user_id, project_id=project_id, cluster_id=cluster_id, @@ -1089,16 +1105,18 @@ def fake_remote_fct( metadata=comp_run_metadata, hardware_info=empty_hardware_info, ) - assert node_id_to_job_ids - assert len(node_id_to_job_ids) == 1 - node_id, job_id = node_id_to_job_ids[0] - assert node_id in cpu_image.fake_tasks + assert published_computation_task + assert len(published_computation_task) == 1 + + assert published_computation_task[0].node_id in cpu_image.fake_tasks # let's get a dask future for the task here so dask will not remove the task from the scheduler at the end - computation_future = distributed.Future(key=job_id) + computation_future = distributed.Future(key=published_computation_task[0].job_id) assert computation_future await _assert_wait_for_task_status( - job_id, dask_client, DaskClientTaskState.PENDING_OR_STARTED + published_computation_task[0].job_id, + dask_client, + DaskClientTaskState.PENDING_OR_STARTED, ) # let the remote fct run through now @@ -1106,15 +1124,15 @@ def fake_remote_fct( await start_event.set() # type: ignore # it will become successful hopefuly await _assert_wait_for_task_status( - job_id, + published_computation_task[0].job_id, dask_client, DaskClientTaskState.ERRED if fail_remote_fct else DaskClientTaskState.SUCCESS, ) # release the task results - await dask_client.release_task_result(job_id) + await dask_client.release_task_result(published_computation_task[0].job_id) # the task is still present since we hold a future here await _assert_wait_for_task_status( - job_id, + published_computation_task[0].job_id, dask_client, DaskClientTaskState.ERRED if fail_remote_fct else DaskClientTaskState.SUCCESS, ) @@ -1122,7 +1140,10 @@ def fake_remote_fct( # removing the future will let dask eventually 
delete the task from its memory, so its status becomes undefined del computation_future await _assert_wait_for_task_status( - job_id, dask_client, DaskClientTaskState.LOST, timeout=60 + published_computation_task[0].job_id, + dask_client, + DaskClientTaskState.LOST, + timeout=60, ) @@ -1166,7 +1187,7 @@ def fake_remote_fct( return TaskOutputData.parse_obj({"some_output_key": 123}) # run the computation - node_id_to_job_ids = await dask_client.send_computation_tasks( + published_computation_task = await dask_client.send_computation_tasks( user_id=user_id, project_id=project_id, cluster_id=cluster_id, @@ -1176,11 +1197,11 @@ def fake_remote_fct( metadata=comp_run_metadata, hardware_info=empty_hardware_info, ) - assert node_id_to_job_ids - assert len(node_id_to_job_ids) == 1 - node_id, job_id = node_id_to_job_ids[0] - assert node_id in cpu_image.fake_tasks - computation_future = distributed.Future(job_id) + assert published_computation_task + assert len(published_computation_task) == 1 + + assert published_computation_task[0].node_id in cpu_image.fake_tasks + computation_future = distributed.Future(published_computation_task[0].job_id) print("--> waiting for job to finish...") await distributed.wait(computation_future, timeout=_ALLOW_TIME_FOR_GATEWAY_TO_CREATE_WORKERS) # type: ignore assert computation_future.done() @@ -1242,7 +1263,7 @@ def fake_sidecar_fct( return TaskOutputData.parse_obj({"some_output_key": 123}) # NOTE: We pass another fct so it can run in our localy created dask cluster - node_id_to_job_ids = await dask_client.send_computation_tasks( + published_computation_task = await dask_client.send_computation_tasks( user_id=user_id, project_id=project_id, cluster_id=cluster_id, @@ -1254,14 +1275,16 @@ def fake_sidecar_fct( metadata=comp_run_metadata, hardware_info=empty_hardware_info, ) - assert node_id_to_job_ids - assert len(node_id_to_job_ids) == 1 - node_id, job_id = node_id_to_job_ids[0] - assert node_id in image_params.fake_tasks + assert published_computation_task + assert len(published_computation_task) == 1 + + assert published_computation_task[0].node_id in image_params.fake_tasks # check status goes to PENDING/STARTED await _assert_wait_for_task_status( - job_id, dask_client, expected_status=DaskClientTaskState.PENDING_OR_STARTED + published_computation_task[0].job_id, + dask_client, + expected_status=DaskClientTaskState.PENDING_OR_STARTED, ) # check we have one worker using the resources @@ -1290,7 +1313,9 @@ def fake_sidecar_fct( # wait for the task to complete await _assert_wait_for_task_status( - job_id, dask_client, expected_status=DaskClientTaskState.SUCCESS + published_computation_task[0].job_id, + dask_client, + expected_status=DaskClientTaskState.SUCCESS, ) # check the resources are released From cfb913c54156b88c0a62e84969b412704bf9e706 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 19 Jan 2024 10:25:20 +0100 Subject: [PATCH 06/17] ongoing --- .../modules/comp_scheduler/dask_scheduler.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/dask_scheduler.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/dask_scheduler.py index cdd6839d2c1..dac541cc116 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/dask_scheduler.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/dask_scheduler.py @@ -30,7 +30,7 @@ from 
...models.comp_runs import RunMetadataDict from ...models.comp_tasks import CompTaskAtDB from ...models.dask_subsystem import DaskClientTaskState -from ...modules.dask_client import DaskClient +from ...modules.dask_client import DaskClient, PublishedComputationTask from ...modules.dask_clients_pool import DaskClientsPool from ...modules.db.repositories.clusters import ClustersRepository from ...modules.db.repositories.comp_runs import CompRunsRepository @@ -106,6 +106,10 @@ async def _start_tasks( scheduled_tasks: dict[NodeID, CompTaskAtDB], pipeline_params: ScheduledPipelineParams, ) -> list[list[tuple[NodeID, str]] | BaseException]: + """ + Raises: + + """ # now transfer the pipeline to the dask scheduler async with _cluster_dask_client(user_id, pipeline_params, self) as client: # Change the tasks state to PENDING @@ -116,9 +120,7 @@ async def _start_tasks( RunningState.PENDING, ) # each task is started independently - results: list[ - list[tuple[NodeID, str]] | BaseException - ] = await asyncio.gather( + results: list[list[PublishedComputationTask]] = await asyncio.gather( *( client.send_computation_tasks( user_id=user_id, @@ -131,18 +133,16 @@ async def _start_tasks( ) for node_id, task in scheduled_tasks.items() ), - return_exceptions=True, - # TODO: fix this. the problem comes from this swallowing of exceptions ) # update the database so we do have the correct job_ids there + # for task_sents in result await asyncio.gather( *[ comp_tasks_repo.update_project_task_job_id( project_id, tasks_sent[0][0], tasks_sent[0][1] ) for tasks_sent in results - if not isinstance(tasks_sent, BaseException) ] ) return results From 84dfad39cfa21fd3f909596f6e894c88660df9ab Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 19 Jan 2024 10:52:10 +0100 Subject: [PATCH 07/17] might be the fix --- .../modules/comp_scheduler/base_scheduler.py | 124 ++++++++---------- .../modules/comp_scheduler/dask_scheduler.py | 15 +-- .../modules/dask_client.py | 1 + 3 files changed, 63 insertions(+), 77 deletions(-) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/base_scheduler.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/base_scheduler.py index f80b45ffab9..11c8720cda7 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/base_scheduler.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/base_scheduler.py @@ -13,7 +13,6 @@ import asyncio import datetime import logging -import traceback from abc import ABC, abstractmethod from dataclasses import dataclass, field from typing import Final @@ -551,7 +550,7 @@ async def _start_tasks( project_id: ProjectID, scheduled_tasks: dict[NodeID, CompTaskAtDB], pipeline_params: ScheduledPipelineParams, - ) -> list[list[tuple[NodeID, str]] | BaseException]: + ) -> None: ... 
@abstractmethod @@ -729,77 +728,49 @@ async def _schedule_tasks_to_start( return comp_tasks try: - results = await self._start_tasks( + await self._start_tasks( user_id=user_id, project_id=project_id, scheduled_tasks=tasks_ready_to_start, pipeline_params=pipeline_params, ) + except TaskSchedulingError as exc: + _logger.exception( + "Project '%s''s task '%s' could not be scheduled", + exc.project_id, + exc.node_id, + ) + await CompTasksRepository.instance( + self.db_engine + ).update_project_tasks_state( + project_id, + [exc.node_id], + RunningState.FAILED, + exc.get_errors(), + optional_progress=1.0, + optional_stopped=arrow.utcnow().datetime, + ) + comp_tasks[NodeIDStr(f"{exc.node_id}")].state = RunningState.FAILED + except ( + ComputationalBackendNotConnectedError, + ComputationalSchedulerChangedError, + ): + _logger.exception( + "Issue with computational backend. Tasks are set back " + "to WAITING_FOR_CLUSTER state until scheduler comes back!", + ) + await CompTasksRepository.instance( + self.db_engine + ).update_project_tasks_state( + project_id, + list(tasks_ready_to_start.keys()), + RunningState.WAITING_FOR_CLUSTER, + ) + for task in tasks_ready_to_start: + comp_tasks[ + NodeIDStr(f"{task}") + ].state = RunningState.WAITING_FOR_CLUSTER - # Handling errors raised when _start_tasks(...) - for r, t in zip(results, tasks_ready_to_start, strict=True): - if isinstance(r, TaskSchedulingError): - _logger.error( - "Project '%s''s task '%s' could not be scheduled due to the following: %s", - r.project_id, - r.node_id, - f"{r}", - ) - - await CompTasksRepository.instance( - self.db_engine - ).update_project_tasks_state( - project_id, - [r.node_id], - RunningState.FAILED, - r.get_errors(), - optional_progress=1.0, - optional_stopped=arrow.utcnow().datetime, - ) - comp_tasks[NodeIDStr(f"{t}")].state = RunningState.FAILED - elif isinstance( - r, - ComputationalBackendNotConnectedError - | ComputationalSchedulerChangedError, - ): - _logger.error( - "Issue with computational backend: %s. Tasks are set back " - "to WAITING_FOR_CLUSTER state until scheduler comes back!", - r, - ) - # we should try re-connecting. 
- # in the meantime we cannot schedule tasks on the scheduler, - # let's put these tasks back to WAITING_FOR_CLUSTER, so they might be re-submitted later - await CompTasksRepository.instance( - self.db_engine - ).update_project_tasks_state( - project_id, - list(tasks_ready_to_start.keys()), - RunningState.WAITING_FOR_CLUSTER, - ) - comp_tasks[ - NodeIDStr(f"{t}") - ].state = RunningState.WAITING_FOR_CLUSTER - elif isinstance(r, Exception): - _logger.error( - "Unexpected error for %s with %s on %s happened when scheduling %s:\n%s\n%s", - f"{user_id=}", - f"{project_id=}", - f"{pipeline_params.cluster_id=}", - f"{tasks_ready_to_start.keys()=}", - f"{r}", - "".join(traceback.format_tb(r.__traceback__)), - ) - await CompTasksRepository.instance( - self.db_engine - ).update_project_tasks_state( - project_id, - [t], - RunningState.FAILED, - optional_progress=1.0, - optional_stopped=arrow.utcnow().datetime, - ) - comp_tasks[NodeIDStr(f"{t}")].state = RunningState.FAILED except ComputationalBackendOnDemandNotReadyError as exc: _logger.info( "The on demand computational backend is not ready yet: %s", exc @@ -842,6 +813,25 @@ async def _schedule_tasks_to_start( ) for task in comp_tasks.values(): task.state = RunningState.FAILED + except Exception: + _logger.exception( + "Unexpected error for %s with %s on %s happened when scheduling %s:", + f"{user_id=}", + f"{project_id=}", + f"{pipeline_params.cluster_id=}", + f"{tasks_ready_to_start.keys()=}", + ) + await CompTasksRepository.instance( + self.db_engine + ).update_project_tasks_state( + project_id, + list(tasks_ready_to_start.keys()), + RunningState.FAILED, + optional_progress=1.0, + optional_stopped=arrow.utcnow().datetime, + ) + for task in comp_tasks.values(): + task.state = RunningState.FAILED return comp_tasks diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/dask_scheduler.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/dask_scheduler.py index dac541cc116..ceec877c6f0 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/dask_scheduler.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/dask_scheduler.py @@ -105,7 +105,7 @@ async def _start_tasks( project_id: ProjectID, scheduled_tasks: dict[NodeID, CompTaskAtDB], pipeline_params: ScheduledPipelineParams, - ) -> list[list[tuple[NodeID, str]] | BaseException]: + ) -> None: """ Raises: @@ -136,16 +136,11 @@ async def _start_tasks( ) # update the database so we do have the correct job_ids there - # for task_sents in result - await asyncio.gather( - *[ - comp_tasks_repo.update_project_task_job_id( - project_id, tasks_sent[0][0], tasks_sent[0][1] + for task_sents in results: + for task in task_sents: + await comp_tasks_repo.update_project_task_job_id( + project_id, task.node_id, task.job_id ) - for tasks_sent in results - ] - ) - return results async def _get_tasks_status( self, diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py b/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py index 24aec880daf..4fc7d07610a 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py @@ -210,6 +210,7 @@ async def send_computation_tasks( - ComputationalBackendNotConnectedError when scheduler is not connected/running - MissingComputationalResourcesError (only for internal cluster) - 
InsuficientComputationalResourcesError (only for internal cluster) + - TaskSchedulingError - PortsValidationError (not sure when this happens - node ports) - S3InvalidStore (node ports, invalid S3 store) - NodeportsException (node ports, generic exception) From 671a892f66a62660354a339be5a26fa953e71806 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 19 Jan 2024 11:04:12 +0100 Subject: [PATCH 08/17] fix tests --- services/director-v2/tests/unit/test_modules_dask_client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/director-v2/tests/unit/test_modules_dask_client.py b/services/director-v2/tests/unit/test_modules_dask_client.py index 702f803d147..7f31b1fd1ac 100644 --- a/services/director-v2/tests/unit/test_modules_dask_client.py +++ b/services/director-v2/tests/unit/test_modules_dask_client.py @@ -679,7 +679,7 @@ def fake_sidecar_fct( dask_client, expected_status=DaskClientTaskState.SUCCESS, ) - assert node_id in image_params.fake_tasks + assert published_computation_task[0].node_id in image_params.fake_tasks # creating a new future shows that it is not done???? assert not distributed.Future(published_computation_task[0].job_id).done() @@ -691,7 +691,7 @@ def fake_sidecar_fct( assert published_computation_task[0].job_id in list_of_persisted_datasets assert list_of_persisted_datasets[0] == published_computation_task[0].job_id # get the persisted future from the scheduler back - task_future = await dask_client.backend.client.get_dataset(name=published_computation_task.job_id) # type: ignore + task_future = await dask_client.backend.client.get_dataset(name=published_computation_task[0].job_id) # type: ignore assert task_future assert isinstance(task_future, distributed.Future) assert task_future.key == published_computation_task[0].job_id From 970af5d01e19deff26c6ba8b93480d7c5cab95d4 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 19 Jan 2024 11:08:29 +0100 Subject: [PATCH 09/17] fix test --- .../test_modules_comp_scheduler_dask_scheduler.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py b/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py index 3e12b32fd01..af199cc59f8 100644 --- a/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py +++ b/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py @@ -72,6 +72,10 @@ from simcore_service_director_v2.modules.comp_scheduler.dask_scheduler import ( DaskScheduler, ) +from simcore_service_director_v2.modules.dask_client import ( + DaskJobID, + PublishedComputationTask, +) from simcore_service_director_v2.utils.comp_scheduler import COMPLETED_STATES from simcore_service_director_v2.utils.dask_client_utils import TaskHandlers from starlette.testclient import TestClient @@ -564,11 +568,14 @@ def _mock_send_computation_tasks( async def _send_computation_tasks( *args, tasks: dict[NodeID, Image], **kwargs - ) -> list[tuple[NodeID, str]]: + ) -> list[PublishedComputationTask]: for node_id in tasks: assert NodeID(f"{node_id}") in node_id_to_job_id_map return [ - (NodeID(f"{node_id}"), node_id_to_job_id_map[NodeID(f"{node_id}")]) + PublishedComputationTask( + node_id=NodeID(f"{node_id}"), + job_id=DaskJobID(node_id_to_job_id_map[NodeID(f"{node_id}")]), + ) for node_id in tasks ] # type: ignore From 
ed9854132d3148590cac2a5de4e185be4e32fdc9 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 19 Jan 2024 14:11:15 +0100 Subject: [PATCH 10/17] refactor --- .../models/dask_subsystem.py | 5 + .../modules/comp_scheduler/base_scheduler.py | 48 +-- .../modules/dask_client.py | 279 ++++++++++-------- .../simcore_service_director_v2/utils/dask.py | 7 +- 4 files changed, 193 insertions(+), 146 deletions(-) diff --git a/services/director-v2/src/simcore_service_director_v2/models/dask_subsystem.py b/services/director-v2/src/simcore_service_director_v2/models/dask_subsystem.py index 34a270cba88..d6e4a0d267a 100644 --- a/services/director-v2/src/simcore_service_director_v2/models/dask_subsystem.py +++ b/services/director-v2/src/simcore_service_director_v2/models/dask_subsystem.py @@ -1,4 +1,5 @@ from enum import Enum +from typing import TypeAlias # NOTE: mypy fails with src/simcore_service_director_v2/modules/dask_client.py:101:5: error: Dict entry 0 has incompatible type "str": "auto"; expected "Any": "DaskClientTaskState" [dict-item] @@ -11,3 +12,7 @@ class DaskClientTaskState(str, Enum): ERRED = "ERRED" ABORTED = "ABORTED" SUCCESS = "SUCCESS" + + +DaskJobID: TypeAlias = str +DaskResources: TypeAlias = dict[str, int | float] diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/base_scheduler.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/base_scheduler.py index 11c8720cda7..5d248a26443 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/base_scheduler.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/base_scheduler.py @@ -734,23 +734,6 @@ async def _schedule_tasks_to_start( scheduled_tasks=tasks_ready_to_start, pipeline_params=pipeline_params, ) - except TaskSchedulingError as exc: - _logger.exception( - "Project '%s''s task '%s' could not be scheduled", - exc.project_id, - exc.node_id, - ) - await CompTasksRepository.instance( - self.db_engine - ).update_project_tasks_state( - project_id, - [exc.node_id], - RunningState.FAILED, - exc.get_errors(), - optional_progress=1.0, - optional_stopped=arrow.utcnow().datetime, - ) - comp_tasks[NodeIDStr(f"{exc.node_id}")].state = RunningState.FAILED except ( ComputationalBackendNotConnectedError, ComputationalSchedulerChangedError, @@ -790,8 +773,10 @@ async def _schedule_tasks_to_start( list(tasks_ready_to_start.keys()), RunningState.WAITING_FOR_CLUSTER, ) - for task in comp_tasks.values(): - task.state = RunningState.WAITING_FOR_CLUSTER + for task in tasks_ready_to_start: + comp_tasks[ + NodeIDStr(f"{task}") + ].state = RunningState.WAITING_FOR_CLUSTER except ClustersKeeperNotAvailableError: _logger.exception("Unexpected error while starting tasks:") await publish_project_log( @@ -811,8 +796,25 @@ async def _schedule_tasks_to_start( optional_progress=1.0, optional_stopped=arrow.utcnow().datetime, ) - for task in comp_tasks.values(): - task.state = RunningState.FAILED + for task in tasks_ready_to_start: + comp_tasks[NodeIDStr(f"{task}")].state = RunningState.FAILED + except TaskSchedulingError as exc: + _logger.exception( + "Project '%s''s task '%s' could not be scheduled", + exc.project_id, + exc.node_id, + ) + await CompTasksRepository.instance( + self.db_engine + ).update_project_tasks_state( + project_id, + [exc.node_id], + RunningState.FAILED, + exc.get_errors(), + optional_progress=1.0, + optional_stopped=arrow.utcnow().datetime, + ) + 
comp_tasks[NodeIDStr(f"{exc.node_id}")].state = RunningState.FAILED except Exception: _logger.exception( "Unexpected error for %s with %s on %s happened when scheduling %s:", @@ -830,8 +832,8 @@ async def _schedule_tasks_to_start( optional_progress=1.0, optional_stopped=arrow.utcnow().datetime, ) - for task in comp_tasks.values(): - task.state = RunningState.FAILED + for task in tasks_ready_to_start: + comp_tasks[NodeIDStr(f"{task}")].state = RunningState.FAILED return comp_tasks diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py b/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py index 4fc7d07610a..2d955fedf61 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py @@ -16,19 +16,25 @@ from copy import deepcopy from dataclasses import dataclass, field from http.client import HTTPException -from typing import Any, TypeAlias +from typing import Any import distributed +from aiohttp import ClientResponseError from dask_task_models_library.container_tasks.docker import DockerBasicAuth from dask_task_models_library.container_tasks.errors import TaskCancelledError from dask_task_models_library.container_tasks.io import ( TaskCancelEventName, + TaskInputData, TaskOutputData, + TaskOutputDataSchema, ) from dask_task_models_library.container_tasks.protocol import ( + ContainerEnvsDict, + ContainerLabelsDict, ContainerRemoteFct, ContainerTaskParameters, LogFileUploadURL, + TaskOwner, ) from dask_task_models_library.resource_constraints import ( create_ec2_resource_constraint_key, @@ -36,16 +42,16 @@ from distributed.scheduler import TaskStateState as DaskSchedulerTaskState from fastapi import FastAPI from models_library.api_schemas_directorv2.clusters import ClusterDetails, Scheduler -from models_library.basic_types import IDStr from models_library.clusters import ClusterAuthentication, ClusterID, ClusterTypeInModel from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID from models_library.resource_tracker import HardwareInfo from models_library.users import UserID -from pydantic import parse_obj_as +from pydantic import ValidationError, parse_obj_as from pydantic.networks import AnyUrl from servicelib.logging_utils import log_catch from settings_library.s3 import S3Settings +from simcore_sdk.node_ports_common.exceptions import NodeportsException from simcore_sdk.node_ports_v2 import FileLinkType from tenacity._asyncio import AsyncRetrying from tenacity.before_sleep import before_sleep_log @@ -56,11 +62,12 @@ ComputationalBackendNoS3AccessError, ComputationalBackendTaskNotFoundError, ComputationalBackendTaskResultsNotReadyError, + TaskSchedulingError, ) from ..core.settings import AppSettings, ComputationalBackendSettings from ..models.comp_runs import RunMetadataDict from ..models.comp_tasks import Image -from ..models.dask_subsystem import DaskClientTaskState +from ..models.dask_subsystem import DaskClientTaskState, DaskJobID, DaskResources from ..modules.storage import StorageClient from ..utils import dask as dask_utils from ..utils.dask_client_utils import ( @@ -96,8 +103,6 @@ _UserCallbackInSepThread = Callable[[], None] -DaskJobID: TypeAlias = IDStr - @dataclass(frozen=True, kw_only=True, slots=True) class PublishedComputationTask: @@ -189,36 +194,23 @@ def register_handlers(self, task_handlers: TaskHandlers) -> None: for dask_sub, handler in _event_consumer_map ] - async def 
send_computation_tasks( + async def _publish_in_dask( # noqa: PLR0913 # pylint: disable=too-many-arguments self, *, - user_id: UserID, - project_id: ProjectID, - cluster_id: ClusterID, - tasks: dict[NodeID, Image], - callback: _UserCallbackInSepThread, remote_fct: ContainerRemoteFct | None = None, - metadata: RunMetadataDict, - hardware_info: HardwareInfo, - ) -> list[PublishedComputationTask]: - """actually sends the function remote_fct to be remotely executed. if None is kept then the default - function that runs container will be started. - - Raises: - - ComputationalBackendNoS3AccessError when storage is not accessible - - ComputationalSchedulerChangedError when expected scheduler changed - - ComputationalBackendNotConnectedError when scheduler is not connected/running - - MissingComputationalResourcesError (only for internal cluster) - - InsuficientComputationalResourcesError (only for internal cluster) - - TaskSchedulingError - - PortsValidationError (not sure when this happens - node ports) - - S3InvalidStore (node ports, invalid S3 store) - - NodeportsException (node ports, generic exception) - - StorageServerIssue storage server is not responding - - ClientResponseError - - ValidationError - """ - + node_image: Image, + input_data: TaskInputData, + output_data_keys: TaskOutputDataSchema, + log_file_url: AnyUrl, + task_envs: ContainerEnvsDict, + task_labels: ContainerLabelsDict, + task_owner: TaskOwner, + s3_settings: S3Settings | None, + dask_resources: DaskResources, + node_id: NodeID, + job_id: DaskJobID, + callback: _UserCallbackInSepThread, + ) -> PublishedComputationTask: def _comp_sidecar_fct( *, task_parameters: ContainerTaskParameters, @@ -239,6 +231,77 @@ def _comp_sidecar_fct( if remote_fct is None: remote_fct = _comp_sidecar_fct + try: + assert self.app.state # nosec + assert self.app.state.settings # nosec + settings: AppSettings = self.app.state.settings + task_future = self.backend.client.submit( + remote_fct, + task_parameters=ContainerTaskParameters( + image=node_image.name, + tag=node_image.tag, + input_data=input_data, + output_data_keys=output_data_keys, + command=node_image.command, + envs=task_envs, + labels=task_labels, + boot_mode=node_image.boot_mode, + task_owner=task_owner, + ), + docker_auth=DockerBasicAuth( + server_address=settings.DIRECTOR_V2_DOCKER_REGISTRY.resolved_registry_url, + username=settings.DIRECTOR_V2_DOCKER_REGISTRY.REGISTRY_USER, + password=settings.DIRECTOR_V2_DOCKER_REGISTRY.REGISTRY_PW, + ), + log_file_url=log_file_url, + s3_settings=s3_settings, + key=job_id, + resources=dask_resources, + retries=0, + ) + # NOTE: the callback is running in a secondary thread, and takes a future as arg + task_future.add_done_callback(lambda _: callback()) + + await dask_utils.wrap_client_async_routine( + self.backend.client.publish_dataset(task_future, name=job_id) + ) + + _logger.debug( + "Dask task %s started [%s]", + f"{task_future.key=}", + f"{node_image.command=}", + ) + return PublishedComputationTask(node_id=node_id, job_id=DaskJobID(job_id)) + except Exception: + # Dask raises a base Exception here in case of connection error, this will raise a more precise one + dask_utils.check_scheduler_status(self.backend.client) + # if the connection is good, then the problem is different, so we re-raise + raise + + async def send_computation_tasks( + self, + *, + user_id: UserID, + project_id: ProjectID, + cluster_id: ClusterID, + tasks: dict[NodeID, Image], + callback: _UserCallbackInSepThread, + remote_fct: ContainerRemoteFct | None = None, + metadata: 
RunMetadataDict, + hardware_info: HardwareInfo, + ) -> list[PublishedComputationTask]: + """actually sends the function remote_fct to be remotely executed. if None is kept then the default + function that runs container will be started. + + Raises: + - ComputationalBackendNoS3AccessError when storage is not accessible + - ComputationalSchedulerChangedError when expected scheduler changed + - ComputationalBackendNotConnectedError when scheduler is not connected/running + - MissingComputationalResourcesError (only for internal cluster) + - InsuficientComputationalResourcesError (only for internal cluster) + - TaskSchedulingError when any other error happens + """ + list_of_node_id_to_job_id: list[PublishedComputationTask] = [] for node_id, node_image in tasks.items(): job_id = dask_utils.generate_dask_job_id( @@ -292,100 +355,74 @@ def _comp_sidecar_fct( except HTTPException as err: raise ComputationalBackendNoS3AccessError from err - # This instance is created only once so it can be reused in calls below - node_ports = await dask_utils.create_node_ports( - db_engine=self.app.state.engine, - user_id=user_id, - project_id=project_id, - node_id=node_id, - ) - # NOTE: for download there is no need to go with S3 links - input_data = await dask_utils.compute_input_data( - project_id=project_id, - node_id=node_id, - node_ports=node_ports, - file_link_type=FileLinkType.PRESIGNED, - ) - output_data_keys = await dask_utils.compute_output_data_schema( - user_id=user_id, - project_id=project_id, - node_id=node_id, - node_ports=node_ports, - file_link_type=self.tasks_file_link_type, - ) - log_file_url = await dask_utils.compute_service_log_file_upload_link( - user_id, - project_id, - node_id, - file_link_type=self.tasks_file_link_type, - ) - task_labels = dask_utils.compute_task_labels( - user_id=user_id, - project_id=project_id, - node_id=node_id, - run_metadata=metadata, - node_requirements=node_image.node_requirements, - ) - task_envs = await dask_utils.compute_task_envs( - self.app, - user_id=user_id, - project_id=project_id, - node_id=node_id, - node_image=node_image, - metadata=metadata, - ) - task_owner = dask_utils.compute_task_owner( - user_id, project_id, node_id, metadata.get("project_metadata", {}) - ) - try: - assert self.app.state # nosec - assert self.app.state.settings # nosec - settings: AppSettings = self.app.state.settings - task_future = self.backend.client.submit( - remote_fct, - task_parameters=ContainerTaskParameters( - image=node_image.name, - tag=node_image.tag, + # This instance is created only once so it can be reused in calls below + node_ports = await dask_utils.create_node_ports( + db_engine=self.app.state.engine, + user_id=user_id, + project_id=project_id, + node_id=node_id, + ) + # NOTE: for download there is no need to go with S3 links + input_data = await dask_utils.compute_input_data( + project_id=project_id, + node_id=node_id, + node_ports=node_ports, + file_link_type=FileLinkType.PRESIGNED, + ) + output_data_keys = await dask_utils.compute_output_data_schema( + user_id=user_id, + project_id=project_id, + node_id=node_id, + node_ports=node_ports, + file_link_type=self.tasks_file_link_type, + ) + log_file_url = await dask_utils.compute_service_log_file_upload_link( + user_id, + project_id, + node_id, + file_link_type=self.tasks_file_link_type, + ) + task_labels = dask_utils.compute_task_labels( + user_id=user_id, + project_id=project_id, + node_id=node_id, + run_metadata=metadata, + node_requirements=node_image.node_requirements, + ) + task_envs = await 
dask_utils.compute_task_envs( + self.app, + user_id=user_id, + project_id=project_id, + node_id=node_id, + node_image=node_image, + metadata=metadata, + ) + task_owner = dask_utils.compute_task_owner( + user_id, project_id, node_id, metadata.get("project_metadata", {}) + ) + list_of_node_id_to_job_id.append( + await self._publish_in_dask( + remote_fct=remote_fct, + node_image=node_image, input_data=input_data, output_data_keys=output_data_keys, - command=node_image.command, - envs=task_envs, - labels=task_labels, - boot_mode=node_image.boot_mode, + log_file_url=log_file_url, + task_envs=task_envs, + task_labels=task_labels, task_owner=task_owner, - ), - docker_auth=DockerBasicAuth( - server_address=settings.DIRECTOR_V2_DOCKER_REGISTRY.resolved_registry_url, - username=settings.DIRECTOR_V2_DOCKER_REGISTRY.REGISTRY_USER, - password=settings.DIRECTOR_V2_DOCKER_REGISTRY.REGISTRY_PW, - ), - log_file_url=log_file_url, - s3_settings=s3_settings, - key=job_id, - resources=dask_resources, - retries=0, - ) - # NOTE: the callback is running in a secondary thread, and takes a future as arg - task_future.add_done_callback(lambda _: callback()) - - list_of_node_id_to_job_id.append( - PublishedComputationTask(node_id=node_id, job_id=DaskJobID(job_id)) - ) - await dask_utils.wrap_client_async_routine( - self.backend.client.publish_dataset(task_future, name=job_id) + s3_settings=s3_settings, + dask_resources=dask_resources, + node_id=node_id, + job_id=job_id, + callback=callback, + ) ) + except (NodeportsException, ValidationError, ClientResponseError) as exc: + raise TaskSchedulingError( + project_id=project_id, node_id=node_id, msg={exc} + ) from exc - _logger.debug( - "Dask task %s started [%s]", - f"{task_future.key=}", - f"{node_image.command=}", - ) - except Exception: - # Dask raises a base Exception here in case of connection error, this will raise a more precise one - dask_utils.check_scheduler_status(self.backend.client) - # if the connection is good, then the problem is different, so we re-raise - raise return list_of_node_id_to_job_id async def get_tasks_status(self, job_ids: list[str]) -> list[DaskClientTaskState]: diff --git a/services/director-v2/src/simcore_service_director_v2/utils/dask.py b/services/director-v2/src/simcore_service_director_v2/utils/dask.py index 839d97286b3..b2e335d0624 100644 --- a/services/director-v2/src/simcore_service_director_v2/utils/dask.py +++ b/services/director-v2/src/simcore_service_director_v2/utils/dask.py @@ -51,6 +51,7 @@ ) from ..models.comp_runs import ProjectMetadataDict, RunMetadataDict from ..models.comp_tasks import Image +from ..models.dask_subsystem import DaskJobID from ..modules.osparc_variables_substitutions import ( resolve_and_substitute_session_variables_in_specs, substitute_vendor_secrets_in_specs, @@ -82,13 +83,15 @@ def generate_dask_job_id( user_id: UserID, project_id: ProjectID, node_id: NodeID, -) -> str: +) -> DaskJobID: """creates a dask job id: The job ID shall contain the user_id, project_id, node_id Also, it must be unique and it is shown in the Dask scheduler dashboard website """ - return f"{service_key}:{service_version}:userid_{user_id}:projectid_{project_id}:nodeid_{node_id}:uuid_{uuid4()}" + return DaskJobID( + f"{service_key}:{service_version}:userid_{user_id}:projectid_{project_id}:nodeid_{node_id}:uuid_{uuid4()}" + ) _JOB_ID_PARTS: Final[int] = 6 From a5a648dc1cf85598f68a1bde75c2c2522cd8cfd9 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 19 Jan 2024 15:16:42 +0100 Subject: 
[PATCH 11/17] fixing waiting for clusters indefinitely --- .../modules/comp_scheduler/base_scheduler.py | 4 +++- .../modules/db/repositories/comp_tasks/_core.py | 7 +++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/base_scheduler.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/base_scheduler.py index 5d248a26443..06c9a4393c0 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/base_scheduler.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/base_scheduler.py @@ -691,7 +691,9 @@ async def _schedule_tasks_to_stop( ) -> None: # get any running task and stop them comp_tasks_repo = CompTasksRepository.instance(self.db_engine) - await comp_tasks_repo.mark_project_published_tasks_as_aborted(project_id) + await comp_tasks_repo.mark_project_published_waiting_for_cluster_tasks_as_aborted( + project_id + ) # stop any remaining running task, these are already submitted tasks_to_stop = [t for t in comp_tasks.values() if t.state in PROCESSING_STATES] await self._stop_tasks(user_id, tasks_to_stop, pipeline_params) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_core.py b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_core.py index 080bd61bc85..2845dcee8b6 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_core.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_core.py @@ -191,7 +191,7 @@ async def _update_task( assert row # nosec return CompTaskAtDB.from_orm(row) - async def mark_project_published_tasks_as_aborted( + async def mark_project_published_waiting_for_cluster_tasks_as_aborted( self, project_id: ProjectID ) -> None: # block all pending tasks, so the sidecars stop taking them @@ -201,7 +201,10 @@ async def mark_project_published_tasks_as_aborted( .where( (comp_tasks.c.project_id == f"{project_id}") & (comp_tasks.c.node_class == NodeClass.COMPUTATIONAL) - & (comp_tasks.c.state == StateType.PUBLISHED) + & ( + (comp_tasks.c.state == StateType.PUBLISHED) + | (comp_tasks.c.state == StateType.WAITING_FOR_CLUSTER) + ) ) .values( state=StateType.ABORTED, progress=1.0, end=arrow.utcnow().datetime From 5a6117e02897daa62a5822db1a438b00d960b5b7 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 19 Jan 2024 15:32:32 +0100 Subject: [PATCH 12/17] linter --- .../modules/comp_scheduler/base_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/base_scheduler.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/base_scheduler.py index 06c9a4393c0..f42d3f722f8 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/base_scheduler.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/base_scheduler.py @@ -817,7 +817,7 @@ async def _schedule_tasks_to_start( optional_stopped=arrow.utcnow().datetime, ) comp_tasks[NodeIDStr(f"{exc.node_id}")].state = RunningState.FAILED - except Exception: + except Exception: # pylint: disable=broad-exception-caught _logger.exception( "Unexpected error for %s with %s on %s happened when scheduling %s:", f"{user_id=}", From 7dd12c615a522b9b93270f42f4c6bb4198ddb097 
Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 19 Jan 2024 15:34:17 +0100 Subject: [PATCH 13/17] mypy --- .../src/simcore_service_director_v2/modules/dask_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py b/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py index 2d955fedf61..1908fe9aa46 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py @@ -420,7 +420,7 @@ async def send_computation_tasks( ) except (NodeportsException, ValidationError, ClientResponseError) as exc: raise TaskSchedulingError( - project_id=project_id, node_id=node_id, msg={exc} + project_id=project_id, node_id=node_id, msg=f"{exc}" ) from exc return list_of_node_id_to_job_id From 240219e9ae0dcdb652ccf9bfbfa0f9174d46eb18 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 19 Jan 2024 15:46:53 +0100 Subject: [PATCH 14/17] fix test --- .../modules/comp_scheduler/base_scheduler.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/base_scheduler.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/base_scheduler.py index f42d3f722f8..42beb5b6ef5 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/base_scheduler.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/base_scheduler.py @@ -662,7 +662,7 @@ async def _schedule_pipeline( user_id, project_id, iteration, RunningState.ABORTED ) self.scheduled_pipelines.pop((user_id, project_id, iteration), None) - except DaskClientAcquisisitonError: + except (DaskClientAcquisisitonError, ClustersKeeperNotAvailableError): _logger.exception( "Unexpected error while connecting with computational backend, aborting pipeline" ) @@ -800,6 +800,7 @@ async def _schedule_tasks_to_start( ) for task in tasks_ready_to_start: comp_tasks[NodeIDStr(f"{task}")].state = RunningState.FAILED + raise except TaskSchedulingError as exc: _logger.exception( "Project '%s''s task '%s' could not be scheduled", @@ -817,7 +818,7 @@ async def _schedule_tasks_to_start( optional_stopped=arrow.utcnow().datetime, ) comp_tasks[NodeIDStr(f"{exc.node_id}")].state = RunningState.FAILED - except Exception: # pylint: disable=broad-exception-caught + except Exception: _logger.exception( "Unexpected error for %s with %s on %s happened when scheduling %s:", f"{user_id=}", @@ -836,6 +837,7 @@ async def _schedule_tasks_to_start( ) for task in tasks_ready_to_start: comp_tasks[NodeIDStr(f"{task}")].state = RunningState.FAILED + raise return comp_tasks From e0ef4a42851359f47cff78057802372a07d60c3a Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 19 Jan 2024 15:49:33 +0100 Subject: [PATCH 15/17] ruff --- .../modules/comp_scheduler/base_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/base_scheduler.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/base_scheduler.py index 42beb5b6ef5..bff129e75d7 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/base_scheduler.py +++ 
b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/base_scheduler.py @@ -698,7 +698,7 @@ async def _schedule_tasks_to_stop( tasks_to_stop = [t for t in comp_tasks.values() if t.state in PROCESSING_STATES] await self._stop_tasks(user_id, tasks_to_stop, pipeline_params) - async def _schedule_tasks_to_start( + async def _schedule_tasks_to_start( # noqa: C901 self, user_id: UserID, project_id: ProjectID, From 876b1030010fa13b00cf8992d2a1b0f0b98d38f4 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 19 Jan 2024 15:50:02 +0100 Subject: [PATCH 16/17] remove doc --- .../modules/comp_scheduler/dask_scheduler.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/dask_scheduler.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/dask_scheduler.py index ceec877c6f0..e5deb2cfca2 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/dask_scheduler.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/dask_scheduler.py @@ -106,10 +106,6 @@ async def _start_tasks( scheduled_tasks: dict[NodeID, CompTaskAtDB], pipeline_params: ScheduledPipelineParams, ) -> None: - """ - Raises: - - """ # now transfer the pipeline to the dask scheduler async with _cluster_dask_client(user_id, pipeline_params, self) as client: # Change the tasks state to PENDING From fbba568679940550dd9340195e0347252e153dde Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 19 Jan 2024 15:54:02 +0100 Subject: [PATCH 17/17] optimize --- .../modules/comp_scheduler/dask_scheduler.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/dask_scheduler.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/dask_scheduler.py index e5deb2cfca2..3890ee1f7ad 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/dask_scheduler.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/dask_scheduler.py @@ -132,11 +132,15 @@ async def _start_tasks( ) # update the database so we do have the correct job_ids there - for task_sents in results: - for task in task_sents: - await comp_tasks_repo.update_project_task_job_id( + await asyncio.gather( + *( + comp_tasks_repo.update_project_task_job_id( project_id, task.node_id, task.job_id ) + for task_sents in results + for task in task_sents + ) + ) async def _get_tasks_status( self,
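
Taken as a whole, the central fix in this series (patches 02, 06 and 07) is the removal of return_exceptions=True from the asyncio.gather(...) call in _start_tasks. With that flag, exceptions raised while submitting tasks come back as ordinary items in the result list instead of propagating, so any exception type the caller forgot to sniff with isinstance(...) was silently swallowed. Dropping the flag lets the first failure propagate to _schedule_tasks_to_start, which now handles each expected error class in its own except clause, ending in a broad Exception handler that marks the tasks FAILED and re-raises. A minimal, self-contained sketch of the two behaviours (the submit coroutine is hypothetical, not from this repository):

    import asyncio

    async def submit(node_id: str) -> str:
        if node_id == "bad":
            msg = f"cannot submit {node_id}"
            raise RuntimeError(msg)
        return f"job-{node_id}"

    async def swallowing() -> None:
        # old behaviour: exceptions are returned as ordinary list items,
        # so a forgotten isinstance() check silently drops the error
        results = await asyncio.gather(
            submit("good"), submit("bad"), return_exceptions=True
        )
        print(results)  # ['job-good', RuntimeError('cannot submit bad')]

    async def propagating() -> None:
        # new behaviour: the first failure propagates to the caller
        # (sibling awaitables keep running), so except blocks actually fire
        try:
            await asyncio.gather(submit("good"), submit("bad"))
        except RuntimeError as exc:
            print(f"handled: {exc}")  # handled: cannot submit bad

    asyncio.run(swallowing())
    asyncio.run(propagating())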
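
Patches 04 and 10 also replace the anonymous tuple[NodeID, str] pairs returned by send_computation_tasks with a frozen, slotted PublishedComputationTask dataclass plus a DaskJobID type alias, so call sites read task.node_id and task.job_id instead of indexing tuple positions, and mypy can tell a dask job id apart from any other str. A reduced sketch of the pattern (the NodeID and DaskJobID stand-ins below are simplifications of the repo's models-library types; dataclass slots and kw_only need Python >= 3.10):

    from dataclasses import dataclass
    from typing import TypeAlias
    from uuid import UUID, uuid4

    NodeID: TypeAlias = UUID    # stand-in for models_library.projects_nodes_io.NodeID
    DaskJobID: TypeAlias = str  # the real alias is based on a constrained string type

    @dataclass(frozen=True, kw_only=True, slots=True)
    class PublishedComputationTask:
        node_id: NodeID
        job_id: DaskJobID

    task = PublishedComputationTask(
        node_id=uuid4(),
        job_id=DaskJobID("sleeper:2.1.6:userid_3:projectid_abc:nodeid_def:uuid_123"),
    )
    print(task.node_id, task.job_id)  # named access instead of tuple unpacking
    # frozen=True keeps instances immutable (and hashable); slots=True keeps them small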
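
Patch 11 widens mark_project_published_tasks_as_aborted to also abort tasks stuck in WAITING_FOR_CLUSTER (and renames it accordingly); without it, stopping a pipeline whose cluster never materialised left those tasks waiting indefinitely. The two-state filter could equivalently be written with SQLAlchemy's in_ operator; a sketch under stated assumptions (StateType and the table definition below are reduced stand-ins for the repo's models, not its actual code):

    from enum import Enum

    import sqlalchemy as sa

    class StateType(str, Enum):  # reduced stand-in for the repo's StateType
        PUBLISHED = "PUBLISHED"
        WAITING_FOR_CLUSTER = "WAITING_FOR_CLUSTER"

    metadata = sa.MetaData()
    comp_tasks = sa.Table(  # only the column the filter touches
        "comp_tasks", metadata, sa.Column("state", sa.Enum(StateType))
    )

    # equivalent to the patch's two OR-ed equality checks
    stmt = sa.select(comp_tasks).where(
        comp_tasks.c.state.in_([StateType.PUBLISHED, StateType.WAITING_FOR_CLUSTER])
    )
    print(stmt)  # renders the WHERE state IN (...) clause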
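
Finally, patch 17 turns the per-task job-id updates in dask_scheduler.py from a sequential for/await loop into a single asyncio.gather, so the update_project_task_job_id round-trips to the database overlap instead of running back-to-back, and, consistent with the rest of the series, without return_exceptions=True. A hedged illustration of the shape of that change (the 10 ms sleep stands in for one DB round-trip; all names are illustrative):

    import asyncio
    import time

    async def update_job_id(node_id: int) -> None:
        await asyncio.sleep(0.01)  # stand-in for one DB round-trip

    async def main() -> None:
        t0 = time.perf_counter()
        for node_id in range(20):  # old: 20 awaits in sequence, roughly 200 ms
            await update_job_id(node_id)
        t1 = time.perf_counter()
        # new: the 20 round-trips run concurrently, roughly 10 ms overall
        await asyncio.gather(*(update_job_id(n) for n in range(20)))
        t2 = time.perf_counter()
        print(f"sequential: {t1 - t0:.3f}s, gathered: {t2 - t1:.3f}s")

    asyncio.run(main())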