diff --git a/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py b/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py index ac4433ed04a5..abe3452f32ae 100644 --- a/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py +++ b/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py @@ -25,6 +25,7 @@ from dask_task_models_library.container_tasks.errors import TaskCancelledError from dask_task_models_library.container_tasks.events import TaskProgressEvent from dask_task_models_library.container_tasks.io import TaskOutputData +from dask_task_models_library.container_tasks.protocol import TaskOwner from faker import Faker from fastapi.applications import FastAPI from models_library.clusters import DEFAULT_CLUSTER_ID @@ -39,6 +40,7 @@ RabbitResourceTrackingStartedMessage, RabbitResourceTrackingStoppedMessage, ) +from models_library.users import UserID from pydantic import parse_obj_as, parse_raw_as from pytest_mock.plugin import MockerFixture from pytest_simcore.helpers.typing_env import EnvVarsDict @@ -573,8 +575,25 @@ async def _send_computation_tasks( mocked_dask_client.send_computation_tasks.side_effect = _send_computation_tasks -async def _trigger_progress_event(scheduler: BaseCompScheduler, *, job_id: str) -> None: - event = TaskProgressEvent(job_id=job_id, progress=0) +async def _trigger_progress_event( + scheduler: BaseCompScheduler, + *, + job_id: str, + user_id: UserID, + project_id: ProjectID, + node_id: NodeID, +) -> None: + event = TaskProgressEvent( + job_id=job_id, + progress=0, + task_owner=TaskOwner( + user_id=user_id, + project_id=project_id, + node_id=node_id, + parent_project_id=None, + parent_node_id=None, + ), + ) await cast(DaskScheduler, scheduler)._task_progress_change_handler( # noqa: SLF001 event.json() ) @@ -659,7 +678,16 @@ async def _return_1st_task_running(job_ids: list[str]) -> list[DaskClientTaskSta # 3. the "worker" starts processing a task # here we trigger a progress from the worker assert exp_started_task.job_id - await _trigger_progress_event(scheduler, job_id=exp_started_task.job_id) + assert exp_started_task.project_id + assert exp_started_task.node_id + assert published_project.project.prj_owner + await _trigger_progress_event( + scheduler, + job_id=exp_started_task.job_id, + user_id=published_project.project.prj_owner, + project_id=exp_started_task.project_id, + node_id=exp_started_task.node_id, + ) await run_comp_scheduler(scheduler) # comp_run, the comp_task switch to STARTED @@ -809,7 +837,13 @@ async def _return_2nd_task_running(job_ids: list[str]) -> list[DaskClientTaskSta mocked_dask_client.get_tasks_status.side_effect = _return_2nd_task_running # trigger the scheduler, run state should keep to STARTED, task should be as well assert exp_started_task.job_id - await _trigger_progress_event(scheduler, job_id=exp_started_task.job_id) + await _trigger_progress_event( + scheduler, + job_id=exp_started_task.job_id, + user_id=published_project.project.prj_owner, + project_id=exp_started_task.project_id, + node_id=exp_started_task.node_id, + ) await run_comp_scheduler(scheduler) await _assert_comp_run_db(aiopg_engine, published_project, RunningState.STARTED) await _assert_comp_tasks_db( @@ -956,11 +990,22 @@ async def test_task_progress_triggers( # send some progress started_task = expected_pending_tasks[0] assert started_task.job_id + assert published_project.project.prj_owner for progress in [-1, 0, 0.3, 0.5, 1, 1.5, 0.7, 0, 20]: progress_event = TaskProgressEvent( - job_id=started_task.job_id, progress=progress + job_id=started_task.job_id, + progress=progress, + task_owner=TaskOwner( + user_id=published_project.project.prj_owner, + project_id=published_project.project.uuid, + node_id=started_task.node_id, + parent_node_id=None, + parent_project_id=None, + ), ) - await cast(DaskScheduler, scheduler)._task_progress_change_handler( + await cast( + DaskScheduler, scheduler + )._task_progress_change_handler( # noqa: SLF001 progress_event.json() ) # NOTE: not sure whether it should switch to STARTED.. it would make sense @@ -1252,7 +1297,14 @@ async def _return_1st_task_running(job_ids: list[str]) -> list[DaskClientTaskSta mocked_dask_client.get_tasks_status.side_effect = _return_1st_task_running assert exp_started_task.job_id - await _trigger_progress_event(scheduler, job_id=exp_started_task.job_id) + assert published_project.project.prj_owner + await _trigger_progress_event( + scheduler, + job_id=exp_started_task.job_id, + user_id=published_project.project.prj_owner, + project_id=exp_started_task.project_id, + node_id=exp_started_task.node_id, + ) await run_comp_scheduler(scheduler) messages = await _assert_message_received(