Fixed memory object stream sometimes dropping sent items #735
Check if the receiving task has a pending cancellation before sending an item. Fixes #728.
This looks good to me, modulo the single concern about a possible uncancel edge case.
I'm not sure whether that even can happen, but it seems worth checking. If it can, I'd probably just add a code comment + note in the docs and merge anyway.
src/anyio/_core/_testing.py (outdated)
- :return: a list of task info objects
+ :return: a sequence of task info objects
If we want this to be a general `Sequence`, let's also update the type annotation above and cast below.
That was my first thought too, but:
- it is factually returning a list
- annotating the return value as a Sequence (or even a MutableSequence) would widen the declared return type and so narrow what callers may do with the result, thus breaking backwards compatibility, albeit only from the type checker's standpoint.
I see no ideal solution here, so I chose the least disruptive way forward. Thoughts?
Let's just say "list" instead of "sequence" here in the docstring.
As a broader comment, I tend to treat type annotations as documentation rather than a stable API, and thus don't mind updating them in minor versions if that otherwise makes sense. But continuing to use `list` here seems best anyway.
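To make the compatibility concern concrete, here is a minimal sketch. The function names `get_tasks_as_list` and `get_tasks_as_sequence` are hypothetical and are not anyio's API. Both return the same list at runtime, so only the static view of the result differs.

```python
from collections.abc import Sequence


def get_tasks_as_list() -> list[str]:
    # Declared as list: callers may use every list method
    return ["task-1", "task-2"]


def get_tasks_as_sequence() -> Sequence[str]:
    # Same runtime value, but the declared type only promises read access
    return ["task-1", "task-2"]


names = get_tasks_as_list()
names.append("task-3")  # accepted by a type checker

wider = get_tasks_as_sequence()
# wider.append("task-3")  # a type checker rejects this: Sequence has no append()
print(names, type(wider).__name__)
```

Existing callers that mutate the result keep working at runtime either way, which is why the breakage would be visible only to type checkers.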
I've reverted the wording.
while self._state.waiting_receivers:
    receive_event, receiver = self._state.waiting_receivers.popitem(last=False)
I suspect that this can deadlock if the receiving task is cancelled, then `send_nowait` is invoked (removing it from `waiting_receivers`), and then the task is uncancelled before the event loop actually kicks it out of `await receive`.
Of course this is a pretty nasty use of the already brittle uncancel semantics, but we probably want a comment here to make the possibility salient to future readers? Or maybe I'm wrong about this.
I was not able to create a situation where an uncancelling task would get its foot in the door to prevent the delivery of the cancellation. Here's my best attempt so far:
import asyncio
from asyncio import CancelledError, current_task, get_running_loop

from anyio import create_memory_object_stream, create_task_group


async def receiver(receive, task_status):
    with receive:
        # Hand the native asyncio task back to the caller of tg.start()
        task_status.started(current_task())
        try:
            item = await receive.receive()
        except CancelledError:
            print("cancelled")
            raise

        print("got", item)


async def main():
    send, receive = create_memory_object_stream()
    async with create_task_group() as tg:
        task = await tg.start(receiver, receive)
        # Schedule the uncancel first, then cancel and send
        get_running_loop().call_soon(task.uncancel)
        task.cancel()
        await send.send(6)


asyncio.run(main())
Looks like I wasn't trying hard enough. I was able to repro the situation using just asyncio's task APIs:
import asyncio
from asyncio import CancelledError, create_task

from anyio import create_memory_object_stream


async def receiver(receive):
    with receive:
        try:
            item = await receive.receive()
        except CancelledError:
            print("cancelled")
            raise

        print("got", item)


async def uncanceller(task):
    task.uncancel()
    print("uncancelled, cancelling =", task.cancelling())


async def main():
    send, receive = create_memory_object_stream()
    task = create_task(receiver(receive))
    uncancel_task = create_task(uncanceller(task))
    task.cancel()
    await send.send(6)


asyncio.run(main())
Scratch that too, because the task hadn't even started and that's why the sender was blocking.
Ok, here's my third attempt which seems to confirm that the cancellation is scheduled ahead of everything else:
import asyncio
from asyncio import CancelledError, create_task, sleep

from anyio import create_memory_object_stream


async def receiver(receive):
    with receive:
        print("waiting for item")
        try:
            item = await receive.receive()
        except CancelledError:
            print("cancelled")
        else:
            print("got", item)


async def uncanceller(task):
    task.uncancel()
    print("uncancelled, cancelling =", task.cancelling())


async def main():
    send, receive = create_memory_object_stream(1)
    receive_task = create_task(receiver(receive))
    await sleep(0)  # let the receiver start waiting
    uncancel_task = create_task(uncanceller(receive_task))
    receive_task.cancel()
    await send.send(6)
    await uncancel_task
    await receive_task


asyncio.run(main())
Output:
waiting for item
cancelled
uncancelled, cancelling = 0
Traceback (most recent call last):
...
Interestingly, if I cancel the receiver task and uncancel it right after without yielding to another task, I get different results based on the Python version. On 3.13.0b1, the task is scheduled normally and receives the item:
import asyncio
from asyncio import CancelledError, create_task, sleep

from anyio import create_memory_object_stream


async def receiver(receive):
    with receive:
        print("waiting for item")
        try:
            item = await receive.receive()
        except CancelledError:
            print("cancelled")
        else:
            print("got", item)


async def main():
    send, receive = create_memory_object_stream()
    receive_task = create_task(receiver(receive))
    await sleep(0)  # let the receiver start waiting
    # Cancel and uncancel back to back, without yielding in between
    receive_task.cancel()
    receive_task.uncancel()
    await send.send(6)
    await receive_task


asyncio.run(main())
I suppose this is the effect of Guido's recent cancellation PR.
cancel_scope.cancel()
send.send_nowait("item")
From my comment above: uncancelling that scope is the potential deadlock I had in mind.
I've now added an asyncio-only test that ensures that a native cancellation won't cause an item to be dropped. I've verified that the test fails on current
Huh, now it's failing on py < 3.11. I need to investigate.
Native-cancellation handling looks good. I'm looking forward to using this!
Changes
Fixes #728.
Checklist
If this is a user-facing code change, like a bugfix or a new feature, please ensure that
you've fulfilled the following conditions (where applicable):
- tests (in tests/) added which would fail without your patch
- documentation updated (in docs/, in case of behavior changes or new features)
- a changelog entry added (in docs/versionhistory.rst).

If this is a trivial change, like a typo fix or a code reformatting, then you can ignore
these instructions.
Updating the changelog
If there are no entries after the last release, use **UNRELEASED** as the version.
If, say, your patch fixes issue #123, the entry should look like this:
* Fix big bad boo-boo in task groups (`#123 <https://github.com/agronholm/anyio/issues/123>`_; PR by Yourname)
If there's no issue linked, just link to your pull request instead by updating the
changelog after you've created the PR.
If possible, use your real name in the changelog entry. If not, use your GitHub
username.