Skip to content

Commit

Permalink
B0.6 to master (#607)
Browse files Browse the repository at this point in the history
* Delete unused temp files and close unpublished files in GftpStorageProvider (#543)

* Delete unused temp files and close unpublished URLs in GftpStorageProvider
* Add unit tests for GftpStorageProvider
* Ensure newly created temp files are closed before being used
  If we keep a file open we won't be able to delete it on Windows
* Address code review suggestions
Co-authored-by: Kuba Mazurek <[email protected]>

* Add team mention to goth nightly failure messages (#575)

* Read env var to determine if GftpProvider shoud call `gftp close` (#573)

* Read env var to determine if GftpProvider shoud call `gftp close`

* Add info on YAPAPI_USE_GFTP_CLOSE env var to README.md

* Fix upper bound for expiraio time in no-offers warning (#582)

* Check gftp version to determine if `gftp close URL` should be used (#583)

* Check gftp version to determine if `gftp close URL` should be used

* Add `# type: ignore` to `import semantic_version`

* Add unit test for various env var/gftp version combinations

* Retry GetExecBatchResults on ApiExceptions caused by GSB Errors (#588)

* Implement re-tries for ApiExceptions caused by GSB Errors

* Endpoint -> endpoint

* Apply fixes after code review

* Fixes after code review: part II

* debug -> warning

* Improve logs when activity is prematurely terminated on the provider

* Formatting

* Raise BatchError when an activity is terminated by the provider

* Add unit tests for PollingBatch behavior when GSB errors occur

Co-authored-by: filipgolem <[email protected]>
Co-authored-by: Filip <[email protected]>

* Bump version to 0.6.3-alpha.0 (#589)

* Add semantic-version to dependencies, bump version to 0.6.3-alpha.1 (#591)

* Bump version to 0.6.3 (#592)

* Fix stable branch name regex in nightly workflow (#593)

Co-authored-by: Kuba Mazurek <[email protected]>
Co-authored-by: filipgolem <[email protected]>
Co-authored-by: Filip <[email protected]>
  • Loading branch information
4 people authored Aug 24, 2021
1 parent a23f1e1 commit b5d92be
Show file tree
Hide file tree
Showing 6 changed files with 326 additions and 48 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ ya-aioclient = "^0.6"
toml = "^0.10.1"
srvresolver = "^0.3.5"
colorama = "^0.4.4"
semantic-version="^2.8"

# Adding `goth` to dependencies causes > 40 additional packages to be installed. Given
# that dependency resolution in `poetry` is rather slow, we'd like to avoid installing
Expand Down
126 changes: 126 additions & 0 deletions tests/rest/test_activity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
from typing import List, Optional, Tuple, Type
from unittest.mock import Mock

import pytest

from ya_activity.exceptions import ApiException
from yapapi.rest.activity import BatchError, PollingBatch


GetExecBatchResultsSpec = Tuple[Optional[Exception], List[str]]


def mock_activity(specs: List[GetExecBatchResultsSpec]):
"""Create a mock activity.
The argument `specs` is a list of pairs specifying the behavior of subsequent calls
to `get_exec_batch_results()`: i-th pair corresponds to the i-th call.
The first element of the pair is an optional error raised by the call, the second element
is the activity state (the `.state` component of the object returned by `Activity.state()`).
"""
i = -1

async def mock_results(*_args, **_kwargs):
nonlocal specs, i
i += 1
error = specs[i][0]
if error:
raise error
return [Mock(index=0)]

async def mock_state():
nonlocal specs, i
state = specs[i][1]
return Mock(state=state)

return Mock(state=mock_state, _api=Mock(get_exec_batch_results=mock_results))


GSB_ERROR = ":( GSB error: some endpoint address not found :("


@pytest.mark.parametrize(
"specs, expected_error",
[
# No errors
([(None, ["Running", "Running"])], None),
# Exception other than ApiException should stop iteration over batch results
(
[(ValueError("!?"), ["Running", "Running"])],
ValueError,
),
# ApiException not related to GSB should stop iteration over batch results
(
[(ApiException(status=400), ["Running", "Running"])],
ApiException,
),
# As above, but with status 500
(
[
(
ApiException(http_resp=Mock(status=500, data='{"message": "???"}')),
["Running", "Running"],
)
],
ApiException,
),
# ApiException not related to GSB should raise BatchError if activity is terminated
(
[
(
ApiException(http_resp=Mock(status=500, data='{"message": "???"}')),
["Running", "Terminated"],
)
],
BatchError,
),
# GSB-related ApiException should cause retrying if the activity is running
(
[
(
ApiException(http_resp=Mock(status=500, data=f'{{"message": "{GSB_ERROR}"}}')),
["Running", "Running"],
),
(None, ["Running", "Running"]),
],
None,
),
# As above, but max number of tries is reached
(
[
(
ApiException(http_resp=Mock(status=500, data=f'{{"message": "{GSB_ERROR}"}}')),
["Running", "Running"],
)
]
* PollingBatch.GET_EXEC_BATCH_RESULTS_MAX_TRIES,
ApiException,
),
# GSB-related ApiException should raise BatchError if activity is terminated
(
[
(
ApiException(http_resp=Mock(status=500, data=f'{{"message": "{GSB_ERROR}"}}')),
["Running", "Terminated"],
)
],
BatchError,
),
],
)
@pytest.mark.asyncio
async def test_polling_batch_on_gsb_error(
specs: List[GetExecBatchResultsSpec], expected_error: Optional[Type[Exception]]
) -> None:
"""Test the behavior of PollingBatch when get_exec_batch_results() raises exceptions."""

PollingBatch.GET_EXEC_BATCH_RESULTS_INTERVAL = 0.1

activity = mock_activity(specs)
batch = PollingBatch(activity, "batch_id", 1)
try:
async for _ in batch:
pass
assert expected_error is None
except Exception as error:
assert expected_error is not None and isinstance(error, expected_error)
64 changes: 59 additions & 5 deletions tests/storage/test_gftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ class MockService(gftp.GftpDriver):
stripped from whitespaces.
"""

def __init__(self):
def __init__(self, version="0.0.0"):
self.published = defaultdict(list)
self._version = version

async def __aenter__(self):
return self
Expand All @@ -74,7 +75,7 @@ async def __aexit__(self, *exc_info):
pass

async def version(self) -> str:
return "0.0.0"
return self._version

async def publish(self, *, files: List[str]) -> List[gftp.PubLink]:
links = []
Expand Down Expand Up @@ -210,9 +211,7 @@ async def worker(id: int, provider: gftp.GftpProvider):
],
)
@pytest.mark.asyncio
async def test_gftp_close_env_var(
temp_dir, mock_service, monkeypatch, env_value, expect_unpublished
):
async def test_gftp_close_env(temp_dir, mock_service, monkeypatch, env_value, expect_unpublished):
"""Test that the GftpProvider calls close() on the underlying service."""

# Enable or disable using `gftp close` by GftpProvider
Expand Down Expand Up @@ -240,6 +239,61 @@ async def test_gftp_close_env_var(
assert (not mock_service.published["bytes"]) == expect_unpublished


@pytest.mark.parametrize(
"env_value, gftp_version, expect_unpublished",
[
("1", "0.6.0", True),
("1", "0.7.2", True),
("1", "0.7.3", True),
("0", "0.7.2", False),
("0", "0.7.3", False),
("0", "1.0.0", False),
("whatever", "0.6.0", False),
("whatever", "0.7.2", False),
("whatever", "0.7.3-rc.2", False),
("whatever", "0.7.3", True),
("whatever", "1.0.0", True),
(None, "0.6.0", False),
(None, "0.7.2", False),
(None, "0.7.3-rc.2", False),
(None, "0.7.3", True),
(None, "1.0.0", True),
],
)
@pytest.mark.asyncio
async def test_gftp_close_env_version(
temp_dir, monkeypatch, env_value, gftp_version, expect_unpublished
):
"""Test that the GftpProvider calls close() on the underlying service."""

service = MockService(version=gftp_version)
monkeypatch.setattr(gftp, "service", lambda _debug: service)

# Enable or disable using `gftp close` by GftpProvider
if env_value is not None:
monkeypatch.setenv(gftp.USE_GFTP_CLOSE_ENV_VAR, env_value)
else:
monkeypatch.delenv(gftp.USE_GFTP_CLOSE_ENV_VAR, raising=False)

async with gftp.GftpProvider(tmpdir=temp_dir) as provider:
assert isinstance(provider, gftp.GftpProvider)

src_1 = await provider.upload_bytes(b"bytes")
assert service.published["bytes"]

src_2 = await provider.upload_bytes(b"bytes")
assert service.published["bytes"]

assert src_1.download_url == src_2.download_url

await provider.release_source(src_1)
# the URL should not be unpublished just yet
assert service.published["bytes"]

await provider.release_source(src_2)
assert (not service.published["bytes"]) == expect_unpublished


ME = __file__


Expand Down
19 changes: 12 additions & 7 deletions yapapi/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,10 @@ class ProviderInfo:
subnet_tag: Optional[str]


MAX_AGREEMENT_EXPIRATION_MINUTES = round(MAX_AGREEMENT_EXPIRATION.seconds / 60)
MIN_AGREEMENT_EXPIRATION_MINUTES = round(MIN_AGREEMENT_EXPIRATION.seconds / 60)


# Some type aliases to make types more meaningful
AgreementId = str
JobId = str
Expand Down Expand Up @@ -425,12 +429,12 @@ def _handle(self, event: events.Event):
if not MIN_AGREEMENT_EXPIRATION <= provider_timeout <= MAX_AGREEMENT_EXPIRATION:
min, sec = divmod(round(timeout.total_seconds()), 60)
seconds_str = f" {sec} sec " if sec else " "
max_minutes = round(MAX_AGREEMENT_EXPIRATION.seconds / 60)
self.logger.warning(
f"Expiration time for your tasks is set to {min} min{seconds_str}from now."
" Providers may not be willing to take up tasks which expire sooner than 5 min"
f" or later than {max_minutes} min, counting from the moment they get your"
" demand."
f" Providers may not be willing to take up tasks which expire sooner than"
f" {MIN_AGREEMENT_EXPIRATION_MINUTES} min or later than"
f" {MAX_AGREEMENT_EXPIRATION_MINUTES} min, counting"
f" from the moment they get your demand."
)

elif isinstance(event, events.ProposalReceived):
Expand All @@ -453,9 +457,10 @@ def _handle(self, event: events.Event):
f"{self.time_waiting_for_proposals.seconds}s."
)
msg += (
" Make sure you're using the latest released versions of yagna and yapapi,"
" and the correct subnet. Also make sure that the timeout for computing all"
" tasks is within the 5 min to 30 min range."
f" Make sure you're using the latest released versions of yagna and yapapi,"
f" and the correct subnet. Also make sure that the timeout for computing all"
f" tasks is within the {MIN_AGREEMENT_EXPIRATION_MINUTES} min to"
f" {MAX_AGREEMENT_EXPIRATION_MINUTES} min range."
)
self.logger.warning(msg)

Expand Down
Loading

0 comments on commit b5d92be

Please sign in to comment.