Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

B0.6 to master #607

Merged
merged 12 commits into from
Aug 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ ya-aioclient = "^0.6"
toml = "^0.10.1"
srvresolver = "^0.3.5"
colorama = "^0.4.4"
semantic-version="^2.8"

# Adding `goth` to dependencies causes > 40 additional packages to be installed. Given
# that dependency resolution in `poetry` is rather slow, we'd like to avoid installing
Expand Down
126 changes: 126 additions & 0 deletions tests/rest/test_activity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
from typing import List, Optional, Tuple, Type
from unittest.mock import Mock

import pytest

from ya_activity.exceptions import ApiException
from yapapi.rest.activity import BatchError, PollingBatch


GetExecBatchResultsSpec = Tuple[Optional[Exception], List[str]]


def mock_activity(specs: List[GetExecBatchResultsSpec]):
"""Create a mock activity.

The argument `specs` is a list of pairs specifying the behavior of subsequent calls
to `get_exec_batch_results()`: i-th pair corresponds to the i-th call.
The first element of the pair is an optional error raised by the call, the second element
is the activity state (the `.state` component of the object returned by `Activity.state()`).
"""
i = -1

async def mock_results(*_args, **_kwargs):
nonlocal specs, i
i += 1
error = specs[i][0]
if error:
raise error
return [Mock(index=0)]

async def mock_state():
nonlocal specs, i
state = specs[i][1]
return Mock(state=state)

return Mock(state=mock_state, _api=Mock(get_exec_batch_results=mock_results))


GSB_ERROR = ":( GSB error: some endpoint address not found :("


@pytest.mark.parametrize(
"specs, expected_error",
[
# No errors
([(None, ["Running", "Running"])], None),
# Exception other than ApiException should stop iteration over batch results
(
[(ValueError("!?"), ["Running", "Running"])],
ValueError,
),
# ApiException not related to GSB should stop iteration over batch results
(
[(ApiException(status=400), ["Running", "Running"])],
ApiException,
),
# As above, but with status 500
(
[
(
ApiException(http_resp=Mock(status=500, data='{"message": "???"}')),
["Running", "Running"],
)
],
ApiException,
),
# ApiException not related to GSB should raise BatchError if activity is terminated
(
[
(
ApiException(http_resp=Mock(status=500, data='{"message": "???"}')),
["Running", "Terminated"],
)
],
BatchError,
),
# GSB-related ApiException should cause retrying if the activity is running
(
[
(
ApiException(http_resp=Mock(status=500, data=f'{{"message": "{GSB_ERROR}"}}')),
["Running", "Running"],
),
(None, ["Running", "Running"]),
],
None,
),
# As above, but max number of tries is reached
(
[
(
ApiException(http_resp=Mock(status=500, data=f'{{"message": "{GSB_ERROR}"}}')),
["Running", "Running"],
)
]
* PollingBatch.GET_EXEC_BATCH_RESULTS_MAX_TRIES,
ApiException,
),
# GSB-related ApiException should raise BatchError if activity is terminated
(
[
(
ApiException(http_resp=Mock(status=500, data=f'{{"message": "{GSB_ERROR}"}}')),
["Running", "Terminated"],
)
],
BatchError,
),
],
)
@pytest.mark.asyncio
async def test_polling_batch_on_gsb_error(
specs: List[GetExecBatchResultsSpec], expected_error: Optional[Type[Exception]]
) -> None:
"""Test the behavior of PollingBatch when get_exec_batch_results() raises exceptions."""

PollingBatch.GET_EXEC_BATCH_RESULTS_INTERVAL = 0.1

