Skip to content

Commit

Permalink
✨ Long running tasks: allow retrieving tasks by user (#3268)
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg authored Aug 24, 2022
1 parent 38b6073 commit 72ac74d
Show file tree
Hide file tree
Showing 28 changed files with 1,415 additions and 706 deletions.
15 changes: 15 additions & 0 deletions api/specs/webserver/openapi-tasks.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,19 @@
paths:
/tasks:
get:
operationId: list_tasks
tags:
- tasks
responses:
"200":
description: Returns the list of active tasks (running and/or done)
content:
application/json:
schema:
type: array
items:
$ref: "./components/schemas/task.yaml#/TaskEnveloped"

/tasks/{task_id}:
parameters:
- name: task_id
Expand Down
4 changes: 2 additions & 2 deletions api/specs/webserver/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -326,8 +326,8 @@ paths:
$ref: "./openapi-clusters.yaml#/paths/director_v2_clusters_cluster_id_details"

# TASKS --------------------------------------------------------------------------
# /tasks:
# $ref: "./openapi-tasks.yaml#/paths/~1tasks"
/tasks:
$ref: "./openapi-tasks.yaml#/paths/~1tasks"

/tasks/{task_id}:
$ref: "./openapi-tasks.yaml#/paths/~1tasks~1{task_id}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,9 @@
from pydantic import PositiveFloat

MINUTE: Final[PositiveFloat] = 60
APP_LONG_RUNNING_TASKS_MANAGER_KEY: Final[str] = f"{__name__ }.long_running_tasks"
APP_LONG_RUNNING_TASKS_MANAGER_KEY: Final[
str
] = f"{__name__ }.long_running_tasks.tasks_manager"
RQT_LONG_RUNNING_TASKS_CONTEXT_KEY: Final[
str
] = f"{__name__}.long_running_tasks.context"
Original file line number Diff line number Diff line change
@@ -1,8 +1,21 @@
from typing import Any

from aiohttp import web

from ...long_running_tasks._task import TasksManager
from ._constants import APP_LONG_RUNNING_TASKS_MANAGER_KEY
from ._constants import (
APP_LONG_RUNNING_TASKS_MANAGER_KEY,
RQT_LONG_RUNNING_TASKS_CONTEXT_KEY,
)


def get_tasks_manager(app: web.Application) -> TasksManager:
return app[APP_LONG_RUNNING_TASKS_MANAGER_KEY]


def get_task_context(request: web.Request) -> dict[str, Any]:
return request[RQT_LONG_RUNNING_TASKS_CONTEXT_KEY]


def create_task_name_from_request(request: web.Request) -> str:
return f"{request.method} {request.rel_url}"
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@

from ...json_serialization import json_dumps
from ...long_running_tasks._errors import TaskNotCompletedError, TaskNotFoundError
from ...long_running_tasks._models import TaskId, TaskStatus
from ...long_running_tasks._models import TaskGet, TaskId, TaskStatus
from ...long_running_tasks._task import TrackedTask
from ...mimetype_constants import MIMETYPE_APPLICATION_JSON
from ._dependencies import get_tasks_manager
from ._dependencies import get_task_context, get_tasks_manager

log = logging.getLogger(__name__)
routes = web.RouteTableDef()
Expand All @@ -18,37 +19,73 @@ class _PathParam(BaseModel):
task_id: TaskId


@routes.get("", name="list_tasks")
async def list_tasks(request: web.Request) -> web.Response:
tasks_manager = get_tasks_manager(request.app)
task_context = get_task_context(request)
tracked_tasks: list[TrackedTask] = tasks_manager.list_tasks(
with_task_context=task_context
)

return web.json_response(
{
"data": [
TaskGet(
task_id=t.task_id,
task_name=t.task_name,
status_href=f"{request.app.router['get_task_status'].url_for(task_id=t.task_id)}",
result_href=f"{request.app.router['get_task_result'].url_for(task_id=t.task_id)}",
abort_href=f"{request.app.router['cancel_and_delete_task'].url_for(task_id=t.task_id)}",
)
for t in tracked_tasks
]
},
dumps=json_dumps,
)


@routes.get("/{task_id}", name="get_task_status")
async def get_task_status(request: web.Request) -> web.Response:
path_params = parse_request_path_parameters_as(_PathParam, request)
tasks_manager = get_tasks_manager(request.app)
task_context = get_task_context(request)

task_status: TaskStatus = tasks_manager.get_task_status(task_id=path_params.task_id)
task_status: TaskStatus = tasks_manager.get_task_status(
task_id=path_params.task_id, with_task_context=task_context
)
return web.json_response({"data": task_status}, dumps=json_dumps)


@routes.get("/{task_id}/result", name="get_task_result")
async def get_task_result(request: web.Request) -> web.Response:
path_params = parse_request_path_parameters_as(_PathParam, request)
tasks_manager = get_tasks_manager(request.app)
task_context = get_task_context(request)

# NOTE: this might raise an exception that will be catched by the _error_handlers
try:
task_result = tasks_manager.get_task_result(task_id=path_params.task_id)
task_result = tasks_manager.get_task_result(
task_id=path_params.task_id, with_task_context=task_context
)
# NOTE: this will fail if the task failed for some reason....
await tasks_manager.remove_task(path_params.task_id, reraise_errors=False)
await tasks_manager.remove_task(
path_params.task_id, with_task_context=task_context, reraise_errors=False
)
return task_result
except (TaskNotFoundError, TaskNotCompletedError):
raise
except Exception:
# the task shall be removed in this case
await tasks_manager.remove_task(path_params.task_id, reraise_errors=False)
await tasks_manager.remove_task(
path_params.task_id, with_task_context=task_context, reraise_errors=False
)
raise


@routes.delete("/{task_id}", name="cancel_and_delete_task")
async def cancel_and_delete_task(request: web.Request) -> web.Response:
path_params = parse_request_path_parameters_as(_PathParam, request)
tasks_manager = get_tasks_manager(request.app)
await tasks_manager.remove_task(path_params.task_id)
task_context = get_task_context(request)
await tasks_manager.remove_task(path_params.task_id, with_task_context=task_context)
raise web.HTTPNoContent(content_type=MIMETYPE_APPLICATION_JSON)
Original file line number Diff line number Diff line change
@@ -1,27 +1,48 @@
import logging
from typing import AsyncGenerator
from functools import wraps
from typing import AsyncGenerator, Callable

from aiohttp import web
from pydantic import PositiveFloat

from ...long_running_tasks._task import TasksManager
from ._constants import APP_LONG_RUNNING_TASKS_MANAGER_KEY, MINUTE
from ..typing_extension import Handler
from ._constants import (
APP_LONG_RUNNING_TASKS_MANAGER_KEY,
MINUTE,
RQT_LONG_RUNNING_TASKS_CONTEXT_KEY,
)
from ._error_handlers import base_long_running_error_handler
from ._routes import routes

log = logging.getLogger(__name__)


def no_ops_decorator(handler: Handler):
return handler


def no_task_context_decorator(handler: Handler):
@wraps(handler)
async def _wrap(request: web.Request):
request[RQT_LONG_RUNNING_TASKS_CONTEXT_KEY] = {}
return await handler(request)

return _wrap


def setup(
app: web.Application,
*,
router_prefix: str,
handler_check_decorator: Callable = no_ops_decorator,
task_request_context_decorator: Callable = no_task_context_decorator,
stale_task_check_interval_s: PositiveFloat = 1 * MINUTE,
stale_task_detect_timeout_s: PositiveFloat = 5 * MINUTE,
) -> None:
"""
- `router_prefix` APIs are mounted on `/task/...`, this
will change them to be mounted as `{router_prefix}/task/...`
- `router_prefix` APIs are mounted on `/...`, this
will change them to be mounted as `{router_prefix}/...`
- `stale_task_check_interval_s` interval at which the
TaskManager checks for tasks which are no longer being
actively monitored by a client
Expand All @@ -35,10 +56,9 @@ async def on_startup(app: web.Application) -> AsyncGenerator[None, None]:
app.router.add_route(
route.method, # type: ignore
f"{router_prefix}{route.path}", # type: ignore
route.handler, # type: ignore
handler_check_decorator(task_request_context_decorator(route.handler)), # type: ignore
**route.kwargs, # type: ignore
)
# app.router.add_routes(routes)

# add components to state
app[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,27 @@
TaskCancelledError,
TaskId,
TaskProgress,
TaskProtocol,
TasksManager,
TaskStatus,
start_task,
)
from ._dependencies import get_tasks_manager
from ._dependencies import create_task_name_from_request, get_tasks_manager
from ._routes import TaskGet
from ._server import setup

__all__: tuple[str, ...] = (
"create_task_name_from_request",
"get_tasks_manager",
"setup",
"start_task",
"TaskAlreadyRunningError",
"TaskCancelledError",
"TaskId",
"TaskGet",
"TasksManager",
"TaskProgress",
"TaskProtocol",
"TaskStatus",
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def _before_sleep_log(
"""Before call strategy that logs to some logger the attempt."""

def log_it(retry_state: "RetryCallState") -> None:
assert retry_state.outcome # nosec
if retry_state.outcome.failed:
ex = retry_state.outcome.exception()
verb, value = "raised", f"{ex.__class__.__name__}: {ex}"
Expand All @@ -46,6 +47,7 @@ def log_it(retry_state: "RetryCallState") -> None:
verb, value = "returned", retry_state.outcome.result()
local_exc_info = False # exc_info does not apply when no exception

assert retry_state.next_action # nosec
logger.warning(
"Retrying '%s %s %s' in %s seconds as it %s %s. %s",
request_function.__name__,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
from starlette.requests import Request
from starlette.responses import JSONResponse

from ...long_running_tasks._errors import BaseLongRunningError, TaskNotFoundError
from ...long_running_tasks._errors import (
BaseLongRunningError,
TaskNotCompletedError,
TaskNotFoundError,
)


async def base_long_running_error_handler(
Expand All @@ -12,7 +16,7 @@ async def base_long_running_error_handler(
error_fields = dict(code=exception.code, message=f"{exception}")
status_code = (
status.HTTP_404_NOT_FOUND
if isinstance(exception, TaskNotFoundError)
if isinstance(exception, (TaskNotFoundError, TaskNotCompletedError))
else status.HTTP_400_BAD_REQUEST
)
return JSONResponse(content=jsonable_encoder(error_fields), status_code=status_code)
Original file line number Diff line number Diff line change
@@ -1,14 +1,32 @@
from fastapi import APIRouter, Depends, Request, status

from ...long_running_tasks._errors import TaskNotCompletedError, TaskNotFoundError
from ...long_running_tasks._models import TaskId, TaskResult, TaskStatus
from ...long_running_tasks._models import TaskGet, TaskId, TaskResult, TaskStatus
from ...long_running_tasks._task import TasksManager
from ..requests_decorators import cancel_on_disconnect
from ._dependencies import get_tasks_manager

router = APIRouter(prefix="/task")


@router.get("", response_model=list[TaskGet])
@cancel_on_disconnect
async def list_tasks(
request: Request, tasks_manager: TasksManager = Depends(get_tasks_manager)
) -> list[TaskGet]:
assert request # nosec
return [
TaskGet(
task_id=t.task_id,
task_name=t.task_name,
status_href="",
result_href="",
abort_href="",
)
for t in tasks_manager.list_tasks(with_task_context=None)
]


@router.get(
"/{task_id}",
responses={
Expand All @@ -22,7 +40,7 @@ async def get_task_status(
tasks_manager: TasksManager = Depends(get_tasks_manager),
) -> TaskStatus:
assert request # nosec
return tasks_manager.get_task_status(task_id=task_id)
return tasks_manager.get_task_status(task_id=task_id, with_task_context=None)


@router.get(
Expand All @@ -44,13 +62,17 @@ async def get_task_result(
# TODO: refactor this to use same as in https://github.com/ITISFoundation/osparc-simcore/issues/3265
try:
task_result = tasks_manager.get_task_result_old(task_id=task_id)
await tasks_manager.remove_task(task_id, reraise_errors=False)
await tasks_manager.remove_task(
task_id, with_task_context=None, reraise_errors=False
)
return task_result
except (TaskNotFoundError, TaskNotCompletedError):
raise
except Exception:
# the task shall be removed in this case
await tasks_manager.remove_task(task_id, reraise_errors=False)
await tasks_manager.remove_task(
task_id, with_task_context=None, reraise_errors=False
)
raise


Expand All @@ -70,4 +92,4 @@ async def cancel_and_delete_task(
tasks_manager: TasksManager = Depends(get_tasks_manager),
) -> None:
assert request # nosec
await tasks_manager.remove_task(task_id)
await tasks_manager.remove_task(task_id, with_task_context=None)
Loading

0 comments on commit 72ac74d

Please sign in to comment.