Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix resolve_host "Task was destroyed but it is pending" errors #8967

Merged
merged 6 commits into from
Sep 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES/8967.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed resolve_host() 'Task was destroyed but is pending' errors -- by :user:`Dreamsorcerer`.
20 changes: 15 additions & 5 deletions aiohttp/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from itertools import cycle, islice
from time import monotonic
from types import TracebackType
from typing import ( # noqa
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Expand Down Expand Up @@ -404,8 +404,8 @@ async def close(self) -> None:
err_msg = "Error while closing connector: " + repr(res)
logging.error(err_msg)

def _close_immediately(self) -> List["asyncio.Future[None]"]:
waiters: List["asyncio.Future[None]"] = []
def _close_immediately(self) -> List[Awaitable[object]]:
waiters: List[Awaitable[object]] = []

if self._closed:
return waiters
Expand Down Expand Up @@ -805,11 +805,19 @@ def __init__(
self._local_addr_infos = aiohappyeyeballs.addr_to_addr_infos(local_addr)
self._happy_eyeballs_delay = happy_eyeballs_delay
self._interleave = interleave
self._resolve_host_tasks: Set["asyncio.Task[List[ResolveResult]]"] = set()

def _close_immediately(self) -> List["asyncio.Future[None]"]:
def _close_immediately(self) -> List[Awaitable[object]]:
for ev in self._throttle_dns_events.values():
ev.cancel()
return super()._close_immediately()

waiters = super()._close_immediately()

for t in self._resolve_host_tasks:
t.cancel()
bdraco marked this conversation as resolved.
Show resolved Hide resolved
waiters.append(t)

return waiters

@property
def family(self) -> int:
Expand Down Expand Up @@ -885,6 +893,8 @@ async def _resolve_host(
resolved_host_task = asyncio.create_task(
bdraco marked this conversation as resolved.
Show resolved Hide resolved
self._resolve_host_with_throttle(key, host, port, traces)
)
self._resolve_host_tasks.add(resolved_host_task)
resolved_host_task.add_done_callback(self._resolve_host_tasks.discard)
try:
return await asyncio.shield(resolved_host_task)
except asyncio.CancelledError:
Expand Down
35 changes: 34 additions & 1 deletion tests/test_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import sys
import uuid
from collections import deque
from contextlib import closing
from contextlib import closing, suppress
from typing import (
Awaitable,
Callable,
Expand Down Expand Up @@ -1839,6 +1839,39 @@ async def test_close_cancels_cleanup_handle(
assert conn._cleanup_handle is None


async def test_close_cancels_resolve_host(loop: asyncio.AbstractEventLoop) -> None:
cancelled = False

async def delay_resolve_host(*args: object) -> None:
"""Delay _resolve_host() task in order to test cancellation."""
nonlocal cancelled
try:
await asyncio.sleep(10)
except asyncio.CancelledError:
cancelled = True
raise

conn = aiohttp.TCPConnector()
req = ClientRequest(
"GET", URL("http://localhost:80"), loop=loop, response_class=mock.Mock()
)
with mock.patch.object(conn, "_resolve_host_with_throttle", delay_resolve_host):
t = asyncio.create_task(conn.connect(req, [], ClientTimeout()))
# Let it create the internal task
await asyncio.sleep(0)
# Let that task start running
await asyncio.sleep(0)

# We now have a task being tracked and can ensure that .close() cancels it.
assert len(conn._resolve_host_tasks) == 1
await conn.close()
assert cancelled
assert len(conn._resolve_host_tasks) == 0

with suppress(asyncio.CancelledError):
await t
Dismissed Show dismissed Hide dismissed


async def test_close_abort_closed_transports(loop: asyncio.AbstractEventLoop) -> None:
tr = mock.Mock()

Expand Down
Loading