activity = mock_activity(specs)
batch = PollingBatch(activity, "batch_id", 1)
try:
async for _ in batch:
pass
assert expected_error is None
except Exception as error:
assert expected_error is not None and isinstance(error, expected_error)
64 changes: 59 additions & 5 deletions tests/storage/test_gftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ class MockService(gftp.GftpDriver):
stripped from whitespaces.
"""

def __init__(self):
def __init__(self, version="0.0.0"):
self.published = defaultdict(list)
self._version = version

async def __aenter__(self):
return self
Expand All @@ -74,7 +75,7 @@ async def __aexit__(self, *exc_info):
pass

async def version(self) -> str:
return "0.0.0"
return self._version

async def publish(self, *, files: List[str]) -> List[gftp.PubLink]:
links = []
Expand Down Expand Up @@ -210,9 +211,7 @@ async def worker(id: int, provider: gftp.GftpProvider):
],
)
@pytest.mark.asyncio
async def test_gftp_close_env_var(
temp_dir, mock_service, monkeypatch, env_value, expect_unpublished
):
async def test_gftp_close_env(temp_dir, mock_service, monkeypatch, env_value, expect_unpublished):
"""Test that the GftpProvider calls close() on the underlying service."""

# Enable or disable using `gftp close` by GftpProvider
Expand Down Expand Up @@ -240,6 +239,61 @@ async def test_gftp_close_env_var(
assert (not mock_service.published["bytes"]) == expect_unpublished


@pytest.mark.parametrize(
"env_value, gftp_version, expect_unpublished",
[
("1", "0.6.0", True),
("1", "0.7.2", True),
("1", "0.7.3", True),
("0", "0.7.2", False),
("0", "0.7.3", False),
("0", "1.0.0", False),
("whatever", "0.6.0", False),
("whatever", "0.7.2", False),
("whatever", "0.7.3-rc.2", False),
("whatever", "0.7.3", True),
("whatever", "1.0.0", True),
(None, "0.6.0", False),
(None, "0.7.2", False),
(None, "0.7.3-rc.2", False),
(None, "0.7.3", True),
(None, "1.0.0", True),
],
)
@pytest.mark.asyncio
async def test_gftp_close_env_version(
temp_dir, monkeypatch, env_value, gftp_version, expect_unpublished
):
"""Test that the GftpProvider calls close() on the underlying service."""

service = MockService(version=gftp_version)
monkeypatch.setattr(gftp, "service", lambda _debug: service)

# Enable or disable using `gftp close` by GftpProvider
if env_value is not None:
monkeypatch.setenv(gftp.USE_GFTP_CLOSE_ENV_VAR, env_value)
else:
monkeypatch.delenv(gftp.USE_GFTP_CLOSE_ENV_VAR, raising=False)

async with gftp.GftpProvider(tmpdir=temp_dir) as provider:
assert isinstance(provider, gftp.GftpProvider)

src_1 = await provider.upload_bytes(b"bytes")
assert service.published["bytes"]

src_2 = await provider.upload_bytes(b"bytes")
assert service.published["bytes"]

assert src_1.download_url == src_2.download_url

await provider.release_source(src_1)
# the URL should not be unpublished just yet
assert service.published["bytes"]

await provider.release_source(src_2)
assert (not service.published["bytes"]) == expect_unpublished


ME = __file__


Expand Down
19 changes: 12 additions & 7 deletions yapapi/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,10 @@ class ProviderInfo:
subnet_tag: Optional[str]


MAX_AGREEMENT_EXPIRATION_MINUTES = round(MAX_AGREEMENT_EXPIRATION.seconds / 60)
MIN_AGREEMENT_EXPIRATION_MINUTES = round(MIN_AGREEMENT_EXPIRATION.seconds / 60)


# Some type aliases to make types more meaningful
AgreementId = str
JobId = str
Expand Down Expand Up @@ -425,12 +429,12 @@ def _handle(self, event: events.Event):
if not MIN_AGREEMENT_EXPIRATION <= provider_timeout <= MAX_AGREEMENT_EXPIRATION:
min, sec = divmod(round(timeout.total_seconds()), 60)
seconds_str = f" {sec} sec " if sec else " "
max_minutes = round(MAX_AGREEMENT_EXPIRATION.seconds / 60)
self.logger.warning(
f"Expiration time for your tasks is set to {min} min{seconds_str}from now."
" Providers may not be willing to take up tasks which expire sooner than 5 min"
f" or later than {max_minutes} min, counting from the moment they get your"
" demand."
f" Providers may not be willing to take up tasks which expire sooner than"
f" {MIN_AGREEMENT_EXPIRATION_MINUTES} min or later than"
f" {MAX_AGREEMENT_EXPIRATION_MINUTES} min, counting"
f" from the moment they get your demand."
)

elif isinstance(event, events.ProposalReceived):
Expand All @@ -453,9 +457,10 @@ def _handle(self, event: events.Event):
f"{self.time_waiting_for_proposals.seconds}s."
)
msg += (
" Make sure you're using the latest released versions of yagna and yapapi,"
" and the correct subnet. Also make sure that the timeout for computing all"
" tasks is within the 5 min to 30 min range."
f" Make sure you're using the latest released versions of yagna and yapapi,"
f" and the correct subnet. Also make sure that the timeout for computing all"
f" tasks is within the {MIN_AGREEMENT_EXPIRATION_MINUTES} min to"
f" {MAX_AGREEMENT_EXPIRATION_MINUTES} min range."
)
self.logger.warning(msg)

Expand Down
Loading