Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor FlowControlDataQueue to improve performances #9659

Merged
merged 11 commits into from
Nov 4, 2024
Merged
1 change: 1 addition & 0 deletions CHANGES/9659.misc.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improved performance of the internal ``DataQueue`` -- by :user:`bdraco`.
65 changes: 32 additions & 33 deletions aiohttp/streams.py
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As DataQueue doesn't have _size anymore, the Generic should not require Sized anymore. Also, probably want to have WSMessage queues move to that, as they don't need the _size calculations.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_SizedT is in this case WSMessage .... I think could be changed to _T

FlowControlDataQueue is only used by the WebSocket? Does the WebSocket not need to pause the protocol when the limit is hit?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah. That logic can simply be removed then. I thought I remembered it being used in the parser or something with bytes, where it tracked the size of the data stream..

Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,6 @@ def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
self._eof = False
self._waiter: Optional[asyncio.Future[None]] = None
self._exception: Union[Type[BaseException], BaseException, None] = None
self._size = 0
self._buffer: Deque[_SizedT] = collections.deque()

def __len__(self) -> int:
Expand All @@ -630,48 +629,39 @@ def set_exception(
) -> None:
self._eof = True
self._exception = exc

waiter = self._waiter
if waiter is not None:
if (waiter := self._waiter) is not None:
self._waiter = None
set_exception(waiter, exc, exc_cause)

def feed_data(self, data: _SizedT) -> None:
self._size += len(data)
self._buffer.append(data)

waiter = self._waiter
if waiter is not None:
if (waiter := self._waiter) is not None:
self._waiter = None
set_result(waiter, None)

def feed_eof(self) -> None:
self._eof = True

waiter = self._waiter
if waiter is not None:
if (waiter := self._waiter) is not None:
self._waiter = None
set_result(waiter, None)

async def _wait_for_data(self) -> None:
assert not self._waiter
self._waiter = self._loop.create_future()
try:
await self._waiter
except (asyncio.CancelledError, asyncio.TimeoutError):
self._waiter = None
raise

async def read(self) -> _SizedT:
if not self._buffer and not self._eof:
assert not self._waiter
self._waiter = self._loop.create_future()
try:
await self._waiter
except (asyncio.CancelledError, asyncio.TimeoutError):
self._waiter = None
raise

await self._wait_for_data()
if self._buffer:
data = self._buffer.popleft()
self._size -= len(data)
return data
else:
if self._exception is not None:
raise self._exception
else:
raise EofStream
return self._buffer.popleft()
if self._exception is not None:
raise self._exception
Fixed Show fixed Hide fixed
raise EofStream

def __aiter__(self) -> AsyncStreamIterator[_SizedT]:
return AsyncStreamIterator(self.read)
Expand All @@ -687,19 +677,28 @@ def __init__(
self, protocol: BaseProtocol, limit: int, *, loop: asyncio.AbstractEventLoop
) -> None:
super().__init__(loop=loop)

self._size = 0
self._protocol = protocol
self._limit = limit * 2

def feed_data(self, data: _SizedT) -> None:
super().feed_data(data)

self._size += len(data)
self._buffer.append(data)
if (waiter := self._waiter) is not None:
self._waiter = None
set_result(waiter, None)
if self._size > self._limit and not self._protocol._reading_paused:
self._protocol.pause_reading()

async def read(self) -> _SizedT:
try:
return await super().read()
bdraco marked this conversation as resolved.
Show resolved Hide resolved
finally:
if not self._buffer and not self._eof:
await self._wait_for_data()
if self._buffer:
data = self._buffer.popleft()
self._size -= len(data)
if self._size < self._limit and self._protocol._reading_paused:
self._protocol.resume_reading()
return data
if self._exception is not None:
raise self._exception
raise EofStream
Loading