Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Python 3.5 “async for” compatibility #542

Merged
merged 1 commit into from
Oct 18, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 66 additions & 2 deletions aiohttp/streams.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import sys
import asyncio
import collections
import functools
Expand All @@ -10,6 +11,8 @@
'FlowControlStreamReader',
'FlowControlDataQueue', 'FlowControlChunksQueue')

PY_35 = sys.version_info >= (3, 5)

EOF_MARKER = b''
DEFAULT_LIMIT = 2 ** 16

Expand All @@ -18,7 +21,63 @@ class EofStream(Exception):
"""eof stream indication."""


class StreamReader(asyncio.StreamReader):
class AsyncStreamIterator:

def __init__(self, read_func):
self.read_func = read_func

@asyncio.coroutine
def __aiter__(self):
return self

@asyncio.coroutine
def __anext__(self):
try:
rv = yield from self.read_func()
except EofStream:
raise StopAsyncIteration # NOQA
if rv == EOF_MARKER:
raise StopAsyncIteration # NOQA
return rv


class AsyncStreamReaderMixin:

if PY_35:
@asyncio.coroutine
def __aiter__(self):
return AsyncStreamIterator(self.readline)

def iter_chunked(self, n):
"""Returns an asynchronous iterator that yields chunks of size n.

.. versionadded:: Python-3.5 available for Python 3.5+ only
"""
return AsyncStreamIterator(lambda: self.read(n))

def iter_any(self):
"""Returns an asynchronous iterator that yields slices of data as they come.

.. versionadded:: Python-3.5 available for Python 3.5+ only
"""
return AsyncStreamIterator(self.readany)


class StreamReader(asyncio.StreamReader, AsyncStreamReaderMixin):
"""An enhancement of :class:`asyncio.StreamReader`.

Supports asynchronous iteration by line, chunk or as available::

async for line in reader:
...
async for chunk in reader.iter_chunked(1024):
...
async for slice in reader.iter_any():
...

.. automethod:: AsyncStreamReaderMixin.iter_chunked
.. automethod:: AsyncStreamReaderMixin.iter_any
"""

total_bytes = 0

Expand Down Expand Up @@ -270,7 +329,7 @@ def _read_nowait(self, n=None):
return data


class EmptyStreamReader:
class EmptyStreamReader(AsyncStreamReaderMixin):

def exception(self):
return None
Expand Down Expand Up @@ -386,6 +445,11 @@ def read(self):
else:
raise EofStream

if PY_35:
@asyncio.coroutine
def __aiter__(self):
return AsyncStreamIterator(self.read)


class ChunksQueue(DataQueue):
"""Like a :class:`DataQueue`, but for binary chunked data transfer."""
Expand Down
5 changes: 5 additions & 0 deletions tests/test_py35/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just drop the file.
test_py35 is not a package.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i explained that in the file:

This file allows files in here to be called the same as other test files.

if there is e.g. /tests/test_streams and /tests/test_py35/test_streams, then /tests/test_py35/__init__.py has to exist due to the module cache. try it if you don’t believe me 😄

Python 3.5 test module, testing new native async stuff.

This file allows files in here to be called the same as other test files.
"""
72 changes: 72 additions & 0 deletions tests/test_py35/test_streams.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import pytest

from aiohttp import streams


DATA = b'line1\nline2\nline3\n'


def chunkify(seq, n):
for i in range(0, len(seq), n):
yield seq[i:i+n]


def create_stream(loop):
stream = streams.StreamReader(loop=loop)
stream.feed_data(DATA)
stream.feed_eof()
return stream


@pytest.mark.run_loop
async def test_stream_reader_lines(loop):
line_iter = iter(DATA.splitlines(keepends=True))
async for line in create_stream(loop):
assert line == next(line_iter, None)
pytest.raises(StopIteration, next, line_iter)


@pytest.mark.run_loop
async def test_stream_reader_chunks_complete(loop):
"""Tests if chunked iteration works if the chunking works out
(i.e. the data is divisible by the chunk size)
"""
chunk_iter = chunkify(DATA, 9)
async for line in create_stream(loop).iter_chunked(9):
assert line == next(chunk_iter, None)
pytest.raises(StopIteration, next, chunk_iter)


@pytest.mark.run_loop
async def test_stream_reader_chunks_incomplete(loop):
"""Tests if chunked iteration works if the last chunk is incomplete"""
chunk_iter = chunkify(DATA, 8)
async for line in create_stream(loop).iter_chunked(8):
assert line == next(chunk_iter, None)
pytest.raises(StopIteration, next, chunk_iter)


@pytest.mark.run_loop
async def test_data_queue_empty(loop):
"""Tests that async looping yields nothing if nothing is there"""
buffer = streams.DataQueue(loop=loop)
buffer.feed_eof()

async for _ in buffer: # NOQA
assert False


@pytest.mark.run_loop
async def test_data_queue_items(loop):
"""Tests that async looping yields objects identically"""
buffer = streams.DataQueue(loop=loop)

items = [object(), object()]
buffer.feed_data(items[0], 1)
buffer.feed_data(items[1], 1)
buffer.feed_eof()

item_iter = iter(items)
async for item in buffer:
assert item is next(item_iter, None)
pytest.raises(StopIteration, next, item_iter)