From 7e8b10c5709087f42554a4c1cc3455e73b923f16 Mon Sep 17 00:00:00 2001 From: Philipp A Date: Fri, 2 Oct 2015 14:46:28 +0200 Subject: [PATCH] =?UTF-8?q?Added=20Python=203.5=20=E2=80=9Casync=20for?= =?UTF-8?q?=E2=80=9D=20compatibility?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Instead of while True: msg = await dataqueue.read() ... do: async for msg in dataqueue: ... --- aiohttp/streams.py | 68 ++++++++++++++++++++++++++++++- tests/test_py35/__init__.py | 5 +++ tests/test_py35/test_streams.py | 72 +++++++++++++++++++++++++++++++++ 3 files changed, 143 insertions(+), 2 deletions(-) create mode 100644 tests/test_py35/__init__.py create mode 100644 tests/test_py35/test_streams.py diff --git a/aiohttp/streams.py b/aiohttp/streams.py index aca8e91652b..2086444cef7 100644 --- a/aiohttp/streams.py +++ b/aiohttp/streams.py @@ -1,3 +1,4 @@ +import sys import asyncio import collections import functools @@ -10,6 +11,8 @@ 'FlowControlStreamReader', 'FlowControlDataQueue', 'FlowControlChunksQueue') +PY_35 = sys.version_info >= (3, 5) + EOF_MARKER = b'' DEFAULT_LIMIT = 2 ** 16 @@ -18,7 +21,63 @@ class EofStream(Exception): """eof stream indication.""" -class StreamReader(asyncio.StreamReader): +class AsyncStreamIterator: + + def __init__(self, read_func): + self.read_func = read_func + + @asyncio.coroutine + def __aiter__(self): + return self + + @asyncio.coroutine + def __anext__(self): + try: + rv = yield from self.read_func() + except EofStream: + raise StopAsyncIteration # NOQA + if rv == EOF_MARKER: + raise StopAsyncIteration # NOQA + return rv + + +class AsyncStreamReaderMixin: + + if PY_35: + @asyncio.coroutine + def __aiter__(self): + return AsyncStreamIterator(self.readline) + + def iter_chunked(self, n): + """Returns an asynchronous iterator that yields chunks of size n. + + .. versionadded:: Python-3.5 available for Python 3.5+ only + """ + return AsyncStreamIterator(lambda: self.read(n)) + + def iter_any(self): + """Returns an asynchronous iterator that yields slices of data as they come. + + .. versionadded:: Python-3.5 available for Python 3.5+ only + """ + return AsyncStreamIterator(self.readany) + + +class StreamReader(asyncio.StreamReader, AsyncStreamReaderMixin): + """An enhancement of :class:`asyncio.StreamReader`. + + Supports asynchronous iteration by line, chunk or as available:: + + async for line in reader: + ... + async for chunk in reader.iter_chunked(1024): + ... + async for slice in reader.iter_any(): + ... + + .. automethod:: AsyncStreamReaderMixin.iter_chunked + .. automethod:: AsyncStreamReaderMixin.iter_any + """ total_bytes = 0 @@ -270,7 +329,7 @@ def _read_nowait(self, n=None): return data -class EmptyStreamReader: +class EmptyStreamReader(AsyncStreamReaderMixin): def exception(self): return None @@ -386,6 +445,11 @@ def read(self): else: raise EofStream + if PY_35: + @asyncio.coroutine + def __aiter__(self): + return AsyncStreamIterator(self.read) + class ChunksQueue(DataQueue): """Like a :class:`DataQueue`, but for binary chunked data transfer.""" diff --git a/tests/test_py35/__init__.py b/tests/test_py35/__init__.py new file mode 100644 index 00000000000..9fe3d521fcd --- /dev/null +++ b/tests/test_py35/__init__.py @@ -0,0 +1,5 @@ +""" +Python 3.5 test module, testing new native async stuff. + +This file allows files in here to be called the same as other test files. +""" diff --git a/tests/test_py35/test_streams.py b/tests/test_py35/test_streams.py new file mode 100644 index 00000000000..83d298a0b1a --- /dev/null +++ b/tests/test_py35/test_streams.py @@ -0,0 +1,72 @@ +import pytest + +from aiohttp import streams + + +DATA = b'line1\nline2\nline3\n' + + +def chunkify(seq, n): + for i in range(0, len(seq), n): + yield seq[i:i+n] + + +def create_stream(loop): + stream = streams.StreamReader(loop=loop) + stream.feed_data(DATA) + stream.feed_eof() + return stream + + +@pytest.mark.run_loop +async def test_stream_reader_lines(loop): + line_iter = iter(DATA.splitlines(keepends=True)) + async for line in create_stream(loop): + assert line == next(line_iter, None) + pytest.raises(StopIteration, next, line_iter) + + +@pytest.mark.run_loop +async def test_stream_reader_chunks_complete(loop): + """Tests if chunked iteration works if the chunking works out + (i.e. the data is divisible by the chunk size) + """ + chunk_iter = chunkify(DATA, 9) + async for line in create_stream(loop).iter_chunked(9): + assert line == next(chunk_iter, None) + pytest.raises(StopIteration, next, chunk_iter) + + +@pytest.mark.run_loop +async def test_stream_reader_chunks_incomplete(loop): + """Tests if chunked iteration works if the last chunk is incomplete""" + chunk_iter = chunkify(DATA, 8) + async for line in create_stream(loop).iter_chunked(8): + assert line == next(chunk_iter, None) + pytest.raises(StopIteration, next, chunk_iter) + + +@pytest.mark.run_loop +async def test_data_queue_empty(loop): + """Tests that async looping yields nothing if nothing is there""" + buffer = streams.DataQueue(loop=loop) + buffer.feed_eof() + + async for _ in buffer: # NOQA + assert False + + +@pytest.mark.run_loop +async def test_data_queue_items(loop): + """Tests that async looping yields objects identically""" + buffer = streams.DataQueue(loop=loop) + + items = [object(), object()] + buffer.feed_data(items[0], 1) + buffer.feed_data(items[1], 1) + buffer.feed_eof() + + item_iter = iter(items) + async for item in buffer: + assert item is next(item_iter, None) + pytest.raises(StopIteration, next, item_iter)