Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Temporary fix for long running tasks issue in API server using IP addr #5623

Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
2a24a0e
fix
bisgaard-itis Apr 5, 2024
1af4cfc
merge master into 5622-tmp-fix-for-long-running-tasks-api-server
bisgaard-itis Apr 5, 2024
ea95902
tmp_client -> long_running_task_client
bisgaard-itis Apr 5, 2024
94b002d
merge master into 5622-tmp-fix-for-long-running-tasks-api-server
bisgaard-itis Apr 19, 2024
ff46124
merge master into 5622-tp-fix-for-long-running-tasks-api-server
bisgaard-itis Apr 19, 2024
04d569f
use hateos with ip address
bisgaard-itis Apr 19, 2024
4c6da22
fix webserver unit tests
bisgaard-itis Apr 19, 2024
d71e305
minor changes
bisgaard-itis Apr 19, 2024
f6ed219
fix mock
bisgaard-itis Apr 19, 2024
e9a21bf
fix test
bisgaard-itis Apr 22, 2024
c36fbd7
merge master into 5622-tmp-fix-for-long-running-tasks-api-server
bisgaard-itis Apr 22, 2024
be4fcb8
merge master into 5622-tmp-fix-for-long-running-tasks-api-server
bisgaard-itis Apr 22, 2024
1dc365f
@pcrespov impose lifespan on long running tasks client
bisgaard-itis Apr 22, 2024
49405c1
make pylint happy
bisgaard-itis Apr 22, 2024
f95de1f
another attempt at making pylint happy
bisgaard-itis Apr 22, 2024
c043d51
fix minor bug
bisgaard-itis Apr 22, 2024
2853284
@sanderegg HttpUrl -> AnyHttpUrl
bisgaard-itis Apr 22, 2024
8f8cb59
merge master into 5622-tmp-fix-for-long-running-tasks-api-server
bisgaard-itis Apr 23, 2024
b47a2b4
extract Pathname for long running tasks
odeimaiz Apr 23, 2024
21e9214
minor
odeimaiz Apr 23, 2024
40d62d3
minor
odeimaiz Apr 23, 2024
b7ac6f3
Merge pull request #1 from odeimaiz/odeimaiz-5622-tmp-fix-for-long-ru…
bisgaard-itis Apr 23, 2024
f2dc8d7
merge master into 5622-tmp-fix-for-long-running-tasks-api-server
bisgaard-itis Apr 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typing import Any, AsyncGenerator, Callable

from aiohttp import web
from pydantic import PositiveFloat
from pydantic import AnyHttpUrl, PositiveFloat
from servicelib.aiohttp import status
from servicelib.json_serialization import json_dumps

