From 62b2275cc1505b8034c13d33eca25202d019dd59 Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Wed, 19 Oct 2022 10:23:50 +0200 Subject: [PATCH 1/6] + ServiceRunner's health check task --- yapapi/services/service_runner.py | 66 +++++++++++++++++++++++++++---- 1 file changed, 58 insertions(+), 8 deletions(-) diff --git a/yapapi/services/service_runner.py b/yapapi/services/service_runner.py index 488fb85c0..62f91c998 100644 --- a/yapapi/services/service_runner.py +++ b/yapapi/services/service_runner.py @@ -14,12 +14,16 @@ Type, Union, ) +from typing_extensions import Final + logger = logging.getLogger(__name__) if TYPE_CHECKING: from yapapi.engine import Job +from ya_activity.exceptions import ApiException + from yapapi import events from yapapi.ctx import WorkContext from yapapi.network import Network @@ -35,6 +39,15 @@ Tuple[None, None, None], ] +DEFAULT_HEALTH_CHECK_INTERVAL: Final[float] = 10.0 +DEFAULT_HEALTH_CHECK_RETRIES: Final[int] = 3 + + +class ServiceRunnerError(Exception): + """An error while running a Service.""" + + pass + class ControlSignal(enum.Enum): """Control signal, used to request an instance's state change from the controlling SeviceRunner.""" @@ -43,11 +56,18 @@ class ControlSignal(enum.Enum): class ServiceRunner(AsyncContextManager): - def __init__(self, job: "Job"): + def __init__( + self, + job: "Job", + health_check_interval: float = DEFAULT_HEALTH_CHECK_INTERVAL, + health_check_retries: int = DEFAULT_HEALTH_CHECK_RETRIES, + ): self._job = job self._instances: List[Service] = [] self._instance_tasks: List[asyncio.Task] = [] self._stopped = False + self._health_check_interval = health_check_interval + self._health_check_retries = health_check_retries @property def id(self) -> str: @@ -203,6 +223,26 @@ def _change_state( return instance.state != prev_state + async def _ensure_alive(self, service: Service): + retries_left = self._health_check_retries + exc = None + while True: + if service.is_available and service._ctx: + try: + await service._ctx.get_raw_state() + retries_left = self._health_check_retries + except ApiException as e: + retries_left -= 1 + exc = e + logger.warning("Service health check failed, retries left: %s", retries_left) + if retries_left <= 0: + raise ServiceRunnerError( + "Service health check failed after %s retries with %s", + self._health_check_retries, + exc, + ) + await asyncio.sleep(self._health_check_interval) + async def _run_instance(self, instance: ServiceInstance): loop = asyncio.get_event_loop() @@ -211,6 +251,7 @@ async def _run_instance(self, instance: ServiceInstance): handler = None batch_task: Optional[asyncio.Task] = None signal_task: Optional[asyncio.Task] = None + health_check_task: Optional[asyncio.Task] = None def update_handler(instance: ServiceInstance): nonlocal handler @@ -252,9 +293,12 @@ def change_state(event: Union[ControlSignal, ExcInfo] = (None, None, None)) -> N batch_task = loop.create_task(handler.__anext__()) if signal_task is None: signal_task = loop.create_task(instance.control_queue.get()) + if health_check_task is None: + health_check_task = loop.create_task(self._ensure_alive(instance.service)) done, _ = await asyncio.wait( - (batch_task, signal_task), return_when=asyncio.FIRST_COMPLETED + (batch_task, signal_task, health_check_task), + return_when=asyncio.FIRST_COMPLETED, ) if batch_task in done: @@ -302,15 +346,21 @@ def change_state(event: Union[ControlSignal, ExcInfo] = (None, None, None)) -> N change_state(ctl) signal_task = None + if health_check_task in done: + try: + health_check_task.result() + except ServiceRunnerError as exception: + logger.info("Health check task aborted: %s", exception) + change_state(sys.exc_info()) + health_check_task = None + logger.debug("No handler for %s in state %s", instance.service, instance.state.value) try: - if batch_task: - batch_task.cancel() - await batch_task - if signal_task: - signal_task.cancel() - await signal_task + for t in [batch_task, signal_task, health_check_task]: + if t is not None: + t.cancel() + await t except asyncio.CancelledError: pass From 6ce596ddfb6195b71782cebbf248287597d2a40f Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Wed, 2 Nov 2022 07:50:50 +0100 Subject: [PATCH 2/6] + service runner `ensure_alive` test --- tests/services/test_service_runner.py | 86 +++++++++++++++++++++++++++ yapapi/services/service_runner.py | 15 ++++- 2 files changed, 100 insertions(+), 1 deletion(-) create mode 100644 tests/services/test_service_runner.py diff --git a/tests/services/test_service_runner.py b/tests/services/test_service_runner.py new file mode 100644 index 000000000..949a4017a --- /dev/null +++ b/tests/services/test_service_runner.py @@ -0,0 +1,86 @@ +import asyncio +import pytest +from statemachine import State +import sys +from typing import Optional +from unittest import mock + +from ya_activity.exceptions import ApiException +from yapapi.ctx import WorkContext +from yapapi.services.service import Service, ServiceState +from yapapi.services.service_runner import ServiceRunner, ServiceRunnerError + + +def mock_service(init_state: Optional[State] = None): + service = Service() + if init_state: + service.service_instance.service_state = ServiceState(start_value=init_state.name) + service._ctx = WorkContext(mock.AsyncMock(), mock.Mock(), mock.Mock(), mock.Mock()) + return service + + +@pytest.mark.asyncio +async def test_ensure_alive_no_interval(): + with mock.patch("asyncio.Future", mock.AsyncMock()) as future: + service_runner = ServiceRunner(mock.Mock(), health_check_interval=None) + await service_runner._ensure_alive(mock.AsyncMock()) + + future.assert_awaited() + + +@pytest.mark.skipif(sys.version_info < (3, 8), reason="AsyncMock requires python 3.8+") +@pytest.mark.parametrize( + "service_state, side_effect, num_retries, expected_alive, expected_num_calls", + ( + (ServiceState.pending, None, None, True, 0), + (ServiceState.running, None, None, True, 10), + (ServiceState.running, ApiException(), None, False, 3), + (ServiceState.running, ApiException(), 2, False, 2), + (ServiceState.running, ApiException(), 5, False, 5), + ), +) +@pytest.mark.asyncio +async def test_ensure_alive( + service_state, + side_effect, + num_retries, + expected_alive, + expected_num_calls, +): + service = mock_service(service_state) + with mock.patch( + "yapapi.ctx.WorkContext.get_raw_state", mock.AsyncMock(side_effect=side_effect) + ) as grs_mock: + + service_runner = ServiceRunner( + mock.Mock(), + health_check_interval=0.001, + **({"health_check_retries": num_retries} if num_retries else {}), + ) + + loop = asyncio.get_event_loop() + ensure_alive = loop.create_task(service_runner._ensure_alive(service)) + sentinel = loop.create_task(asyncio.sleep(0.02)) + + done, pending = await asyncio.wait( + ( + ensure_alive, + sentinel, + ), + return_when=asyncio.FIRST_COMPLETED, + ) + + if expected_alive: + assert sentinel in done + assert ensure_alive in pending + else: + assert sentinel in pending + assert ensure_alive in done + + with pytest.raises(ServiceRunnerError): + ensure_alive.result() + + sentinel.cancel() + ensure_alive.cancel() + + assert len(grs_mock.mock_calls) >= expected_num_calls diff --git a/yapapi/services/service_runner.py b/yapapi/services/service_runner.py index 62f91c998..603c5cf08 100644 --- a/yapapi/services/service_runner.py +++ b/yapapi/services/service_runner.py @@ -59,9 +59,17 @@ class ServiceRunner(AsyncContextManager): def __init__( self, job: "Job", - health_check_interval: float = DEFAULT_HEALTH_CHECK_INTERVAL, + health_check_interval: Optional[float] = DEFAULT_HEALTH_CHECK_INTERVAL, health_check_retries: int = DEFAULT_HEALTH_CHECK_RETRIES, ): + """Initialize the ServiceRunner. + + :param job: the engine's :class:`~yapapi.engine.Job` within which the ServiceRunner will run + :param health_check_interval: an interval in seconds between subsequent health checks. + Setting it to `None` turns off the health check. + :param health_check_retries: number of times the health check will be retried before + it's reported as failed + """ self._job = job self._instances: List[Service] = [] self._instance_tasks: List[asyncio.Task] = [] @@ -224,6 +232,11 @@ def _change_state( return instance.state != prev_state async def _ensure_alive(self, service: Service): + # wait indefinitely when the interval is not defined + if self._health_check_interval is None: + await asyncio.Future() + return + retries_left = self._health_check_retries exc = None while True: From 1dcf202e4c1cf5762d53327e624bd4d02eec4776 Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Wed, 2 Nov 2022 10:37:30 +0100 Subject: [PATCH 3/6] isort --- pyproject.toml | 2 +- tests/factories/rest/__init__.py | 1 + tests/factories/rest/market.py | 1 + tests/factories/rest/payment.py | 1 + tests/goth_tests/test_renegotiate_proposal/requestor.py | 2 +- tests/goth_tests/test_resubscription.py | 4 ++-- tests/rest/test_activity.py | 1 + tests/rest/test_repeat_on_error.py | 1 + tests/services/test_service_runner.py | 1 + tests/test_payment_platforms.py | 1 + yapapi/network.py | 1 + yapapi/rest/activity.py | 1 + yapapi/rest/common.py | 1 + yapapi/rest/configuration.py | 1 + yapapi/rest/market.py | 1 + yapapi/rest/net.py | 1 + yapapi/rest/payment.py | 1 + yapapi/services/service_runner.py | 5 ++--- 18 files changed, 20 insertions(+), 7 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 347484694..9cbd32e3d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -76,7 +76,7 @@ pytest-rerunfailures = "^10.1" [tool.isort] profile = "black" force_sort_within_sections = true -known_golem = "goth" +known_golem = "goth, ya_activity, ya_market, ya_net, ya_payment" sections = ["FUTURE","THIRDPARTY","GOLEM", "FIRSTPARTY","LOCALFOLDER"] default_section = "THIRDPARTY" # STDLIB section will be cobined with THIRDPARTY section diff --git a/tests/factories/rest/__init__.py b/tests/factories/rest/__init__.py index 1715cf25b..180e31b83 100644 --- a/tests/factories/rest/__init__.py +++ b/tests/factories/rest/__init__.py @@ -1,5 +1,6 @@ import factory from unittest import mock + from ya_market.api.requestor_api import RequestorApi diff --git a/tests/factories/rest/market.py b/tests/factories/rest/market.py index 457e0cbc1..7790920d0 100644 --- a/tests/factories/rest/market.py +++ b/tests/factories/rest/market.py @@ -1,5 +1,6 @@ import datetime import factory + from ya_market import models as market_models from tests.factories.props.com import ComLinearPropsFactory diff --git a/tests/factories/rest/payment.py b/tests/factories/rest/payment.py index 7c5bd8804..23e61162d 100644 --- a/tests/factories/rest/payment.py +++ b/tests/factories/rest/payment.py @@ -1,6 +1,7 @@ import datetime import factory from unittest import mock + from ya_payment import models as payment_models from ya_payment.api.requestor_api import RequestorApi diff --git a/tests/goth_tests/test_renegotiate_proposal/requestor.py b/tests/goth_tests/test_renegotiate_proposal/requestor.py index 6a9f73745..fcf71c68c 100755 --- a/tests/goth_tests/test_renegotiate_proposal/requestor.py +++ b/tests/goth_tests/test_renegotiate_proposal/requestor.py @@ -2,8 +2,8 @@ import asyncio from asyncio import TimeoutError import datetime -import sys from typing_extensions import Final + import ya_market from examples import utils diff --git a/tests/goth_tests/test_resubscription.py b/tests/goth_tests/test_resubscription.py index 8e3bbe920..6cd4532b9 100644 --- a/tests/goth_tests/test_resubscription.py +++ b/tests/goth_tests/test_resubscription.py @@ -8,8 +8,6 @@ import time from typing import Dict, List, Set, Type from unittest.mock import Mock -from ya_market import ApiException -import ya_market.api.requestor_api from goth.assertions import EventStream from goth.assertions.monitor import EventMonitor @@ -18,6 +16,8 @@ from goth.runner import Runner from goth.runner.log import configure_logging from goth.runner.probe import RequestorProbe +from ya_market import ApiException +import ya_market.api.requestor_api from yapapi import Golem, Task from yapapi.events import Event, JobFinished, JobStarted, SubscriptionCreated diff --git a/tests/rest/test_activity.py b/tests/rest/test_activity.py index 79e90588b..ee6b04bf3 100644 --- a/tests/rest/test_activity.py +++ b/tests/rest/test_activity.py @@ -1,6 +1,7 @@ import pytest from typing import List, Optional, Tuple, Type from unittest.mock import Mock + from ya_activity.exceptions import ApiException from yapapi.rest.activity import BatchError, PollingBatch diff --git a/tests/rest/test_repeat_on_error.py b/tests/rest/test_repeat_on_error.py index e37fc992e..620863b6a 100644 --- a/tests/rest/test_repeat_on_error.py +++ b/tests/rest/test_repeat_on_error.py @@ -1,6 +1,7 @@ import aiohttp import asyncio import pytest + import ya_activity import ya_market import ya_payment diff --git a/tests/services/test_service_runner.py b/tests/services/test_service_runner.py index 949a4017a..0dad19db1 100644 --- a/tests/services/test_service_runner.py +++ b/tests/services/test_service_runner.py @@ -6,6 +6,7 @@ from unittest import mock from ya_activity.exceptions import ApiException + from yapapi.ctx import WorkContext from yapapi.services.service import Service, ServiceState from yapapi.services.service_runner import ServiceRunner, ServiceRunnerError diff --git a/tests/test_payment_platforms.py b/tests/test_payment_platforms.py index afe7ddb58..a9454c7e9 100644 --- a/tests/test_payment_platforms.py +++ b/tests/test_payment_platforms.py @@ -1,6 +1,7 @@ """Unit tests for code that selects payment platforms based on driver/network specification.""" import pytest from unittest import mock + from ya_payment import RequestorApi from yapapi import NoPaymentAccountError diff --git a/yapapi/network.py b/yapapi/network.py index f73118560..c362b1d07 100644 --- a/yapapi/network.py +++ b/yapapi/network.py @@ -12,6 +12,7 @@ from statemachine import State, StateMachine # type: ignore from typing import Dict, Optional, Union from urllib.parse import urlparse + from ya_net.exceptions import ApiException import yapapi diff --git a/yapapi/rest/activity.py b/yapapi/rest/activity.py index 080e77435..f7a57542a 100644 --- a/yapapi/rest/activity.py +++ b/yapapi/rest/activity.py @@ -8,6 +8,7 @@ import logging from typing import Any, AsyncIterator, Dict, List, Optional, Tuple, Type from typing_extensions import AsyncContextManager, AsyncIterable + from ya_activity import ApiClient, ApiException, RequestorControlApi, RequestorStateApi from ya_activity import exceptions as yexc from ya_activity import models as yaa diff --git a/yapapi/rest/common.py b/yapapi/rest/common.py index b3f97e8a7..9bf9752aa 100644 --- a/yapapi/rest/common.py +++ b/yapapi/rest/common.py @@ -3,6 +3,7 @@ import functools import logging from typing import Callable, Optional + import ya_activity import ya_market import ya_payment diff --git a/yapapi/rest/configuration.py b/yapapi/rest/configuration.py index a42349adf..3157d8d36 100644 --- a/yapapi/rest/configuration.py +++ b/yapapi/rest/configuration.py @@ -1,6 +1,7 @@ import os from typing import Optional from typing_extensions import Final + import ya_activity # type: ignore import ya_market # type: ignore import ya_net # type: ignore diff --git a/yapapi/rest/market.py b/yapapi/rest/market.py index 098b41c60..b38f46a81 100644 --- a/yapapi/rest/market.py +++ b/yapapi/rest/market.py @@ -6,6 +6,7 @@ from types import TracebackType from typing import Any, AsyncIterator, Generator, Generic, Optional, Type, TypeVar from typing_extensions import AsyncContextManager, Awaitable + from ya_market import ApiClient, ApiException, RequestorApi, models # type: ignore from ..props import Model, NodeInfo diff --git a/yapapi/rest/net.py b/yapapi/rest/net.py index f76d9ee35..e6e323b86 100644 --- a/yapapi/rest/net.py +++ b/yapapi/rest/net.py @@ -1,4 +1,5 @@ from typing import Optional + from ya_net import ApiClient, RequestorApi from ya_net import models as yan diff --git a/yapapi/rest/payment.py b/yapapi/rest/payment.py index 27ad758f7..86bb31c40 100644 --- a/yapapi/rest/payment.py +++ b/yapapi/rest/payment.py @@ -4,6 +4,7 @@ from decimal import Decimal import logging from typing import AsyncIterator, Iterable, List, Optional, Union, cast + from ya_payment import Account, ApiClient, RequestorApi import ya_payment.models as yap diff --git a/yapapi/services/service_runner.py b/yapapi/services/service_runner.py index 603c5cf08..0d52dc8ff 100644 --- a/yapapi/services/service_runner.py +++ b/yapapi/services/service_runner.py @@ -16,9 +16,6 @@ ) from typing_extensions import Final - -logger = logging.getLogger(__name__) - if TYPE_CHECKING: from yapapi.engine import Job @@ -33,6 +30,8 @@ from .service import Service, ServiceInstance, ServiceType from .service_state import ServiceState +logger = logging.getLogger(__name__) + # Return type for `sys.exc_info()` ExcInfo = Union[ Tuple[Type[BaseException], BaseException, TracebackType], From a4d18240b8e34d111c57434740170013afa42ade Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Wed, 2 Nov 2022 10:45:04 +0100 Subject: [PATCH 4/6] skip an asyncmock test on py3.7 --- tests/services/test_service_runner.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/services/test_service_runner.py b/tests/services/test_service_runner.py index 0dad19db1..57376bb30 100644 --- a/tests/services/test_service_runner.py +++ b/tests/services/test_service_runner.py @@ -20,6 +20,7 @@ def mock_service(init_state: Optional[State] = None): return service +@pytest.mark.skipif(sys.version_info < (3, 8), reason="AsyncMock requires python 3.8+") @pytest.mark.asyncio async def test_ensure_alive_no_interval(): with mock.patch("asyncio.Future", mock.AsyncMock()) as future: From 9afe2f0f207ca5d0fb338439423e7faf928f52ea Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Wed, 2 Nov 2022 11:13:25 +0100 Subject: [PATCH 5/6] - unnecessary assert --- tests/services/test_service_runner.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/services/test_service_runner.py b/tests/services/test_service_runner.py index 57376bb30..fcb7077b8 100644 --- a/tests/services/test_service_runner.py +++ b/tests/services/test_service_runner.py @@ -73,10 +73,8 @@ async def test_ensure_alive( ) if expected_alive: - assert sentinel in done assert ensure_alive in pending else: - assert sentinel in pending assert ensure_alive in done with pytest.raises(ServiceRunnerError): From f6d7a0836eba40b55ca2b6eca5962bd159f3c75e Mon Sep 17 00:00:00 2001 From: shadeofblue Date: Wed, 2 Nov 2022 13:35:48 +0100 Subject: [PATCH 6/6] address review comments --- tests/services/test_service_runner.py | 2 +- yapapi/ctx.py | 2 +- yapapi/services/service.py | 21 +++++++++++++++++++++ yapapi/services/service_runner.py | 14 ++++---------- 4 files changed, 27 insertions(+), 12 deletions(-) diff --git a/tests/services/test_service_runner.py b/tests/services/test_service_runner.py index fcb7077b8..d057dcf57 100644 --- a/tests/services/test_service_runner.py +++ b/tests/services/test_service_runner.py @@ -62,7 +62,7 @@ async def test_ensure_alive( loop = asyncio.get_event_loop() ensure_alive = loop.create_task(service_runner._ensure_alive(service)) - sentinel = loop.create_task(asyncio.sleep(0.02)) + sentinel = loop.create_task(asyncio.sleep(0.1)) done, pending = await asyncio.wait( ( diff --git a/yapapi/ctx.py b/yapapi/ctx.py index e5dade686..ebcecf430 100644 --- a/yapapi/ctx.py +++ b/yapapi/ctx.py @@ -135,7 +135,7 @@ async def get_usage(self) -> "ActivityUsage": return usage async def get_raw_state(self) -> yaa_ActivityState: - """Get the state activity bound to this work context. + """Get the state of the activity bound to this work context. The value comes directly from the low level API and is not interpreted in any way. """ diff --git a/yapapi/services/service.py b/yapapi/services/service.py index 224af2873..e5a7a2fa4 100644 --- a/yapapi/services/service.py +++ b/yapapi/services/service.py @@ -1,5 +1,6 @@ import asyncio from dataclasses import dataclass, field +import logging import statemachine # type: ignore from types import TracebackType from typing import ( @@ -17,6 +18,8 @@ ) import uuid +from ya_activity.exceptions import ApiException + from yapapi import events from yapapi.ctx import WorkContext from yapapi.network import Network, Node @@ -29,6 +32,9 @@ from .cluster import Cluster from .service_runner import ControlSignal + +logger = logging.getLogger(__name__) + # Return type for `sys.exc_info()` ExcInfo = Union[ Tuple[Type[BaseException], BaseException, TracebackType], @@ -395,6 +401,21 @@ def state(self): def service_instance(self): return self.__service_instance + async def is_activity_responsive(self) -> bool: + """Verify if the provider's activity is responsive. + + Tries to get the state activity. Returns True if the activity state could be + queried successfully and false otherwise. + """ + if not self._ctx: + return False + + try: + return bool(await self._ctx.get_raw_state()) + except ApiException as e: + logger.error("Couldn't retrieve the activity state (%s)", e) + return False + ServiceType = TypeVar("ServiceType", bound=Service) diff --git a/yapapi/services/service_runner.py b/yapapi/services/service_runner.py index 0d52dc8ff..d2ddf7b90 100644 --- a/yapapi/services/service_runner.py +++ b/yapapi/services/service_runner.py @@ -19,8 +19,6 @@ if TYPE_CHECKING: from yapapi.engine import Job -from ya_activity.exceptions import ApiException - from yapapi import events from yapapi.ctx import WorkContext from yapapi.network import Network @@ -237,21 +235,17 @@ async def _ensure_alive(self, service: Service): return retries_left = self._health_check_retries - exc = None while True: - if service.is_available and service._ctx: - try: - await service._ctx.get_raw_state() + if service.is_available: + if await service.is_activity_responsive(): retries_left = self._health_check_retries - except ApiException as e: + else: retries_left -= 1 - exc = e logger.warning("Service health check failed, retries left: %s", retries_left) if retries_left <= 0: raise ServiceRunnerError( - "Service health check failed after %s retries with %s", + "Service health check failed after %s retries", self._health_check_retries, - exc, ) await asyncio.sleep(self._health_check_interval)