Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Repeat payment api calls #473

Merged
merged 5 commits into from
Jun 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "yapapi"
version = "0.6.0"
version = "0.6.1-alpha.0"
description = "High-level Python API for the New Golem"
authors = ["Przemysław K. Rekucki <[email protected]>", "GolemFactory <[email protected]>"]
license = "LGPL-3.0-or-later"
Expand Down
106 changes: 106 additions & 0 deletions tests/rest/test_repeat_on_error.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import asyncio

import aiohttp
import pytest

import ya_activity
import ya_market
import ya_payment

from yapapi.rest.common import repeat_on_error, SuppressedExceptions, is_intermittent_error


@pytest.mark.parametrize(
"max_tries, exceptions, calls_expected, expected_error",
[
(1, [], 1, None),
(1, [asyncio.TimeoutError()], 1, asyncio.TimeoutError),
(1, [ya_activity.ApiException(408)], 1, ya_activity.ApiException),
(1, [ya_activity.ApiException(500)], 1, ya_activity.ApiException),
(1, [ValueError()], 1, ValueError),
#
(2, [], 1, None),
(2, [asyncio.TimeoutError()], 2, None),
(2, [ya_activity.ApiException(408)], 2, None),
(2, [ya_market.ApiException(408)], 2, None),
(2, [ya_payment.ApiException(408)], 2, None),
(2, [ya_activity.ApiException(500)], 1, ya_activity.ApiException),
(2, [aiohttp.ServerDisconnectedError()], 2, None),
(2, [aiohttp.ClientOSError(32, "Broken pipe")], 2, None),
(2, [aiohttp.ClientOSError(1132, "UnBroken pipe")], 1, aiohttp.ClientOSError),
(2, [ValueError()], 1, ValueError),
(2, [asyncio.TimeoutError()] * 2, 2, asyncio.TimeoutError),
#
(3, [], 1, None),
(3, [asyncio.TimeoutError()], 2, None),
(3, [ya_activity.ApiException(408)], 2, None),
(3, [asyncio.TimeoutError()] * 2, 3, None),
(3, [asyncio.TimeoutError()] * 3, 3, asyncio.TimeoutError),
(3, [ya_activity.ApiException(500)], 1, ya_activity.ApiException),
(3, [asyncio.TimeoutError(), ValueError()], 2, ValueError),
],
)
@pytest.mark.asyncio
async def test_repeat_on_error(max_tries, exceptions, calls_expected, expected_error):

calls_made = 0

@repeat_on_error(max_tries=max_tries, interval=0.0)
async def request():
nonlocal calls_made, exceptions
calls_made += 1
if exceptions:
e = exceptions[0]
exceptions = exceptions[1:]
raise e
return True

try:
await request()
except Exception as e:
assert expected_error is not None, f"Unexpected exception: {e}"
assert isinstance(e, expected_error), f"Expected an {expected_error}, got {e}"
assert (
calls_made == calls_expected
), f"{calls_made} attempts were made, expected {calls_expected}"


@pytest.mark.asyncio
async def test_suppressed_exceptions():

async with SuppressedExceptions(is_intermittent_error) as se:
pass
assert se.exception is None

async with SuppressedExceptions(is_intermittent_error) as se:
raise asyncio.TimeoutError()
assert isinstance(se.exception, asyncio.TimeoutError)

async with SuppressedExceptions(is_intermittent_error) as se:
raise aiohttp.ClientOSError(32, "Broken pipe")
assert isinstance(se.exception, aiohttp.ClientOSError)

async with SuppressedExceptions(is_intermittent_error) as se:
raise aiohttp.ServerDisconnectedError()
assert isinstance(se.exception, aiohttp.ServerDisconnectedError)

with pytest.raises(AssertionError):
async with SuppressedExceptions(is_intermittent_error):
raise AssertionError()


@pytest.mark.asyncio
async def test_suppressed_exceptions_with_return():
async def success():
return "success"

async def failure():
raise asyncio.TimeoutError()

