Skip to content

Commit

Permalink
fixing
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg committed Nov 21, 2023
1 parent 812e26f commit 18a09c9
Showing 1 changed file with 59 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from dask_task_models_library.container_tasks.errors import TaskCancelledError
from dask_task_models_library.container_tasks.events import TaskProgressEvent
from dask_task_models_library.container_tasks.io import TaskOutputData
from dask_task_models_library.container_tasks.protocol import TaskOwner
from faker import Faker
from fastapi.applications import FastAPI
from models_library.clusters import DEFAULT_CLUSTER_ID
Expand All @@ -39,6 +40,7 @@
RabbitResourceTrackingStartedMessage,
RabbitResourceTrackingStoppedMessage,
)
from models_library.users import UserID
from pydantic import parse_obj_as, parse_raw_as
from pytest_mock.plugin import MockerFixture
from pytest_simcore.helpers.typing_env import EnvVarsDict
Expand Down Expand Up @@ -573,8 +575,25 @@ async def _send_computation_tasks(
mocked_dask_client.send_computation_tasks.side_effect = _send_computation_tasks


async def _trigger_progress_event(scheduler: BaseCompScheduler, *, job_id: str) -> None:
event = TaskProgressEvent(job_id=job_id, progress=0)
async def _trigger_progress_event(
scheduler: BaseCompScheduler,
*,
job_id: str,
user_id: UserID,
project_id: ProjectID,
node_id: NodeID,
) -> None:
event = TaskProgressEvent(
job_id=job_id,
progress=0,
task_owner=TaskOwner(
user_id=user_id,
project_id=project_id,
node_id=node_id,
parent_project_id=None,
parent_node_id=None,
),
)
await cast(DaskScheduler, scheduler)._task_progress_change_handler( # noqa: SLF001
event.json()
)
Expand Down Expand Up @@ -659,7 +678,16 @@ async def _return_1st_task_running(job_ids: list[str]) -> list[DaskClientTaskSta
# 3. the "worker" starts processing a task
# here we trigger a progress from the worker
assert exp_started_task.job_id
await _trigger_progress_event(scheduler, job_id=exp_started_task.job_id)
assert exp_started_task.project_id
assert exp_started_task.node_id
assert published_project.project.prj_owner
await _trigger_progress_event(
scheduler,
job_id=exp_started_task.job_id,
user_id=published_project.project.prj_owner,
project_id=exp_started_task.project_id,
node_id=exp_started_task.node_id,
)

await run_comp_scheduler(scheduler)
# comp_run, the comp_task switch to STARTED
Expand Down Expand Up @@ -809,7 +837,13 @@ async def _return_2nd_task_running(job_ids: list[str]) -> list[DaskClientTaskSta
mocked_dask_client.get_tasks_status.side_effect = _return_2nd_task_running
# trigger the scheduler, run state should keep to STARTED, task should be as well
assert exp_started_task.job_id
await _trigger_progress_event(scheduler, job_id=exp_started_task.job_id)
await _trigger_progress_event(
scheduler,
job_id=exp_started_task.job_id,
user_id=published_project.project.prj_owner,
project_id=exp_started_task.project_id,
node_id=exp_started_task.node_id,
)
await run_comp_scheduler(scheduler)
await _assert_comp_run_db(aiopg_engine, published_project, RunningState.STARTED)
await _assert_comp_tasks_db(
Expand Down Expand Up @@ -956,11 +990,22 @@ async def test_task_progress_triggers(
# send some progress
started_task = expected_pending_tasks[0]
assert started_task.job_id
assert published_project.project.prj_owner
for progress in [-1, 0, 0.3, 0.5, 1, 1.5, 0.7, 0, 20]:
progress_event = TaskProgressEvent(
job_id=started_task.job_id, progress=progress
job_id=started_task.job_id,
progress=progress,
task_owner=TaskOwner(
user_id=published_project.project.prj_owner,
project_id=published_project.project.uuid,
node_id=started_task.node_id,
parent_node_id=None,
parent_project_id=None,
),
)
await cast(DaskScheduler, scheduler)._task_progress_change_handler(
await cast(
DaskScheduler, scheduler
)._task_progress_change_handler( # noqa: SLF001
progress_event.json()
)
# NOTE: not sure whether it should switch to STARTED.. it would make sense
Expand Down Expand Up @@ -1252,7 +1297,14 @@ async def _return_1st_task_running(job_ids: list[str]) -> list[DaskClientTaskSta

mocked_dask_client.get_tasks_status.side_effect = _return_1st_task_running
assert exp_started_task.job_id
await _trigger_progress_event(scheduler, job_id=exp_started_task.job_id)
assert published_project.project.prj_owner
await _trigger_progress_event(
scheduler,
job_id=exp_started_task.job_id,
user_id=published_project.project.prj_owner,
project_id=exp_started_task.project_id,
node_id=exp_started_task.node_id,
)
await run_comp_scheduler(scheduler)

messages = await _assert_message_received(
Expand Down

0 comments on commit 18a09c9

Please sign in to comment.