Skip to content

Commit

Permalink
♻️ webserver: fixes mypy issues in director_v2 plugin (#4186)
Browse files Browse the repository at this point in the history
  • Loading branch information
matusdrobuliak66 authored May 8, 2023
1 parent fe4dee6 commit 386ef67
Show file tree
Hide file tree
Showing 47 changed files with 301 additions and 313 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from .db import setup_db
from .diagnostics.plugin import setup_diagnostics
from .director.plugin import setup_director
from .director_v2 import setup_director_v2
from .director_v2.plugin import setup_director_v2
from .email import setup_email
from .exporter.plugin import setup_exporter
from .garbage_collector import setup_garbage_collector
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from .catalog_settings import CatalogSettings
from .diagnostics.settings import DiagnosticsSettings
from .director.settings import DirectorSettings
from .director_v2_settings import DirectorV2Settings
from .director_v2.settings import DirectorV2Settings
from .exporter.settings import ExporterSettings
from .garbage_collector_settings import GarbageCollectorSettings
from .invitations.settings import InvitationsSettings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@
from servicelib.json_serialization import json_dumps
from servicelib.request_keys import RQT_USERID_KEY

from .. import director_v2_api
from .._meta import api_version_prefix
from ..director_v2_exceptions import (
from ..director_v2 import api as director_v2_api
from ..director_v2._models import ClusterCreate, ClusterPatch, ClusterPing
from ..director_v2.exceptions import (
ClusterAccessForbidden,
ClusterNotFoundError,
ClusterPingError,
DirectorServiceError,
)
from ..director_v2_models import ClusterCreate, ClusterPatch, ClusterPing
from ..login.decorators import login_required
from ..security.decorators import permission_required

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
from aiohttp import web
from servicelib.aiohttp.application_setup import ModuleCategory, app_module_setup

from .. import director_v2
from .._constants import APP_SETTINGS_KEY
from ..director_v2 import plugin as director_v2
from . import handlers

log = logging.getLogger(__name__)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
from servicelib.aiohttp.client_session import get_client_session
from servicelib.utils import logged_gather

from .. import catalog_client, db, director_v2_api, storage_api
from .. import catalog_client, db, storage_api
from .._meta import API_VERSION, APP_NAME, api_version_prefix
from ..director_v2 import api as director_v2_api
from ..login.decorators import login_required
from ..security.decorators import permission_required
from ..utils import get_task_info, get_tracemalloc_info
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from abc import ABC, abstractmethod
from typing import List, Optional, Tuple

from aiohttp import web
from models_library.projects import ProjectID
Expand Down Expand Up @@ -34,21 +33,22 @@ async def get_runnable_projects_ids(
self,
request: web.Request,
project_uuid: ProjectID,
) -> List[ProjectID]:
) -> list[ProjectID]:
...

@abstractmethod
async def get_or_create_runnable_projects(
self,
request: web.Request,
project_uuid: ProjectID,
) -> Tuple[List[ProjectID], List[CommitID]]:
) -> tuple[list[ProjectID], list[CommitID]]:

...


def get_project_run_policy(app: web.Application) -> Optional[AbstractProjectRunPolicy]:
return app.get(_APP_PROJECT_RUN_POLICY_KEY)
def get_project_run_policy(app: web.Application) -> AbstractProjectRunPolicy | None:
app_: AbstractProjectRunPolicy | None = app.get(_APP_PROJECT_RUN_POLICY_KEY)
return app_


def set_project_run_policy(app: web.Application, policy_obj: AbstractProjectRunPolicy):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,23 @@

import asyncio
import logging
from typing import Any, Optional, Union
from typing import Any, Union

import aiohttp
from aiohttp import ClientTimeout, web
from tenacity._asyncio import AsyncRetrying
from aiohttp import ClientSession, ClientTimeout, web
from tenacity import retry
from tenacity.before_sleep import before_sleep_log
from tenacity.stop import stop_after_attempt
from tenacity.wait import wait_random
from yarl import URL

from .director_v2_exceptions import DirectorServiceError
from .director_v2_settings import get_client_session
from .exceptions import DirectorServiceError
from .settings import get_client_session

log = logging.getLogger(__name__)


SERVICE_HEALTH_CHECK_TIMEOUT = ClientTimeout(total=2, connect=1) # type:ignore
SERVICE_HEALTH_CHECK_TIMEOUT = ClientTimeout(total=2, connect=1)

