Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor FlowControlDataQueue to improve performances #9659

Merged
merged 11 commits into from
Nov 4, 2024
29 changes: 21 additions & 8 deletions aiohttp/streams.py
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As DataQueue doesn't have _size anymore, the Generic should not require Sized anymore. Also, probably want to have WSMessage queues move to that, as they don't need the _size calculations.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_SizedT is in this case WSMessage .... I think could be changed to _T

FlowControlDataQueue is only used by the WebSocket? Does the WebSocket not need to pause the protocol when the limit is hit?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah. That logic can simply be removed then. I thought I remembered it being used in the parser or something with bytes, where it tracked the size of the data stream..

Original file line number Diff line number Diff line change
Expand Up @@ -667,11 +667,9 @@ async def read(self) -> _SizedT:
data = self._buffer.popleft()
self._size -= len(data)
return data
else:
if self._exception is not None:
raise self._exception
else:
raise EofStream
if self._exception is not None:
raise self._exception
Fixed Show fixed Hide fixed
raise EofStream

def __aiter__(self) -> AsyncStreamIterator[_SizedT]:
return AsyncStreamIterator(self.read)
Expand All @@ -698,8 +696,23 @@ def feed_data(self, data: _SizedT) -> None:
self._protocol.pause_reading()

async def read(self) -> _SizedT:
try:
return await super().read()
bdraco marked this conversation as resolved.
Show resolved Hide resolved
finally:
if not self._buffer and not self._eof:
assert not self._waiter
self._waiter = self._loop.create_future()
try:
await self._waiter
except (asyncio.CancelledError, asyncio.TimeoutError):
self._waiter = None
raise

if self._buffer:
data = self._buffer.popleft()
self._size -= len(data)
if self._size < self._limit and self._protocol._reading_paused:
self._protocol.resume_reading()
return data

if self._exception is not None:
raise self._exception

raise EofStream
Loading