diff --git a/pyproject.toml b/pyproject.toml index 038bfc504..cb6eed6e1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,6 +36,7 @@ ya-aioclient = "^0.6" toml = "^0.10.1" srvresolver = "^0.3.5" colorama = "^0.4.4" +semantic-version="^2.8" # Adding `goth` to dependencies causes > 40 additional packages to be installed. Given # that dependency resolution in `poetry` is rather slow, we'd like to avoid installing diff --git a/tests/rest/test_activity.py b/tests/rest/test_activity.py new file mode 100644 index 000000000..a10b20fbc --- /dev/null +++ b/tests/rest/test_activity.py @@ -0,0 +1,126 @@ +from typing import List, Optional, Tuple, Type +from unittest.mock import Mock + +import pytest + +from ya_activity.exceptions import ApiException +from yapapi.rest.activity import BatchError, PollingBatch + + +GetExecBatchResultsSpec = Tuple[Optional[Exception], List[str]] + + +def mock_activity(specs: List[GetExecBatchResultsSpec]): + """Create a mock activity. + + The argument `specs` is a list of pairs specifying the behavior of subsequent calls + to `get_exec_batch_results()`: i-th pair corresponds to the i-th call. + The first element of the pair is an optional error raised by the call, the second element + is the activity state (the `.state` component of the object returned by `Activity.state()`). 
+ """ + i = -1 + + async def mock_results(*_args, **_kwargs): + nonlocal specs, i + i += 1 + error = specs[i][0] + if error: + raise error + return [Mock(index=0)] + + async def mock_state(): + nonlocal specs, i + state = specs[i][1] + return Mock(state=state) + + return Mock(state=mock_state, _api=Mock(get_exec_batch_results=mock_results)) + + +GSB_ERROR = ":( GSB error: some endpoint address not found :(" + + +@pytest.mark.parametrize( + "specs, expected_error", + [ + # No errors + ([(None, ["Running", "Running"])], None), + # Exception other than ApiException should stop iteration over batch results + ( + [(ValueError("!?"), ["Running", "Running"])], + ValueError, + ), + # ApiException not related to GSB should stop iteration over batch results + ( + [(ApiException(status=400), ["Running", "Running"])], + ApiException, + ), + # As above, but with status 500 + ( + [ + ( + ApiException(http_resp=Mock(status=500, data='{"message": "???"}')), + ["Running", "Running"], + ) + ], + ApiException, + ), + # ApiException not related to GSB should raise BatchError if activity is terminated + ( + [ + ( + ApiException(http_resp=Mock(status=500, data='{"message": "???"}')), + ["Running", "Terminated"], + ) + ], + BatchError, + ), + # GSB-related ApiException should cause retrying if the activity is running + ( + [ + ( + ApiException(http_resp=Mock(status=500, data=f'{{"message": "{GSB_ERROR}"}}')), + ["Running", "Running"], + ), + (None, ["Running", "Running"]), + ], + None, + ), + # As above, but max number of tries is reached + ( + [ + ( + ApiException(http_resp=Mock(status=500, data=f'{{"message": "{GSB_ERROR}"}}')), + ["Running", "Running"], + ) + ] + * PollingBatch.GET_EXEC_BATCH_RESULTS_MAX_TRIES, + ApiException, + ), + # GSB-related ApiException should raise BatchError if activity is terminated + ( + [ + ( + ApiException(http_resp=Mock(status=500, data=f'{{"message": "{GSB_ERROR}"}}')), + ["Running", "Terminated"], + ) + ], + BatchError, + ), + ], +) 
+@pytest.mark.asyncio +async def test_polling_batch_on_gsb_error( + specs: List[GetExecBatchResultsSpec], expected_error: Optional[Type[Exception]] +) -> None: + """Test the behavior of PollingBatch when get_exec_batch_results() raises exceptions.""" + + PollingBatch.GET_EXEC_BATCH_RESULTS_INTERVAL = 0.1 + + activity = mock_activity(specs) + batch = PollingBatch(activity, "batch_id", 1) + try: + async for _ in batch: + pass + assert expected_error is None + except Exception as error: + assert expected_error is not None and isinstance(error, expected_error) diff --git a/tests/storage/test_gftp.py b/tests/storage/test_gftp.py index 61821343c..653a1376e 100644 --- a/tests/storage/test_gftp.py +++ b/tests/storage/test_gftp.py @@ -64,8 +64,9 @@ class MockService(gftp.GftpDriver): stripped from whitespaces. """ - def __init__(self): + def __init__(self, version="0.0.0"): self.published = defaultdict(list) + self._version = version async def __aenter__(self): return self @@ -74,7 +75,7 @@ async def __aexit__(self, *exc_info): pass async def version(self) -> str: - return "0.0.0" + return self._version async def publish(self, *, files: List[str]) -> List[gftp.PubLink]: links = [] @@ -210,9 +211,7 @@ async def worker(id: int, provider: gftp.GftpProvider): ], ) @pytest.mark.asyncio -async def test_gftp_close_env_var( - temp_dir, mock_service, monkeypatch, env_value, expect_unpublished -): +async def test_gftp_close_env(temp_dir, mock_service, monkeypatch, env_value, expect_unpublished): """Test that the GftpProvider calls close() on the underlying service.""" # Enable or disable using `gftp close` by GftpProvider @@ -240,6 +239,61 @@ async def test_gftp_close_env_var( assert (not mock_service.published["bytes"]) == expect_unpublished +@pytest.mark.parametrize( + "env_value, gftp_version, expect_unpublished", + [ + ("1", "0.6.0", True), + ("1", "0.7.2", True), + ("1", "0.7.3", True), + ("0", "0.7.2", False), + ("0", "0.7.3", False), + ("0", "1.0.0", False), + ("whatever", 
"0.6.0", False), + ("whatever", "0.7.2", False), + ("whatever", "0.7.3-rc.2", False), + ("whatever", "0.7.3", True), + ("whatever", "1.0.0", True), + (None, "0.6.0", False), + (None, "0.7.2", False), + (None, "0.7.3-rc.2", False), + (None, "0.7.3", True), + (None, "1.0.0", True), + ], +) +@pytest.mark.asyncio +async def test_gftp_close_env_version( + temp_dir, monkeypatch, env_value, gftp_version, expect_unpublished +): + """Test that the GftpProvider calls close() on the underlying service.""" + + service = MockService(version=gftp_version) + monkeypatch.setattr(gftp, "service", lambda _debug: service) + + # Enable or disable using `gftp close` by GftpProvider + if env_value is not None: + monkeypatch.setenv(gftp.USE_GFTP_CLOSE_ENV_VAR, env_value) + else: + monkeypatch.delenv(gftp.USE_GFTP_CLOSE_ENV_VAR, raising=False) + + async with gftp.GftpProvider(tmpdir=temp_dir) as provider: + assert isinstance(provider, gftp.GftpProvider) + + src_1 = await provider.upload_bytes(b"bytes") + assert service.published["bytes"] + + src_2 = await provider.upload_bytes(b"bytes") + assert service.published["bytes"] + + assert src_1.download_url == src_2.download_url + + await provider.release_source(src_1) + # the URL should not be unpublished just yet + assert service.published["bytes"] + + await provider.release_source(src_2) + assert (not service.published["bytes"]) == expect_unpublished + + ME = __file__ diff --git a/yapapi/log.py b/yapapi/log.py index 7b25078c1..feef0b99c 100644 --- a/yapapi/log.py +++ b/yapapi/log.py @@ -228,6 +228,10 @@ class ProviderInfo: subnet_tag: Optional[str] +MAX_AGREEMENT_EXPIRATION_MINUTES = round(MAX_AGREEMENT_EXPIRATION.seconds / 60) +MIN_AGREEMENT_EXPIRATION_MINUTES = round(MIN_AGREEMENT_EXPIRATION.seconds / 60) + + # Some type aliases to make types more meaningful AgreementId = str JobId = str @@ -425,12 +429,12 @@ def _handle(self, event: events.Event): if not MIN_AGREEMENT_EXPIRATION <= provider_timeout <= MAX_AGREEMENT_EXPIRATION: min, sec = 
divmod(round(timeout.total_seconds()), 60) seconds_str = f" {sec} sec " if sec else " " - max_minutes = round(MAX_AGREEMENT_EXPIRATION.seconds / 60) self.logger.warning( f"Expiration time for your tasks is set to {min} min{seconds_str}from now." - " Providers may not be willing to take up tasks which expire sooner than 5 min" - f" or later than {max_minutes} min, counting from the moment they get your" - " demand." + f" Providers may not be willing to take up tasks which expire sooner than" + f" {MIN_AGREEMENT_EXPIRATION_MINUTES} min or later than" + f" {MAX_AGREEMENT_EXPIRATION_MINUTES} min, counting" + f" from the moment they get your demand." ) elif isinstance(event, events.ProposalReceived): @@ -453,9 +457,10 @@ def _handle(self, event: events.Event): f"{self.time_waiting_for_proposals.seconds}s." ) msg += ( - " Make sure you're using the latest released versions of yagna and yapapi," - " and the correct subnet. Also make sure that the timeout for computing all" - " tasks is within the 5 min to 30 min range." + f" Make sure you're using the latest released versions of yagna and yapapi," + f" and the correct subnet. Also make sure that the timeout for computing all" + f" tasks is within the {MIN_AGREEMENT_EXPIRATION_MINUTES} min to" + f" {MAX_AGREEMENT_EXPIRATION_MINUTES} min range." 
) self.logger.warning(msg) diff --git a/yapapi/rest/activity.py b/yapapi/rest/activity.py index d8e20f36f..9228c7e9c 100644 --- a/yapapi/rest/activity.py +++ b/yapapi/rest/activity.py @@ -4,7 +4,7 @@ from datetime import datetime, timedelta, timezone import json import logging -from typing import AsyncIterator, List, Optional, Type, Any, Dict +from typing import AsyncIterator, List, Optional, Tuple, Type, Any, Dict from typing_extensions import AsyncContextManager, AsyncIterable @@ -21,6 +21,8 @@ ) from yapapi import events +from yapapi.rest.common import is_intermittent_error, SuppressedExceptions + _log = logging.getLogger("yapapi.rest") @@ -78,8 +80,8 @@ async def send(self, script: List[dict], deadline: Optional[datetime] = None) -> batch_id = await self._api.call_exec(self._id, yaa.ExeScriptRequest(text=script_txt)) if self._stream_events: - return StreamingBatch(self._api, self._id, batch_id, len(script), deadline) - return PollingBatch(self._api, self._id, batch_id, len(script), deadline) + return StreamingBatch(self, batch_id, len(script), deadline) + return PollingBatch(self, batch_id, len(script), deadline) async def __aenter__(self) -> "Activity": return self @@ -151,22 +153,19 @@ class BatchTimeoutError(BatchError): class Batch(abc.ABC, AsyncIterable[events.CommandEventContext]): """Abstract base class for iterating over events related to a batch running on provider.""" - _api: RequestorControlApi - _activity_id: str + _activity: Activity _batch_id: str _size: int _deadline: datetime def __init__( self, - api: RequestorControlApi, - activity_id: str, + activity: Activity, batch_id: str, batch_size: int, deadline: Optional[datetime] = None, ) -> None: - self._api = api - self._activity_id = activity_id + self._activity = activity self._batch_id = batch_id self._size = batch_size self._deadline = ( @@ -184,29 +183,83 @@ def id(self): return self._batch_id +def _is_gsb_endpoint_not_found_error(err: ApiException) -> bool: + """Check if `err` is caused by 
"Endpoint address not found" GSB error.""" + + if err.status != 500: + return False + try: + msg = json.loads(err.body)["message"] + return "GSB error" in msg and "endpoint address not found" in msg + except Exception: + _log.debug("Cannot read error message from ApiException", exc_info=True) + return False + + class PollingBatch(Batch): """A `Batch` implementation that polls the server repeatedly for command status.""" + GET_EXEC_BATCH_RESULTS_MAX_TRIES = 3 + """Max number of attempts to call GetExecBatchResults if a GSB error occurs.""" + + GET_EXEC_BATCH_RESULTS_INTERVAL = 3.0 + """Time in seconds before retrying GetExecBatchResults after a GSB error occurs.""" + + async def _activity_terminated(self) -> Tuple[bool, Optional[str], Optional[str]]: + """Check if the activity we're using is in "Terminated" state.""" + try: + state = await self._activity.state() + return "Terminated" in state.state, state.reason, state.error_message + except Exception: + _log.debug("Cannot query activity state", exc_info=True) + return False, None, None + + async def _get_results(self, timeout: float) -> List[yaa.ExeScriptCommandResult]: + """Call GetExecBatchResults with re-trying on "Endpoint address not found" GSB error.""" + + for n in range(self.GET_EXEC_BATCH_RESULTS_MAX_TRIES, 0, -1): + try: + results = await self._activity._api.get_exec_batch_results( + self._activity._id, + self._batch_id, + timeout=min(timeout, 5), + _request_timeout=min(timeout, 5) + 0.5, + ) + return results + except ApiException as err: + terminated, reason, error_msg = await self._activity_terminated() + if terminated: + raise BatchError("Activity terminated by provider", reason, error_msg) + # TODO: add and use a new Exception class (subclass of BatchError) + # to indicate closing the activity by the provider + if not _is_gsb_endpoint_not_found_error(err): + raise err + msg = "GetExecBatchResults failed due to GSB error" + if n > 1: + _log.debug("%s, retrying in %s s", msg, 
self.GET_EXEC_BATCH_RESULTS_INTERVAL) + await asyncio.sleep(self.GET_EXEC_BATCH_RESULTS_INTERVAL) + else: + _log.debug( + "%s, giving up after %d attempts", + msg, + self.GET_EXEC_BATCH_RESULTS_MAX_TRIES, + ) + raise err + + return [] + async def __aiter__(self) -> AsyncIterator[events.CommandEventContext]: last_idx = 0 + while last_idx < self._size: timeout = self.seconds_left() if timeout <= 0: raise BatchTimeoutError() - try: - results: List[yaa.ExeScriptCommandResult] = await self._api.get_exec_batch_results( - self._activity_id, - self._batch_id, - command_index=last_idx, - timeout=min(timeout, 5), - _request_timeout=min(timeout, 5) + 1, - ) - except asyncio.TimeoutError: - continue - except ApiException as err: - if err.status == 408: - continue - raise + + results: List[yaa.ExeScriptCommandResult] = [] + async with SuppressedExceptions(is_intermittent_error): + results = await self._get_results(timeout=min(timeout, 5)) + any_new: bool = False results = results[last_idx:] for result in results: @@ -236,13 +289,13 @@ class StreamingBatch(Batch): async def __aiter__(self) -> AsyncIterator[events.CommandEventContext]: from aiohttp_sse_client import client as sse_client # type: ignore - api_client = self._api.api_client + api_client = self._activity._api.api_client host = api_client.configuration.host headers = api_client.default_headers api_client.update_params_for_auth(headers, None, ["app_key"]) - activity_id = self._activity_id + activity_id = self._activity._id batch_id = self._batch_id last_idx = self._size - 1 diff --git a/yapapi/storage/gftp.py b/yapapi/storage/gftp.py index 8487285f2..63ad973ef 100644 --- a/yapapi/storage/gftp.py +++ b/yapapi/storage/gftp.py @@ -28,6 +28,7 @@ import jsonrpc_base # type: ignore from async_exit_stack import AsyncExitStack # type: ignore +import semantic_version # type: ignore from typing_extensions import Literal, Protocol, TypedDict from yapapi.storage import StorageProvider, Destination, Source, Content @@ -222,8 +223,13 
@@ def _temp_file(temp_dir: Path) -> Iterator[Path]: def _delete_if_exists(path: Path) -> None: if path.exists(): - path.unlink() - _logger.debug("Deleted temporary file %s", path) + try: + path.unlink() + _logger.debug("Deleted temporary file %s", path) + except PermissionError as err: + # We're on Windows and using `gftp` < 0.7.3, so the file is kept open + # by `gftp` and cannot be deleted. + _logger.debug("Cannot delete file: %s", err) USE_GFTP_CLOSE_ENV_VAR = "YAPAPI_USE_GFTP_CLOSE" @@ -245,8 +251,30 @@ def read_use_gftp_close_env_var() -> Optional[bool]: return None +MIN_GFTP_VERSION_THAT_CAN_GFTP_CLOSE = semantic_version.Version("0.7.3") + + class GftpProvider(StorageProvider, AsyncContextManager[StorageProvider]): - """A StorageProvider that communicates with `gftp server` through JSON-RPC.""" + """A StorageProvider that communicates with `gftp server` through JSON-RPC. + + The provider keeps track of the files published by `gftp` and their URLs. + If an URL no longer needs to be published then the provider _should_ issue the + `gftp close URL` command, so the file published with this URL is closed by `gftp`. + + However, `gftp close URL` may cause errors, due to a bug in `gftp` prior to version 0.7.3 + (see https://github.com/golemfactory/yagna/pull/1501). Therefore the provider uses + the following logic to determine if it should use the `gftp close URL` command: + 1. If the environment variable `YAPAPI_USE_GFTP_CLOSE` is set to a truthy value, + then `gftp close URL` will be used. + 2. If the environment variable `YAPAPI_USE_GFTP_CLOSE` is set to a falsy value, + then `gftp close URL` will not be used. + 3. If neither 1 nor 2 holds and the version reported by `gftp` is 0.7.3 or larger + (according to Semantic Versioning 2.0.0) then `gftp close URL` will be used. + 4. Otherwise `gftp close URL` will not be used. 
+ + Note: Reading the `YAPAPI_USE_GFTP_CLOSE` variable is done once, when the provider + is instantiated, and the version check is made in the provider's `__aenter__()` method. + """ @dataclass class URLInfo: @@ -280,12 +308,8 @@ def __init__(self, *, tmpdir: Optional[str] = None): self._lock: asyncio.Lock = asyncio.Lock() # Flag indicating if this `GftpProvider` will close unpublished URLs. - # If set to `True` then the provider will call `gftp close ` for an URL - # that has no longer any published files. This should be the default behavior, - # but it may cause errors, due to a bug in `gftp` prior to version `0.7.3` - # (see https://github.com/golemfactory/yagna/pull/1501), and is therefore turned - # on only if `read_use_gftp_close_env_var()` returns `True`. - self._close_urls: bool = read_use_gftp_close_env_var() or False + # See this class' docstring for more details. + self._close_urls: Optional[bool] = read_use_gftp_close_env_var() # Reference to an external process running the `gftp server` command self._process: Optional["__Process"] = None @@ -297,8 +321,23 @@ async def __aenter__(self) -> StorageProvider: ) _logger.debug("Creating a temporary directory %s", self._temp_dir) process = await self.__get_process() - _ver = await process.version() - assert _ver + gftp_version = await process.version() + assert gftp_version + + if self._close_urls is None: + try: + # Gftp_version could be something like `7.2.3 (10116c7d 2021-07-28 build #164)`, + # we need to discard everything after the first space. + semver = semantic_version.Version(gftp_version.split()[0]) + self._close_urls = semver >= MIN_GFTP_VERSION_THAT_CAN_GFTP_CLOSE + _logger.debug( + "Setting _close_urls to %s, gftp version: %s", self._close_urls, gftp_version + ) + except ValueError: + _logger.warning("Cannot parse gftp version info '%s'", gftp_version) + self._close_urls = False + assert self._close_urls is not None + return self async def __aexit__(