Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry GetExecBatchResults on ApiExceptions caused by GSB Errors #588

Merged
merged 10 commits into from
Aug 11, 2021
78 changes: 58 additions & 20 deletions yapapi/rest/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
)

from yapapi import events
from yapapi.rest.common import is_intermittent_error, SuppressedExceptions


_log = logging.getLogger("yapapi.rest")

Expand Down Expand Up @@ -73,8 +75,8 @@ async def send(self, script: List[dict], deadline: Optional[datetime] = None) ->
batch_id = await self._api.call_exec(self._id, yaa.ExeScriptRequest(text=script_txt))

if self._stream_events:
return StreamingBatch(self._api, self._id, batch_id, len(script), deadline)
return PollingBatch(self._api, self._id, batch_id, len(script), deadline)
return StreamingBatch(self, batch_id, len(script), deadline)
return PollingBatch(self, batch_id, len(script), deadline)

async def __aenter__(self) -> "Activity":
return self
Expand Down Expand Up @@ -146,22 +148,19 @@ class BatchTimeoutError(BatchError):
class Batch(abc.ABC, AsyncIterable[events.CommandEventContext]):
"""Abstract base class for iterating over events related to a batch running on provider."""

_api: RequestorControlApi
_activity_id: str
_activity: Activity
_batch_id: str
_size: int
_deadline: datetime

def __init__(
self,
api: RequestorControlApi,
activity_id: str,
activity: Activity,
batch_id: str,
batch_size: int,
deadline: Optional[datetime] = None,
) -> None:
self._api = api
self._activity_id = activity_id
self._activity = activity
self._batch_id = batch_id
self._size = batch_size
self._deadline = (
Expand All @@ -182,22 +181,61 @@ def id(self):
class PollingBatch(Batch):
"""A `Batch` implementation that polls the server repeatedly for command status."""

async def _activity_terminated(self) -> bool:
filipgolem marked this conversation as resolved.
Show resolved Hide resolved
"""Check if the activity we're using is in "Terminated" state."""
try:
state_list = await self._activity.state().state # type: ignore
return "Terminated" in state_list
except Exception:
_log.debug("Cannot query activity state", exc_info=True)
return False

def _is_endpoint_not_found_error(self, err: ApiException) -> bool:
"""Check if `err` is caused by "Endpoint address not found" GSB error."""

if err.status != 500:
return False
try:
msg = json.loads(err.body)["message"]
return "GSB error" in msg and "Endpoint address not found" in msg
filipgolem marked this conversation as resolved.
Show resolved Hide resolved
except Exception:
_log.debug("Cannot read error message from ApiException", exc_info=True)
return False

async def _get_results(self, timeout: float, num_tries: int, delay: float = 3.0):
"""Call GetExecBatchResults with re-trying on "Endpoint address not found" GSB error."""

while num_tries:
try:
results = await self._activity._api.get_exec_batch_results(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This conflicts with changes in #548 , which should be merged first.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#548 was a PR to master, and the current PR is to b0.6. When we will merge changes in b0.6 to master we'll have to merge those two sets of changes, but no need to worry about this now. I don't think we plan to backport #548 to b0.6, do we?

Copy link
Contributor

@filipgolem filipgolem Aug 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A similar PR will be a part of the next yajsapi release. IMO it would be great to backport #548.

self._activity._id, self._batch_id, _request_timeout=min(timeout, 5)
)
return results
except ApiException as err:
if await self._activity_terminated():
_log.debug("Activity %s terminated by provider", self._activity._id)
filipgolem marked this conversation as resolved.
Show resolved Hide resolved
# TODO: add and use a new Exception class (subclass of BatchError)
# to indicate closing the activity by the provider
raise err
if not self._is_endpoint_not_found_error(err):
raise err
num_tries -= 1
if num_tries:
_log.debug("Retrying ")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like the message was not finished.

Copy link
Contributor Author

@azawlocki azawlocki Aug 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed! Thanks, fixed in c6356d9.

await asyncio.sleep(delay)

async def __aiter__(self) -> AsyncIterator[events.CommandEventContext]:
last_idx = 0

while last_idx < self._size:
timeout = self.seconds_left()
if timeout <= 0:
raise BatchTimeoutError()
try:
results: List[yaa.ExeScriptCommandResult] = await self._api.get_exec_batch_results(
self._activity_id, self._batch_id, _request_timeout=min(timeout, 5)
)
except asyncio.TimeoutError:
continue
except ApiException as err:
if err.status == 408:
continue
raise

results: List[yaa.ExeScriptCommandResult] = []
async with SuppressedExceptions(is_intermittent_error):
results = await self._get_results(timeout=min(timeout, 5), num_tries=3)

any_new: bool = False
results = results[last_idx:]
for result in results:
Expand Down Expand Up @@ -227,13 +265,13 @@ class StreamingBatch(Batch):
async def __aiter__(self) -> AsyncIterator[events.CommandEventContext]:
from aiohttp_sse_client import client as sse_client # type: ignore

api_client = self._api.api_client
api_client = self._activity._api.api_client
host = api_client.configuration.host
headers = api_client.default_headers

api_client.update_params_for_auth(headers, None, ["app_key"])

activity_id = self._activity_id
activity_id = self._activity._id
batch_id = self._batch_id
last_idx = self._size - 1

Expand Down