Expand Down Expand Up @@ -62,10 +62,20 @@ async def start_long_running_task(
task_name=task_name,
**task_kwargs,
)
status_url = request_.app.router["get_task_status"].url_for(task_id=task_id)
result_url = request_.app.router["get_task_result"].url_for(task_id=task_id)
abort_url = request_.app.router["cancel_and_delete_task"].url_for(
task_id=task_id
ip_addr, port = request_.transport.get_extra_info(
"sockname"
) # https://docs.python.org/3/library/asyncio-protocol.html#asyncio.BaseTransport.get_extra_info
status_url = AnyHttpUrl(
url=f"http://{ip_addr}:{port}{request_.app.router['get_task_status'].url_for(task_id=task_id)}",
scheme="http",
)
result_url = AnyHttpUrl(
url=f"http://{ip_addr}:{port}{request_.app.router['get_task_result'].url_for(task_id=task_id)}",
scheme="http",
)
abort_url = AnyHttpUrl(
url=f"http://{ip_addr}:{port}{request_.app.router['cancel_and_delete_task'].url_for(task_id=task_id)}",
scheme="http",
)
task_get = TaskGet(
task_id=task_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,20 +132,18 @@ async def long_running_task_request(
async for task_progress in _wait_for_completion(
session,
task.task_id,
url.with_path(task.status_href, encoded=True),
URL(task.status_href),
client_timeout,
):
last_progress = task_progress
yield LRTask(progress=task_progress)
assert last_progress # nosec
yield LRTask(
progress=last_progress,
_result=_task_result(
session, url.with_path(task.result_href, encoded=True)
),
_result=_task_result(session, URL(task.result_href)),
)

except (asyncio.CancelledError, asyncio.TimeoutError):
if task:
await _abort_task(session, url.with_path(task.abort_href, encoded=True))
await _abort_task(session, URL(task.abort_href))
raise
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ class WebserverApi(BaseServiceClientApi):
"""


class LongRunningTasksClient(BaseServiceClientApi):
"Client for requesting status and results of long running tasks"


@dataclass
class AuthSession:
"""
Expand All @@ -114,6 +118,7 @@ class AuthSession:
"""

_api: WebserverApi
_long_running_task_client: LongRunningTasksClient
vtag: str
session_cookies: dict | None = None

Expand All @@ -128,8 +133,13 @@ def create(
assert api # nosec
assert isinstance(api, WebserverApi) # nosec
api.client.headers = product_header
long_running_tasks_client = LongRunningTasksClient.get_instance(app=app)
assert long_running_tasks_client # nosec
assert isinstance(long_running_tasks_client, LongRunningTasksClient) # nosec
long_running_tasks_client.client.headers = product_header
return cls(
_api=api,
_long_running_task_client=long_running_tasks_client,
vtag=app.state.settings.API_SERVER_WEBSERVER.WEBSERVER_VTAG,
session_cookies=session_cookies,
)
Expand All @@ -140,6 +150,10 @@ def create(
def client(self):
return self._api.client

@property
def long_running_task_client(self):
return self._long_running_task_client.client

async def _page_projects(
self,
*,
Expand Down Expand Up @@ -180,9 +194,8 @@ async def _page_projects(
return Page[ProjectGet].parse_raw(resp.text)

async def _wait_for_long_running_task_results(self, data: TaskGet):
# NOTE: /v0 is already included in the http client base_url
status_url = data.status_href.lstrip(f"/{self.vtag}")
result_url = data.result_href.lstrip(f"/{self.vtag}")
status_url = data.status_href
result_url = data.result_href

# GET task status now until done
async for attempt in AsyncRetrying(
Expand All @@ -192,8 +205,8 @@ async def _wait_for_long_running_task_results(self, data: TaskGet):
before_sleep=before_sleep_log(_logger, logging.INFO),
):
with attempt:
get_response = await self.client.get(
status_url, cookies=self.session_cookies
get_response = await self.long_running_task_client.get(
url=status_url, cookies=self.session_cookies
)
get_response.raise_for_status()
task_status = Envelope[TaskStatus].parse_raw(get_response.text).data
Expand All @@ -202,7 +215,7 @@ async def _wait_for_long_running_task_results(self, data: TaskGet):
msg = "Timed out creating project. TIP: Try again, or contact oSparc support if this is happening repeatedly"
raise TryAgain(msg)

result_response = await self.client.get(
result_response = await self.long_running_task_client.get(
f"{result_url}", cookies=self.session_cookies
)
result_response.raise_for_status()
Expand Down Expand Up @@ -574,6 +587,12 @@ def setup(app: FastAPI, settings: WebServerSettings | None = None) -> None:
api_baseurl=settings.api_base_url,
service_name="webserver",
)
setup_client_instance(
app,
LongRunningTasksClient,
api_baseurl="",
service_name="long_running_tasks_client",
)

def _on_startup() -> None:
# normalize & encrypt
Expand Down
6 changes: 3 additions & 3 deletions services/api-server/tests/mocks/create_study_job.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
"data": {
"task_id": "POST%20%2Fv0%2Fprojects%3Ffrom_study%3Dfa07a3ec-f190-11ee-8b7b-0242ac140026%26hidden%3Dtrue.a02d7664-93cb-4a6f-917c-ac5ff06438fe",
"task_name": "POST /v0/projects?from_study=fa07a3ec-f190-11ee-8b7b-0242ac140026&hidden=true",
"status_href": "/v0/tasks/POST%2520%252Fv0%252Fprojects%253Ffrom_study%253Dfa07a3ec-f190-11ee-8b7b-0242ac140026%2526hidden%253Dtrue.a02d7664-93cb-4a6f-917c-ac5ff06438fe",
"result_href": "/v0/tasks/POST%2520%252Fv0%252Fprojects%253Ffrom_study%253Dfa07a3ec-f190-11ee-8b7b-0242ac140026%2526hidden%253Dtrue.a02d7664-93cb-4a6f-917c-ac5ff06438fe/result",
"abort_href": "/v0/tasks/POST%2520%252Fv0%252Fprojects%253Ffrom_study%253Dfa07a3ec-f190-11ee-8b7b-0242ac140026%2526hidden%253Dtrue.a02d7664-93cb-4a6f-917c-ac5ff06438fe"
"status_href": "http://webserver:8080/v0/tasks/POST%2520%252Fv0%252Fprojects%253Ffrom_study%253Dfa07a3ec-f190-11ee-8b7b-0242ac140026%2526hidden%253Dtrue.a02d7664-93cb-4a6f-917c-ac5ff06438fe",
bisgaard-itis marked this conversation as resolved.
Show resolved Hide resolved
"result_href": "http://webserver:8080/v0/tasks/POST%2520%252Fv0%252Fprojects%253Ffrom_study%253Dfa07a3ec-f190-11ee-8b7b-0242ac140026%2526hidden%253Dtrue.a02d7664-93cb-4a6f-917c-ac5ff06438fe/result",
"abort_href": "http://webserver:8080/v0/tasks/POST%2520%252Fv0%252Fprojects%253Ffrom_study%253Dfa07a3ec-f190-11ee-8b7b-0242ac140026%2526hidden%253Dtrue.a02d7664-93cb-4a6f-917c-ac5ff06438fe"
}
},
"status_code": 202
Expand Down
6 changes: 3 additions & 3 deletions services/api-server/tests/mocks/run_study_workflow.json
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,9 @@
"data": {
"task_id": "POST%20%2Fv0%2Fprojects%3Ffrom_study%3Daeab71fe-f71b-11ee-8fca-0242ac140008%26hidden%3Dtrue.194a6216-eb67-4769-8e45-ec19f7076b24",
"task_name": "POST /v0/projects?from_study=aeab71fe-f71b-11ee-8fca-0242ac140008&hidden=true",
"status_href": "/v0/tasks/POST%2520%252Fv0%252Fprojects%253Ffrom_study%253Daeab71fe-f71b-11ee-8fca-0242ac140008%2526hidden%253Dtrue.194a6216-eb67-4769-8e45-ec19f7076b24",
"result_href": "/v0/tasks/POST%2520%252Fv0%252Fprojects%253Ffrom_study%253Daeab71fe-f71b-11ee-8fca-0242ac140008%2526hidden%253Dtrue.194a6216-eb67-4769-8e45-ec19f7076b24/result",
"abort_href": "/v0/tasks/POST%2520%252Fv0%252Fprojects%253Ffrom_study%253Daeab71fe-f71b-11ee-8fca-0242ac140008%2526hidden%253Dtrue.194a6216-eb67-4769-8e45-ec19f7076b24"
"status_href": "http://webserver:8080/v0/tasks/POST%2520%252Fv0%252Fprojects%253Ffrom_study%253Daeab71fe-f71b-11ee-8fca-0242ac140008%2526hidden%253Dtrue.194a6216-eb67-4769-8e45-ec19f7076b24",
"result_href": "http://webserver:8080/v0/tasks/POST%2520%252Fv0%252Fprojects%253Ffrom_study%253Daeab71fe-f71b-11ee-8fca-0242ac140008%2526hidden%253Dtrue.194a6216-eb67-4769-8e45-ec19f7076b24/result",
"abort_href": "http://webserver:8080/v0/tasks/POST%2520%252Fv0%252Fprojects%253Ffrom_study%253Daeab71fe-f71b-11ee-8fca-0242ac140008%2526hidden%253Dtrue.194a6216-eb67-4769-8e45-ec19f7076b24"
}
},
"status_code": 202
Expand Down
5 changes: 3 additions & 2 deletions services/web/server/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from copy import deepcopy
from http import HTTPStatus
from pathlib import Path
from urllib.parse import urlparse

import pytest
import simcore_service_webserver
Expand Down Expand Up @@ -284,7 +285,7 @@ async def _creator(
print(
f"--> waiting for creation {attempt.retry_state.attempt_number}..."
)
result = await client.get(f"{status_url}")
result = await client.get(urlparse(status_url).path)
data, error = await assert_status(result, status.HTTP_200_OK)
assert data
assert not error
Expand All @@ -298,7 +299,7 @@ async def _creator(

# get result GET /{task_id}/result
print("--> getting project creation result...")
result = await client.get(f"{result_url}")
result = await client.get(urlparse(result_url).path)
data, error = await assert_status(result, expected_creation_response)
if error:
assert not data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import asyncio
from typing import Any, Awaitable, Callable
from urllib.parse import urlparse

import pytest
from aiohttp.test_utils import TestClient
Expand Down Expand Up @@ -117,7 +118,7 @@ async def test_copying_large_project_and_aborting_correctly_removes_new_project(
assert len(data) == 1, "there are too many projects in the db!"

# now abort the copy
resp = await client.delete(f"{abort_url}")
resp = await client.delete(urlparse(abort_url).path)
await assert_status(resp, expected.no_content)
# wait to check that the call to storage is "done"
async for attempt in AsyncRetrying(
Expand Down Expand Up @@ -163,7 +164,7 @@ async def test_copying_large_project_and_retrieving_copy_task(
# let the copy start
await asyncio.sleep(2)
# now abort the copy
resp = await client.delete(f"{created_copy_task.abort_href}")
resp = await client.delete(urlparse(created_copy_task.abort_href).path)
await assert_status(resp, expected.no_content)
# wait to check that the call to storage is "done"
async for attempt in AsyncRetrying(
Expand Down
Loading