DEFAULT_RETRY_POLICY = dict(
wait=wait_random(0, 1),
Expand All @@ -53,46 +53,57 @@ def _get_exception_from(
return DirectorServiceError(status=status_code, reason=reason, url=url)


@retry(**DEFAULT_RETRY_POLICY)
async def _make_request(
session: ClientSession,
method: str,
headers: dict[str, str] | None,
data: Any | None,
expected_status: type[web.HTTPSuccessful],
on_error: _StatusToExceptionMapping | None,
url: URL,
**kwargs,
) -> DataBody | str:
async with session.request(
method, url, headers=headers, json=data, **kwargs
) as response:
payload: dict[str, Any] | list[dict[str, Any]] | None | str = (
await response.json()
if response.content_type == "application/json"
else await response.text()
)

if response.status != expected_status.status_code:
raise _get_exception_from(
response.status, on_error, reason=f"{payload}", url=url
)
return payload


async def request_director_v2(
app: web.Application,
method: str,
url: URL,
*,
expected_status: type[web.HTTPSuccessful] = web.HTTPOk,
headers: Optional[dict[str, str]] = None,
data: Optional[Any] = None,
on_error: Optional[_StatusToExceptionMapping] = None,
headers: dict[str, str] | None = None,
data: Any | None = None,
on_error: _StatusToExceptionMapping | None = None,
**kwargs,
) -> Union[DataBody, str]:
) -> DataBody | str:
"""
helper to make requests to director-v2 API
SEE OAS in services/director-v2/openapi.json
"""
# TODO: deprecate!
session = get_client_session(app)
on_error = on_error or {}

try:
async for attempt in AsyncRetrying(**DEFAULT_RETRY_POLICY):
with attempt:

async with session.request(
method, url, headers=headers, json=data, **kwargs
) as response:
payload = (
await response.json()
if response.content_type == "application/json"
else await response.text()
# FIXME: text should never happen ... perhaps unhandled exception!
)

if response.status != expected_status.status_code:
raise _get_exception_from(
response.status, on_error, reason=f"{payload}", url=url
)
return payload

# TODO: enrich with https://docs.aiohttp.org/en/stable/client_reference.html#hierarchy-of-exceptions
payload: DataBody | str = await _make_request(
session, method, headers, data, expected_status, on_error, url, **kwargs
)
return payload

except asyncio.TimeoutError as err:
raise DirectorServiceError(
status=web.HTTPServiceUnavailable.status_code,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import json
import logging
from typing import Any, Optional
from typing import Any
from uuid import UUID

from aiohttp import web
Expand All @@ -19,16 +19,16 @@
from servicelib.logging_utils import log_decorator
from settings_library.utils_cli import create_json_encoder_wo_secrets

from .director_v2_core_base import DataType, request_director_v2
from .director_v2_exceptions import (
from ._core_base import DataType, request_director_v2
from ._models import ClusterCreate, ClusterPatch, ClusterPing
from .exceptions import (
ClusterAccessForbidden,
ClusterDefinedPingError,
ClusterNotFoundError,
ClusterPingError,
DirectorServiceError,
)
from .director_v2_models import ClusterCreate, ClusterPatch, ClusterPing
from .director_v2_settings import DirectorV2Settings, get_plugin_settings
from .settings import DirectorV2Settings, get_plugin_settings

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -66,7 +66,8 @@ async def start(
},
)
assert isinstance(computation_task_out, dict) # nosec
return computation_task_out["id"]
computation_task_out_id: str = computation_task_out["id"]
return computation_task_out_id

async def stop(self, project_id: ProjectID, user_id: UserID):
await request_director_v2(
Expand All @@ -81,8 +82,9 @@ async def stop(self, project_id: ProjectID, user_id: UserID):
_APP_KEY = f"{__name__}.{ComputationsApi.__name__}"


def get_client(app: web.Application) -> Optional[ComputationsApi]:
return app.get(_APP_KEY)
def get_client(app: web.Application) -> ComputationsApi | None:
app_key: ComputationsApi | None = app.get(_APP_KEY)
return app_key


def set_client(app: web.Application, obj: ComputationsApi):
Expand All @@ -98,7 +100,7 @@ def set_client(app: web.Application, obj: ComputationsApi):
@log_decorator(logger=log)
async def create_or_update_pipeline(
app: web.Application, user_id: UserID, project_id: ProjectID, product_name: str
) -> Optional[DataType]:
) -> DataType | None:
settings: DirectorV2Settings = get_plugin_settings(app)

backend_url = settings.base_url / "computations"
Expand All @@ -117,13 +119,14 @@ async def create_or_update_pipeline(

except DirectorServiceError as exc:
log.error("could not create pipeline from project %s: %s", project_id, exc)
return None


@log_decorator(logger=log)
async def is_pipeline_running(
app: web.Application, user_id: PositiveInt, project_id: UUID
) -> Optional[bool]:
# TODO: make it cheaper by /computations/{project_id}/state. First trial shows
) -> bool | None:
# NOTE: possiblity to make it cheaper by /computations/{project_id}/state. First trial shows
# that the efficiency gain is minimal but should be considered specially if the handler
# gets heavier with time
pipeline = await get_computation_task(app, user_id, project_id)
Expand All @@ -135,13 +138,14 @@ async def is_pipeline_running(
# if statement casts to False
return None

return pipeline.state.is_running()
pipeline_state: bool | None = pipeline.state.is_running()
return pipeline_state


@log_decorator(logger=log)
async def get_computation_task(
app: web.Application, user_id: UserID, project_id: ProjectID
) -> Optional[ComputationTask]:
) -> ComputationTask | None:
settings: DirectorV2Settings = get_plugin_settings(app)
backend_url = (settings.base_url / f"computations/{project_id}").update_query(
user_id=int(user_id)
Expand All @@ -158,10 +162,11 @@ async def get_computation_task(
except DirectorServiceError as exc:
if exc.status == web.HTTPNotFound.status_code:
# the pipeline might not exist and that is ok
return
return None
log.warning(
"getting pipeline for project %s failed: %s.", f"{project_id=}", exc
)
return None


@log_decorator(logger=log)
Expand Down
Loading

0 comments on commit 386ef67

Please sign in to comment.