Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix the PollingBatch with the current version of yagna #302

Merged
merged 10 commits into from
Apr 7, 2021
8 changes: 5 additions & 3 deletions yapapi/rest/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,13 @@ async def __aiter__(self) -> AsyncIterator[events.CommandEventContext]:
raise BatchTimeoutError()
try:
results: List[yaa.ExeScriptCommandResult] = await self._api.get_exec_batch_results(
self._activity_id, self._batch_id, timeout=timeout
self._activity_id, self._batch_id, _request_timeout=min(timeout, 5)
)
except asyncio.TimeoutError:
continue
except ApiException as err:
Copy link
Contributor

@azawlocki azawlocki Apr 7, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In addition to ApiException we should also catch asyncio.TimeoutError here and raise BatchTimeoutError instead.

Copy link
Contributor

@azawlocki azawlocki Apr 7, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would also shorten the amount of sleep when no new results are available, at the end of this method. It's now 10 seconds:

if not any_new:
delay = min(10, max(0, self.seconds_left()))
await asyncio.sleep(delay)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's three seconds in yajsapi.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In addition to ApiException we should also catch asyncio.TimeoutError here and raise BatchTimeoutError instead.

To avoid (tcp) connection closed errors, I limited maximum timeout to 5 seconds and added continue when timeouts happen.

if err.status == 408:
raise BatchTimeoutError()
continue
raise
any_new: bool = False
results = results[last_idx:]
Expand All @@ -194,7 +196,7 @@ async def __aiter__(self) -> AsyncIterator[events.CommandEventContext]:
if result.is_batch_finished:
break
if not any_new:
delay = min(10, max(0, self.seconds_left()))
delay = min(3, max(0, self.seconds_left()))
await asyncio.sleep(delay)


Expand Down