-
Notifications
You must be signed in to change notification settings - Fork 137
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fixed memory object stream sometimes dropping sent items #735
Merged
Merged
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
535bcb5
Fixed memory object stream sometimes dropping sent items
agronholm 03eb540
Added changelog note and docstring for has_pending_cancellation()
agronholm 9a449cb
Merge branch 'master' into fix-memobjectstream
agronholm 32270b0
Fixed pyright error
agronholm 55aaa39
Added test for native cancellation
agronholm 98b4fb1
Fixed test failures on py < 3.11
agronholm b61c9a7
Reverted wording change (list -> sequence)
agronholm 94a1e8a
Merge branch 'master' into fix-memobjectstream
agronholm File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,7 +17,7 @@ | |
fail_after, | ||
wait_all_tasks_blocked, | ||
) | ||
from anyio.abc import ObjectReceiveStream, ObjectSendStream | ||
from anyio.abc import ObjectReceiveStream, ObjectSendStream, TaskStatus | ||
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream | ||
|
||
if sys.version_info < (3, 11): | ||
|
@@ -305,28 +305,49 @@ async def test_cancel_during_receive() -> None: | |
stream to be lost. | ||
|
||
""" | ||
receiver_scope = None | ||
|
||
async def scoped_receiver() -> None: | ||
nonlocal receiver_scope | ||
with CancelScope() as receiver_scope: | ||
async def scoped_receiver(task_status: TaskStatus[CancelScope]) -> None: | ||
with CancelScope() as cancel_scope: | ||
task_status.started(cancel_scope) | ||
received.append(await receive.receive()) | ||
|
||
assert receiver_scope.cancel_called | ||
assert cancel_scope.cancel_called | ||
|
||
received: list[str] = [] | ||
send, receive = create_memory_object_stream[str]() | ||
async with create_task_group() as tg: | ||
tg.start_soon(scoped_receiver) | ||
await wait_all_tasks_blocked() | ||
send.send_nowait("hello") | ||
assert receiver_scope is not None | ||
receiver_scope.cancel() | ||
with send, receive: | ||
async with create_task_group() as tg: | ||
receiver_scope = await tg.start(scoped_receiver) | ||
await wait_all_tasks_blocked() | ||
send.send_nowait("hello") | ||
receiver_scope.cancel() | ||
|
||
assert received == ["hello"] | ||
|
||
send.close() | ||
receive.close() | ||
|
||
async def test_cancel_during_receive_buffered() -> None: | ||
""" | ||
Test that sending an item to a memory object stream when the receiver that is next | ||
in line has been cancelled will not result in the item being lost. | ||
""" | ||
|
||
async def scoped_receiver( | ||
receive: MemoryObjectReceiveStream[str], task_status: TaskStatus[CancelScope] | ||
) -> None: | ||
with CancelScope() as cancel_scope: | ||
task_status.started(cancel_scope) | ||
await receive.receive() | ||
|
||
send, receive = create_memory_object_stream[str](1) | ||
with send, receive: | ||
async with create_task_group() as tg: | ||
cancel_scope = await tg.start(scoped_receiver, receive) | ||
await wait_all_tasks_blocked() | ||
cancel_scope.cancel() | ||
send.send_nowait("item") | ||
|
||
Comment on lines
+346
to
+348
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. from comment above: uncancelling that scope is my maybe-deadlock. |
||
# Since the item was not sent to the cancelled task, it should be available here | ||
assert receive.receive_nowait() == "item" | ||
|
||
|
||
async def test_close_receive_after_send() -> None: | ||
|
@@ -455,3 +476,25 @@ async def test_not_closed_warning() -> None: | |
with pytest.warns(ResourceWarning, match="Unclosed <MemoryObjectReceiveStream>"): | ||
del receive | ||
gc.collect() | ||
|
||
|
||
@pytest.mark.parametrize("anyio_backend", ["asyncio"], indirect=True) | ||
async def test_send_to_natively_cancelled_receiver() -> None: | ||
""" | ||
Test that if a task waiting on receive.receive() is cancelled and then another | ||
task sends an item, said item is not delivered to the task with a pending | ||
cancellation, but rather to the next one in line. | ||
|
||
""" | ||
from asyncio import CancelledError, create_task | ||
|
||
send, receive = create_memory_object_stream[str](1) | ||
with send, receive: | ||
receive_task = create_task(receive.receive()) | ||
await wait_all_tasks_blocked() # ensure that the task is waiting to receive | ||
receive_task.cancel() | ||
send.send_nowait("hello") | ||
with pytest.raises(CancelledError): | ||
await receive_task | ||
|
||
assert receive.receive_nowait() == "hello" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suspect that this can deadlock if the receiving task is cancelled, then send_nowait invoked (removing it from the waiting_receivers), and then uncancelled before the event loop actually kicks the receiving task out of
await receive
.Of course this is a pretty nasty use of the already brittle uncancel semantics, but we probably want a comment here to make the possibility salient to future readers? Or maybe I'm wrong about this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was not able to create a situation where an uncancel task would get its foot in the door to prevent the delivery of the cancellation. Here's my best attempt so far:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like I wasn't trying hard enough. I was able to repro the situation using just asyncio's task APIs:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Scratch that too, because the task hadn't even started and that's why the sender was blocking.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, here's my third attempt which seems to confirm that the cancellation is scheduled ahead of everything else:
Output:
Interestingly, if I cancel the receiver task and uncancel it right after without yielding to another task, I get different results based on the Python version. On 3.13.0b1, the task is scheduled normally and receives the item:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suppose this is the effect of Guido's recent cancellation PR.