async def func(request):
async with SuppressedExceptions(is_intermittent_error):
return await request
return "failure" # noqa

assert await func(success()) == "success"
assert await func(failure()) == "failure"
91 changes: 91 additions & 0 deletions yapapi/rest/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import asyncio
import functools
import logging
from typing import Callable, Optional

import aiohttp

import ya_market
import ya_activity
import ya_payment


logger = logging.getLogger(__name__)


def is_intermittent_error(e: Exception) -> bool:
"""Check if `e` indicates an intermittent communication failure such as network timeout."""

is_timeout_exception = isinstance(e, asyncio.TimeoutError) or (
isinstance(e, (ya_activity.ApiException, ya_market.ApiException, ya_payment.ApiException))
and e.status in (408, 504)
)

return (
is_timeout_exception
or isinstance(e, aiohttp.ServerDisconnectedError)
# OS error with errno 32 is "Broken pipe"
or (isinstance(e, aiohttp.ClientOSError) and e.errno == 32)
)


class SuppressedExceptions:
"""An async context manager for suppressing exceptions satisfying given condition."""

exception: Optional[Exception]

def __init__(self, condition: Callable[[Exception], bool], report_exceptions: bool = True):
self._condition = condition
self._report_exceptions = report_exceptions
self.exception = None

async def __aenter__(self) -> "SuppressedExceptions":
return self

async def __aexit__(self, exc_type, exc_value, traceback):
if exc_value and self._condition(exc_value):
self.exception = exc_value
if self._report_exceptions:
logger.debug(
"Exception suppressed: %r", exc_value, exc_info=(exc_type, exc_value, traceback)
)
return True
return False


def repeat_on_error(
max_tries: int,
condition: Callable[[Exception], bool] = is_intermittent_error,
interval: float = 1.0,
):
"""Decorate a function to repeat calls up to `max_tries` times when errors occur.

Only exceptions satisfying the given `condition` will cause the decorated function
to be retried. All remaining exceptions will fall through.
"""

