Skip to content

Commit

Permalink
Catch warnings in inactive tg test and update docs
Browse files Browse the repository at this point in the history
  • Loading branch information
Jason-Y-Z committed Mar 5, 2024
1 parent 19c8bea commit 5d0d518
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 7 deletions.
3 changes: 3 additions & 0 deletions Doc/library/asyncio-task.rst
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,9 @@ and reliable way to wait for all tasks in the group to finish.

Create a task in this task group.
The signature matches that of :func:`asyncio.create_task`.
If the task group is inactive (e.g. not yet entered,
already finished, or in the process of shutting down),
we will close the given ``coro``.

.. versionchanged:: 3.13

Expand Down
3 changes: 2 additions & 1 deletion Doc/whatsnew/3.13.rst
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ Other Language Changes

* When :func:`asyncio.TaskGroup.create_task` is called on an inactive
:class:`asyncio.TaskGroup`, the given coroutine will be closed (which
prevents a :exc:`RuntimeWarning`).
prevents a :exc:`RuntimeWarning` about the given coroutine being
never awaited).

(Contributed by Arthur Tacca and Jason Zhang in :gh:`115957`.)

Expand Down
20 changes: 14 additions & 6 deletions Lib/test/test_asyncio/test_taskgroups.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import contextlib
from asyncio import taskgroups
import unittest
import warnings

from test.test_asyncio.utils import await_without_task

Expand Down Expand Up @@ -794,12 +795,19 @@ async def test_taskgroup_double_enter(self):
pass

async def test_taskgroup_finished(self):
tg = taskgroups.TaskGroup()
async with tg:
pass
coro = asyncio.sleep(0)
with self.assertRaisesRegex(RuntimeError, "is finished"):
tg.create_task(coro)
async def create_task_after_tg_finish():
tg = taskgroups.TaskGroup()
async with tg:
pass
coro = asyncio.sleep(0)
with self.assertRaisesRegex(RuntimeError, "is finished"):
tg.create_task(coro)

# Make sure the coroutine was closed when submitted to the inactive tg
# (if not closed, a RuntimeWarning should have been raised)
with warnings.catch_warnings(record=True) as w:
await create_task_after_tg_finish()
self.assertEqual(len(w), 0)

async def test_taskgroup_not_entered(self):
tg = taskgroups.TaskGroup()
Expand Down

0 comments on commit 5d0d518

Please sign in to comment.