-
Notifications
You must be signed in to change notification settings - Fork 167
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improve Matchmaking finalizers #357
Conversation
Codecov Report
@@ Coverage Diff @@
## master #357 +/- ##
==========================================
- Coverage 83.43% 83.41% -0.03%
==========================================
Files 66 66
Lines 6109 6095 -14
==========================================
- Hits 5097 5084 -13
+ Misses 1012 1011 -1
|
@@ -521,8 +512,6 @@ async def _declare_averager_periodically(self, key_manager: GroupKeyManager): | |||
await asyncio.sleep(self.declared_expiration_time - get_dht_time()) | |||
if self.running.is_set() and len(self.leader_queue) == 0: | |||
await key_manager.update_key_on_not_enough_peers() | |||
except (concurrent.futures.CancelledError, asyncio.CancelledError): | |||
pass # note: this is a compatibility layer for python3.7 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[non-blocking]
Q: are you certain that we no longer need to handle concurrent.futures.CancelledError here or is it an educated guess?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have edited this PR to only remove the except
statements in _update_queue_periodically()
and _declare_averager_periodically()
methods.
These methods are only awaited in cancel_and_wait()
that handles both cancel and normal errors by itself.
2c93456
to
f670f40
Compare
@@ -477,37 +475,31 @@ def request_expiration_time(self) -> float: | |||
else: | |||
return min(get_dht_time() + self.averaging_expiration, self.search_end_time) | |||
|
|||
async def _update_queue_periodically(self, key_manager: GroupKeyManager): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code is identical besides removing the try-except
block (rationale is explained in this comment).
@@ -127,10 +127,9 @@ async def look_for_group(self, *, data_for_gather: bytes, timeout: Optional[floa | |||
raise | |||
|
|||
finally: | |||
if not request_leaders_task.done(): | |||
request_leaders_task.cancel() | |||
if not self.assembled_group.done(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.cancel()
just returns False when the awaitable is done.
declare_averager_task.cancel() | ||
await cancel_and_wait(update_queue_task) | ||
if declare: | ||
await cancel_and_wait(declare_averager_task) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using cancel_and_wait()
here resolves this frequent warning:
Task was destroyed but it is pending!
task: <Task pending name='Task-11' coro=<PotentialLeaders._declare_averager_periodically() done, defined at /home/jheuristic/Documents/exp/hivemind/hivemind/averaging/matchmaking.py:513> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7efcec662fd0>()]>>
0772c5c
to
05c8030
Compare
@@ -261,18 +261,11 @@ def run_coroutine( | |||
async def _run_coroutine( | |||
self, coro: Callable[[DHT, DHTNode], Awaitable[ReturnType]], future: MPFuture[ReturnType] | |||
): | |||
main_task = asyncio.create_task(coro(self, self._node)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cancels here did not work since the new MPFuture implementation does not support asyncio await
s in a child process.
They produced the following exception that was accidentally suppressed in await_cancelled()
:
Traceback (most recent call last):
File "/home/borzunov/hivemind/hivemind/utils/asyncio.py", line 83, in await_cancelled
await awaitable
File "/home/borzunov/hivemind/hivemind/utils/mpfuture.py", line 284, in __await__
raise RuntimeError("Can't await: MPFuture was created with no event loop")
RuntimeError: Can't await: MPFuture was created with no event loop
This PR removes them since they are not used, as discussed with @justheuristic.
This PR resolves the frequent warning below, as well as a number of other finalizer issues.
Discussion:
begin_search()
untildeclare_averager(..., looking_for_group=False)
is finished. While it is idiomatic for asyncio, this makes averaging a little longer. Is it a desired behavior? Or maybe we should just suppress theTask was destroyed but it is pending!
warning instead? Or maybe add to an averager-wide pool of background tasks?