def decorator(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
"""Make at most `max_tries` attempts to call `func`."""

for try_num in range(1, max_tries + 1):

if try_num > 1:
await asyncio.sleep(interval)

async with SuppressedExceptions(condition, False) as se:
return await func(*args, **kwargs)

assert se.exception # noqa (unreachable)
repeat = try_num < max_tries
msg = f"API call timed out (attempt {try_num}/{max_tries}), "
msg += f"retrying in {interval} s" if repeat else "giving up"
# Don't print traceback if this was the last attempt, let the caller do it.
logger.debug(msg, exc_info=repeat)
if not repeat:
raise se.exception

return wrapper

return decorator
13 changes: 9 additions & 4 deletions yapapi/rest/market.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
import asyncio
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
import logging
from types import TracebackType
from typing import AsyncIterator, Optional, TypeVar, Type, Generator, Any, Generic

from typing_extensions import Awaitable, AsyncContextManager

from ..props import Model
from ya_market import ApiClient, ApiException, RequestorApi, models # type: ignore
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

from .common import is_intermittent_error, SuppressedExceptions
from ..props import Model


_ModelType = TypeVar("_ModelType", bound=Model)

Expand Down Expand Up @@ -191,8 +194,10 @@ async def events(self) -> AsyncIterator[OfferProposal]:
"""Yield counter-proposals based on the incoming, matching Offers."""
while self._open:

proposals = []
try:
proposals = await self._api.collect_offers(self._id, timeout=10, max_events=10)
async with SuppressedExceptions(is_intermittent_error):
proposals = await self._api.collect_offers(self._id, timeout=5, max_events=10)
except ApiException as ex:
if ex.status == 404:
logger.debug(
Expand Down
16 changes: 13 additions & 3 deletions yapapi/rest/payment.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from decimal import Decimal
from datetime import datetime, timezone, timedelta
from dataclasses import dataclass
from .common import repeat_on_error, is_intermittent_error, SuppressedExceptions
from .resource import ResourceCtx


Expand All @@ -18,6 +19,7 @@ def __init__(self, _api: RequestorApi, _base: yap.Invoice):
self.__dict__.update(**_base.__dict__)
self._api: RequestorApi = _api

@repeat_on_error(max_tries=5)
async def accept(self, *, amount: Union[Decimal, str], allocation: "Allocation"):
acceptance = yap.Acceptance(total_amount_accepted=str(amount), allocation_id=allocation.id)
await self._api.accept_invoice(self.invoice_id, acceptance)
Expand All @@ -28,6 +30,7 @@ def __init__(self, _api: RequestorApi, _base: yap.DebitNote):
self.__dict__.update(**_base.__dict__)
self._api: RequestorApi = _api

@repeat_on_error(max_tries=5)
async def accept(self, *, amount: Union[Decimal, str], allocation: "Allocation"):
acceptance = yap.Acceptance(total_amount_accepted=str(amount), allocation_id=allocation.id)
await self._api.accept_debit_note(self.debit_note_id, acceptance)
Expand Down Expand Up @@ -67,13 +70,15 @@ class Allocation(_Link):
expires: Optional[datetime]
"Allocation expiration timestamp"

@repeat_on_error(max_tries=5)
async def details(self) -> AllocationDetails:
details: yap.Allocation = await self._api.get_allocation(self.id)
return AllocationDetails(
spent_amount=Decimal(details.spent_amount),
remaining_amount=Decimal(details.remaining_amount),
)

@repeat_on_error(max_tries=5)
async def delete(self):
await self._api.release_allocation(self.id)

Expand Down Expand Up @@ -191,6 +196,7 @@ async def accounts(self) -> AsyncIterator[Account]:
async def decorate_demand(self, ids: List[str]) -> yap.MarketDecoration:
return await self._api.get_demand_decorations(ids)

@repeat_on_error(max_tries=5)
async def debit_note(self, debit_note_id: str) -> DebitNote:
debit_note = await self._api.get_debit_note(debit_note_id)
return DebitNote(_api=self._api, _base=debit_note)
Expand All @@ -200,21 +206,23 @@ async def invoices(self) -> AsyncIterator[Invoice]:
for invoice_obj in cast(Iterable[yap.Invoice], await self._api.get_invoices()):
yield Invoice(_api=self._api, _base=invoice_obj)

@repeat_on_error(max_tries=5)
async def invoice(self, invoice_id: str) -> Invoice:
invoice_obj = await self._api.get_invoice(invoice_id)
return Invoice(_api=self._api, _base=invoice_obj)

def incoming_invoices(self) -> AsyncIterator[Invoice]:
ts = datetime.now(timezone.utc)
api = self._api

async def fetch(init_ts: datetime):
ts = init_ts
while True:
# In the current version of `ya-aioclient` the method `get_invoice_events`
# incorrectly accepts `timeout` parameter, while the server uses `pollTimeout`
# events = await api.get_invoice_events(poll_timeout=5, after_timestamp=ts)
events = await api.get_invoice_events(after_timestamp=ts)
events = []
async with SuppressedExceptions(is_intermittent_error):
events = await self._api.get_invoice_events(after_timestamp=ts)
for ev in events:
logger.debug("Received invoice event: %r, type: %s", ev, ev.__class__)
if isinstance(ev, yap.InvoiceReceivedEvent):
Expand All @@ -235,7 +243,9 @@ def incoming_debit_notes(self) -> AsyncIterator[DebitNote]:
async def fetch(init_ts: datetime):
ts = init_ts
while True:
events = await self._api.get_debit_note_events(after_timestamp=ts)
events = []
async with SuppressedExceptions(is_intermittent_error):
events = await self._api.get_debit_note_events(after_timestamp=ts)
for ev in events:
logger.debug("Received debit note event: %r, type: %s", ev, ev.__class__)
if isinstance(ev, yap.DebitNoteReceivedEvent):
Expand Down