diff --git a/CHANGES/5854.bugfix b/CHANGES/5854.bugfix new file mode 100644 index 00000000000..b7de2f4d232 --- /dev/null +++ b/CHANGES/5854.bugfix @@ -0,0 +1 @@ +Fixed client timeout not working when incoming data is always available without waiting -- by :user:`Dreamsorcerer`. diff --git a/aiohttp/helpers.py b/aiohttp/helpers.py index 431bd67dd14..660f7cf900c 100644 --- a/aiohttp/helpers.py +++ b/aiohttp/helpers.py @@ -676,7 +676,8 @@ def __call__(self) -> None: class BaseTimerContext(ContextManager["BaseTimerContext"]): - pass + def assert_timeout(self) -> None: + """Raise TimeoutError if timeout has been exceeded.""" class TimerNoop(BaseTimerContext): @@ -700,6 +701,11 @@ def __init__(self, loop: asyncio.AbstractEventLoop) -> None: self._tasks: List[asyncio.Task[Any]] = [] self._cancelled = False + def assert_timeout(self) -> None: + """Raise TimeoutError if timer has already been cancelled.""" + if self._cancelled: + raise asyncio.TimeoutError from None + def __enter__(self) -> BaseTimerContext: task = asyncio.current_task(loop=self._loop) diff --git a/aiohttp/streams.py b/aiohttp/streams.py index 259609209ae..1a2c5147fc3 100644 --- a/aiohttp/streams.py +++ b/aiohttp/streams.py @@ -6,7 +6,7 @@ from typing_extensions import Final from .base_protocol import BaseProtocol -from .helpers import BaseTimerContext, set_exception, set_result +from .helpers import BaseTimerContext, TimerNoop, set_exception, set_result from .log import internal_logger try: # pragma: no cover @@ -122,7 +122,7 @@ def __init__( self._waiter: Optional[asyncio.Future[None]] = None self._eof_waiter: Optional[asyncio.Future[None]] = None self._exception: Optional[BaseException] = None - self._timer = timer + self._timer = TimerNoop() if timer is None else timer self._eof_callbacks: List[Callable[[], None]] = [] def __repr__(self) -> str: @@ -297,10 +297,7 @@ async def _wait(self, func_name: str) -> None: waiter = self._waiter = self._loop.create_future() try: - if self._timer: - with self._timer: - await waiter - else: + with self._timer: await waiter finally: self._waiter = None @@ -477,8 +474,9 @@ def _read_nowait_chunk(self, n: int) -> bytes: def _read_nowait(self, n: int) -> bytes: """Read not more than n bytes, or whole buffer if n == -1""" - chunks = [] + self._timer.assert_timeout() + chunks = [] while self._buffer: chunk = self._read_nowait_chunk(n) chunks.append(chunk) diff --git a/tests/test_client_functional.py b/tests/test_client_functional.py index 84ee9cb8c3a..4edc00483cb 100644 --- a/tests/test_client_functional.py +++ b/tests/test_client_functional.py @@ -3022,6 +3022,30 @@ async def handler(request): await resp.read() +async def test_timeout_with_full_buffer(aiohttp_client: Any) -> None: + async def handler(request): + """Server response that never ends and always has more data available.""" + resp = web.StreamResponse() + await resp.prepare(request) + while True: + await resp.write(b"1" * 1000) + await asyncio.sleep(0.01) + + async def request(client): + timeout = aiohttp.ClientTimeout(total=0.5) + async with await client.get("/", timeout=timeout) as resp: + with pytest.raises(asyncio.TimeoutError): + async for data in resp.content.iter_chunked(1): + await asyncio.sleep(0.01) + + app = web.Application() + app.add_routes([web.get("/", handler)]) + + client = await aiohttp_client(app) + # wait_for() used just to ensure that a failing test doesn't hang. + await asyncio.wait_for(request(client), 1) + + async def test_read_bufsize_session_default(aiohttp_client: Any) -> None: async def handler(request): return web.Response(body=b"1234567")