Skip to content

Commit

Permalink
Merge pull request #1033 from golemfactory/blue/service-healthcheck
Browse files Browse the repository at this point in the history
ServiceRunner's health check task
  • Loading branch information
shadeofblue authored Nov 4, 2022
2 parents 888b25b + f6d7a08 commit 74fe935
Show file tree
Hide file tree
Showing 20 changed files with 191 additions and 15 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ pytest-rerunfailures = "^10.1"
[tool.isort]
profile = "black"
force_sort_within_sections = true
known_golem = "goth"
known_golem = "goth, ya_activity, ya_market, ya_net, ya_payment"
sections = ["FUTURE","THIRDPARTY","GOLEM", "FIRSTPARTY","LOCALFOLDER"]
default_section = "THIRDPARTY" # STDLIB section will be cobined with THIRDPARTY section

Expand Down
1 change: 1 addition & 0 deletions tests/factories/rest/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import factory
from unittest import mock

from ya_market.api.requestor_api import RequestorApi


Expand Down
1 change: 1 addition & 0 deletions tests/factories/rest/market.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import datetime
import factory

from ya_market import models as market_models

from tests.factories.props.com import ComLinearPropsFactory
Expand Down
1 change: 1 addition & 0 deletions tests/factories/rest/payment.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import datetime
import factory
from unittest import mock

from ya_payment import models as payment_models
from ya_payment.api.requestor_api import RequestorApi

Expand Down
2 changes: 1 addition & 1 deletion tests/goth_tests/test_renegotiate_proposal/requestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
import asyncio
from asyncio import TimeoutError
import datetime
import sys
from typing_extensions import Final

import ya_market

from examples import utils
Expand Down
4 changes: 2 additions & 2 deletions tests/goth_tests/test_resubscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
import time
from typing import Dict, List, Set, Type
from unittest.mock import Mock
from ya_market import ApiException
import ya_market.api.requestor_api

from goth.assertions import EventStream
from goth.assertions.monitor import EventMonitor
Expand All @@ -18,6 +16,8 @@
from goth.runner import Runner
from goth.runner.log import configure_logging
from goth.runner.probe import RequestorProbe
from ya_market import ApiException
import ya_market.api.requestor_api

from yapapi import Golem, Task
from yapapi.events import Event, JobFinished, JobStarted, SubscriptionCreated
Expand Down
1 change: 1 addition & 0 deletions tests/rest/test_activity.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pytest
from typing import List, Optional, Tuple, Type
from unittest.mock import Mock

from ya_activity.exceptions import ApiException

from yapapi.rest.activity import BatchError, PollingBatch
Expand Down
1 change: 1 addition & 0 deletions tests/rest/test_repeat_on_error.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import aiohttp
import asyncio
import pytest

import ya_activity
import ya_market
import ya_payment
Expand Down
86 changes: 86 additions & 0 deletions tests/services/test_service_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import asyncio
import pytest
from statemachine import State
import sys
from typing import Optional
from unittest import mock

from ya_activity.exceptions import ApiException

from yapapi.ctx import WorkContext
from yapapi.services.service import Service, ServiceState
from yapapi.services.service_runner import ServiceRunner, ServiceRunnerError


def mock_service(init_state: Optional[State] = None):
service = Service()
if init_state:
service.service_instance.service_state = ServiceState(start_value=init_state.name)
service._ctx = WorkContext(mock.AsyncMock(), mock.Mock(), mock.Mock(), mock.Mock())
return service


@pytest.mark.skipif(sys.version_info < (3, 8), reason="AsyncMock requires python 3.8+")
@pytest.mark.asyncio
async def test_ensure_alive_no_interval():
with mock.patch("asyncio.Future", mock.AsyncMock()) as future:
service_runner = ServiceRunner(mock.Mock(), health_check_interval=None)
await service_runner._ensure_alive(mock.AsyncMock())

future.assert_awaited()


@pytest.mark.skipif(sys.version_info < (3, 8), reason="AsyncMock requires python 3.8+")
@pytest.mark.parametrize(
"service_state, side_effect, num_retries, expected_alive, expected_num_calls",
(
(ServiceState.pending, None, None, True, 0),
(ServiceState.running, None, None, True, 10),
(ServiceState.running, ApiException(), None, False, 3),
(ServiceState.running, ApiException(), 2, False, 2),
(ServiceState.running, ApiException(), 5, False, 5),
),
)
@pytest.mark.asyncio
async def test_ensure_alive(
service_state,
side_effect,
num_retries,
expected_alive,
expected_num_calls,
):
service = mock_service(service_state)
with mock.patch(
"yapapi.ctx.WorkContext.get_raw_state", mock.AsyncMock(side_effect=side_effect)
) as grs_mock:

service_runner = ServiceRunner(
mock.Mock(),
health_check_interval=0.001,
**({"health_check_retries": num_retries} if num_retries else {}),
)

loop = asyncio.get_event_loop()
ensure_alive = loop.create_task(service_runner._ensure_alive(service))
sentinel = loop.create_task(asyncio.sleep(0.1))

done, pending = await asyncio.wait(
(
ensure_alive,
sentinel,
),
return_when=asyncio.FIRST_COMPLETED,
)

if expected_alive:
assert ensure_alive in pending
else:
assert ensure_alive in done

with pytest.raises(ServiceRunnerError):
ensure_alive.result()

sentinel.cancel()
ensure_alive.cancel()

assert len(grs_mock.mock_calls) >= expected_num_calls
1 change: 1 addition & 0 deletions tests/test_payment_platforms.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Unit tests for code that selects payment platforms based on driver/network specification."""
import pytest
from unittest import mock

from ya_payment import RequestorApi

from yapapi import NoPaymentAccountError
Expand Down
2 changes: 1 addition & 1 deletion yapapi/ctx.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ async def get_usage(self) -> "ActivityUsage":
return usage

async def get_raw_state(self) -> yaa_ActivityState:
"""Get the state activity bound to this work context.
"""Get the state of the activity bound to this work context.
The value comes directly from the low level API and is not interpreted in any way.
"""
Expand Down
1 change: 1 addition & 0 deletions yapapi/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from statemachine import State, StateMachine # type: ignore
from typing import Dict, Optional, Union
from urllib.parse import urlparse

from ya_net.exceptions import ApiException

import yapapi
Expand Down
1 change: 1 addition & 0 deletions yapapi/rest/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import logging
from typing import Any, AsyncIterator, Dict, List, Optional, Tuple, Type
from typing_extensions import AsyncContextManager, AsyncIterable

from ya_activity import ApiClient, ApiException, RequestorControlApi, RequestorStateApi
from ya_activity import exceptions as yexc
from ya_activity import models as yaa
Expand Down
1 change: 1 addition & 0 deletions yapapi/rest/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import functools
import logging
from typing import Callable, Optional

import ya_activity
import ya_market
import ya_payment
Expand Down
1 change: 1 addition & 0 deletions yapapi/rest/configuration.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
from typing import Optional
from typing_extensions import Final

import ya_activity # type: ignore
import ya_market # type: ignore
import ya_net # type: ignore
Expand Down
1 change: 1 addition & 0 deletions yapapi/rest/market.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from types import TracebackType
from typing import Any, AsyncIterator, Generator, Generic, Optional, Type, TypeVar
from typing_extensions import AsyncContextManager, Awaitable

from ya_market import ApiClient, ApiException, RequestorApi, models # type: ignore

from ..props import Model, NodeInfo
Expand Down
1 change: 1 addition & 0 deletions yapapi/rest/net.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Optional

from ya_net import ApiClient, RequestorApi
from ya_net import models as yan

Expand Down
1 change: 1 addition & 0 deletions yapapi/rest/payment.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from decimal import Decimal
import logging
from typing import AsyncIterator, Iterable, List, Optional, Union, cast

from ya_payment import Account, ApiClient, RequestorApi
import ya_payment.models as yap

Expand Down
21 changes: 21 additions & 0 deletions yapapi/services/service.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
from dataclasses import dataclass, field
import logging
import statemachine # type: ignore
from types import TracebackType
from typing import (
Expand All @@ -17,6 +18,8 @@
)
import uuid

from ya_activity.exceptions import ApiException

from yapapi import events
from yapapi.ctx import WorkContext
from yapapi.network import Network, Node
Expand All @@ -29,6 +32,9 @@
from .cluster import Cluster
from .service_runner import ControlSignal


logger = logging.getLogger(__name__)

# Return type for `sys.exc_info()`
ExcInfo = Union[
Tuple[Type[BaseException], BaseException, TracebackType],
Expand Down Expand Up @@ -395,6 +401,21 @@ def state(self):
def service_instance(self):
return self.__service_instance

async def is_activity_responsive(self) -> bool:
"""Verify if the provider's activity is responsive.
Tries to get the state activity. Returns True if the activity state could be
queried successfully and false otherwise.
"""
if not self._ctx:
return False

try:
return bool(await self._ctx.get_raw_state())
except ApiException as e:
logger.error("Couldn't retrieve the activity state (%s)", e)
return False


ServiceType = TypeVar("ServiceType", bound=Service)

Expand Down
Loading

0 comments on commit 74fe935

Please sign in to comment.