diff --git a/CHANGES/9963.bugfix.rst b/CHANGES/9963.bugfix.rst new file mode 100644 index 00000000000..0c05ebab24f --- /dev/null +++ b/CHANGES/9963.bugfix.rst @@ -0,0 +1,3 @@ +Restored the ``FlowControlDataQueue`` class -- by :user:`bdraco`. + +This class is no longer used internally, and will be permanently removed in the next major version. diff --git a/aiohttp/__init__.py b/aiohttp/__init__.py index 4bac155c9d6..741b93a9b37 100644 --- a/aiohttp/__init__.py +++ b/aiohttp/__init__.py @@ -93,6 +93,7 @@ EMPTY_PAYLOAD as EMPTY_PAYLOAD, DataQueue as DataQueue, EofStream as EofStream, + FlowControlDataQueue as FlowControlDataQueue, StreamReader as StreamReader, ) from .tracing import ( @@ -148,6 +149,7 @@ "ConnectionTimeoutError", "ContentTypeError", "Fingerprint", + "FlowControlDataQueue", "InvalidURL", "InvalidUrlClientError", "InvalidUrlRedirectClientError", diff --git a/aiohttp/streams.py b/aiohttp/streams.py index 59aa1dd0c3b..b97846171b1 100644 --- a/aiohttp/streams.py +++ b/aiohttp/streams.py @@ -677,3 +677,46 @@ async def read(self) -> _T: def __aiter__(self) -> AsyncStreamIterator[_T]: return AsyncStreamIterator(self.read) + + +class FlowControlDataQueue(DataQueue[_T]): + """FlowControlDataQueue resumes and pauses an underlying stream. + + It is a destination for parsed data. + + This class is deprecated and will be removed in version 4.0. + """ + + def __init__( + self, protocol: BaseProtocol, limit: int, *, loop: asyncio.AbstractEventLoop + ) -> None: + super().__init__(loop=loop) + self._size = 0 + self._protocol = protocol + self._limit = limit * 2 + + def feed_data(self, data: _T, size: int = 0) -> None: + super().feed_data(data, size) + self._size += size + + if self._size > self._limit and not self._protocol._reading_paused: + self._protocol.pause_reading() + + async def read(self) -> _T: + if not self._buffer and not self._eof: + assert not self._waiter + self._waiter = self._loop.create_future() + try: + await self._waiter + except (asyncio.CancelledError, asyncio.TimeoutError): + self._waiter = None + raise + if self._buffer: + data, size = self._buffer.popleft() + self._size -= size + if self._size < self._limit and self._protocol._reading_paused: + self._protocol.resume_reading() + return data + if self._exception is not None: + raise self._exception + raise EofStream diff --git a/tests/test_flowcontrol_streams.py b/tests/test_flowcontrol_streams.py index 08f6be21a2c..68e623b6dd7 100644 --- a/tests/test_flowcontrol_streams.py +++ b/tests/test_flowcontrol_streams.py @@ -1,3 +1,4 @@ +import asyncio from unittest import mock import pytest @@ -15,6 +16,13 @@ def stream(loop, protocol): return streams.StreamReader(protocol, limit=1, loop=loop) +@pytest.fixture +def buffer(loop, protocol: mock.Mock) -> streams.FlowControlDataQueue: + out = streams.FlowControlDataQueue(protocol, limit=1, loop=loop) + out._allow_pause = True + return out + + class TestFlowControlStreamReader: async def test_read(self, stream) -> None: stream.feed_data(b"da", 2) @@ -103,3 +111,72 @@ async def test_read_nowait(self, stream) -> None: res = stream.read_nowait(5) assert res == b"" assert stream._protocol.resume_reading.call_count == 1 # type: ignore[attr-defined] + + +async def test_flow_control_data_queue_waiter_cancelled( + buffer: streams.FlowControlDataQueue, +) -> None: + """Test that the waiter is cancelled it is cleared.""" + task = asyncio.create_task(buffer.read()) + await asyncio.sleep(0) + assert buffer._waiter is not None + buffer._waiter.cancel() + + with pytest.raises(asyncio.CancelledError): + await task + assert buffer._waiter is None + + +async def test_flow_control_data_queue_has_buffer( + buffer: streams.FlowControlDataQueue, +) -> None: + """Test reading from the buffer.""" + data = object() + buffer.feed_data(data, 100) + assert buffer._size == 100 + read_data = await buffer.read() + assert read_data is data + assert buffer._size == 0 + + +async def test_flow_control_data_queue_read_with_exception( + buffer: streams.FlowControlDataQueue, +) -> None: + """Test reading when the buffer is empty and an exception is set.""" + buffer.set_exception(ValueError("unique_string")) + with pytest.raises(ValueError, match="unique_string"): + await buffer.read() + + +def test_flow_control_data_queue_feed_pause( + buffer: streams.FlowControlDataQueue, +) -> None: + """Test feeding data and pausing the reader.""" + buffer._protocol._reading_paused = False + buffer.feed_data(object(), 100) + assert buffer._protocol.pause_reading.called + + buffer._protocol._reading_paused = True + buffer._protocol.pause_reading.reset_mock() + buffer.feed_data(object(), 100) + assert not buffer._protocol.pause_reading.called + + +async def test_flow_control_data_queue_resume_on_read( + buffer: streams.FlowControlDataQueue, +) -> None: + """Test that the reader is resumed when reading.""" + buffer.feed_data(object(), 100) + + buffer._protocol._reading_paused = True + await buffer.read() + assert buffer._protocol.resume_reading.called + + +async def test_flow_control_data_queue_read_eof( + buffer: streams.FlowControlDataQueue, +) -> None: + """Test that reading after eof raises EofStream.""" + buffer.feed_eof() + with pytest.raises(streams.EofStream): + await buffer.read()