Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented readuntil in StreamResponse #4734

Merged
merged 8 commits into from
Nov 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES/4054.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implemented ``readuntil`` in ``StreamReader``.
29 changes: 19 additions & 10 deletions aiohttp/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,34 +310,41 @@ async def _wait(self, func_name: str) -> None:
self._waiter = None

async def readline(self) -> bytes:
    """Read one line by delegating to ``readuntil()``.

    ``readuntil()`` is called without arguments, so its default
    separator is used.
    """
    line = await self.readuntil()
    return line

async def readuntil(self, separator: bytes = b"\n") -> bytes:
seplen = len(separator)
if seplen == 0:
raise ValueError("Separator should be at least one-byte string")

if self._exception is not None:
raise self._exception

line = []
line_size = 0
chunk = b""
chunk_size = 0
not_enough = True

while not_enough:
while self._buffer and not_enough:
offset = self._buffer_offset
ichar = self._buffer[0].find(b"\n", offset) + 1
# Read from current offset to found b'\n' or to the end.
ichar = self._buffer[0].find(separator, offset) + 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

asyncio has a much more efficient solution. Why not copy it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not want to change the code much.

What do you think should be copied from asyncio implementation?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think syncing the implementation with asyncio would be great!
asyncio doesn't incrementally copy data into the buffer in a loop; it calculates the boundaries and moves the bytes once.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I will fix it

# Read from current offset to found separator or to the end.
data = self._read_nowait_chunk(ichar - offset if ichar else -1)
line.append(data)
line_size += len(data)
chunk += data
chunk_size += len(data)
if ichar:
not_enough = False

if line_size > self._high_water:
raise ValueError("Line is too long")
if chunk_size > self._high_water:
raise ValueError("Chunk too big")

if self._eof:
break

if not_enough:
await self._wait("readline")
await self._wait("readuntil")

return b"".join(line)
return chunk

async def read(self, n: int = -1) -> bytes:
if self._exception is not None:
Expand Down Expand Up @@ -517,6 +524,8 @@ async def readline(self) -> bytes:
async def read(self, n: int = -1) -> bytes:
    """Return an empty bytes object; ``n`` is accepted but ignored."""
    return b""

# TODO add async def readuntil

async def readany(self) -> bytes:
    """Return an empty bytes object unconditionally."""
    return b""

Expand Down
13 changes: 13 additions & 0 deletions docs/streams.rst
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,19 @@ Reading Methods

:return bytes: the given line

.. comethod:: StreamReader.readuntil(separator=b"\n")

Read until separator, where `separator` is a sequence of bytes.

If EOF is received and `separator` was not found, the method will
return the partially read bytes.

If the EOF was received and the internal buffer is empty, return an
empty bytes object.

.. versionadded:: 3.8

:return bytes: the given data

.. comethod:: StreamReader.readchunk()

Expand Down
111 changes: 111 additions & 0 deletions tests/test_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,117 @@ async def test_readline_exception(self) -> None:
with pytest.raises(ValueError):
await stream.readline()

async def test_readuntil(self) -> None:
    event_loop = asyncio.get_event_loop()
    # Only the first piece is buffered when the read starts, so
    # 'readuntil' has to wait for the scheduled callback to feed the
    # rest, including the separator.
    reader = self._make_one()
    reader.feed_data(b"chunk1 ")
    pending_read = event_loop.create_task(reader.readuntil(b"*"))

    def feed_rest():
        for piece in (b"chunk2 ", b"chunk3 ", b"* chunk4"):
            reader.feed_data(piece)

    event_loop.call_soon(feed_rest)

    received = await pending_read
    assert b"chunk1 chunk2 chunk3 *" == received

    # Bytes fed after the separator stay available to later reads.
    reader.feed_eof()
    leftover = await reader.read()
    assert b" chunk4" == leftover

async def test_readuntil_limit_with_existing_data(self) -> None:
    # All data is already buffered before 'readuntil' is awaited; the
    # reader is created with limit=2 and the call is expected to fail.
    reader = self._make_one(limit=2)
    reader.feed_data(b"li")
    reader.feed_data(b"ne1&line2&")

    with pytest.raises(ValueError):
        await reader.readuntil(b"&")

    # After the error, the data following the first separator must
    # still be readable.
    reader.feed_eof()
    leftover = await reader.read()
    assert b"line2&" == leftover

async def test_readuntil_limit(self) -> None:
    event_loop = asyncio.get_event_loop()
    # The reader starts empty; data is fed by a callback scheduled
    # before 'readuntil' is awaited.
    reader = self._make_one(limit=4)

    def feed_all():
        for piece in (b"chunk1", b"chunk2$", b"chunk3#"):
            reader.feed_data(piece)
        reader.feed_eof()

    event_loop.call_soon(feed_all)

    with pytest.raises(ValueError):
        await reader.readuntil(b"$")

    # What came after the separator is still there after the failure.
    leftover = await reader.read()
    assert b"chunk3#" == leftover

async def test_readuntil_nolimit_nowait(self) -> None:
    # Everything the first 'readuntil' call needs is buffered up
    # front, split across two feed_data() calls.
    reader = self._make_one()
    payload = b"line1!line2!line3!"
    head, tail = payload[:6], payload[6:]
    reader.feed_data(head)
    reader.feed_data(tail)

    first = await reader.readuntil(b"!")
    assert b"line1!" == first

    reader.feed_eof()
    leftover = await reader.read()
    assert b"line2!line3!" == leftover

async def test_readuntil_eof(self) -> None:
    # EOF arrives and the separator never appears: the buffered bytes
    # are returned as they are.
    reader = self._make_one()
    reader.feed_data(b"some data")
    reader.feed_eof()

    received = await reader.readuntil(b"@")
    assert b"some data" == received

async def test_readuntil_empty_eof(self) -> None:
    # EOF with nothing buffered yields an empty bytes object.
    reader = self._make_one()
    reader.feed_eof()

    received = await reader.readuntil(b"@")
    assert b"" == received

async def test_readuntil_read_byte_count(self) -> None:
    # A sized read issued after 'readuntil' continues right after the
    # separator that was consumed.
    reader = self._make_one()
    payload = b"line1!line2!line3!"
    reader.feed_data(payload)

    await reader.readuntil(b"!")

    partial = await reader.read(7)
    assert b"line2!l" == partial

    reader.feed_eof()
    leftover = await reader.read()
    assert b"ine3!" == leftover

async def test_readuntil_exception(self) -> None:
    # A normal read succeeds first; once an exception is set on the
    # reader, the next 'readuntil' call raises it.
    reader = self._make_one()
    reader.feed_data(b"line#")

    received = await reader.readuntil(b"#")
    assert b"line#" == received

    reader.set_exception(ValueError())
    with pytest.raises(ValueError):
        await reader.readuntil(b"#")

async def test_readexactly_zero_or_less(self) -> None:
# Read exact number of bytes (zero or less).
stream = self._make_one()
Expand Down