From 0cf904aa0f93ca952826fd007bb57d576cbf5c75 Mon Sep 17 00:00:00 2001 From: Aleksandr Danshyn Date: Wed, 21 Sep 2016 10:50:46 +0300 Subject: [PATCH 1/7] Added maybe_resume decorator to read_nowait --- aiohttp/streams.py | 62 +++++++++++++++++++++++++++++----------------- 1 file changed, 39 insertions(+), 23 deletions(-) diff --git a/aiohttp/streams.py b/aiohttp/streams.py index e9833aaf7fc..b60d009da39 100644 --- a/aiohttp/streams.py +++ b/aiohttp/streams.py @@ -501,29 +501,19 @@ def read(self): def maybe_resume(func): - @asyncio.coroutine - @functools.wraps(func) - def wrapper(self, *args, **kw): - result = yield from func(self, *args, **kw) - - if self._stream.paused: - if self._buffer_size < self._b_limit: - try: - self._stream.transport.resume_reading() - except (AttributeError, NotImplementedError): - pass - else: - self._stream.paused = False - else: - if self._buffer_size > self._b_limit: - try: - self._stream.transport.pause_reading() - except (AttributeError, NotImplementedError): - pass - else: - self._stream.paused = True - - return result + if asyncio.iscoroutinefunction(func): + @asyncio.coroutine + @functools.wraps(func) + def wrapper(self, *args, **kw): + result = yield from func(self, *args, **kw) + self._check_buffer_size() + return result + else: + @functools.wraps(func) + def wrapper(self, *args, **kw): + result = func(self, *args, **kw) + self._check_buffer_size() + return result return wrapper @@ -543,6 +533,24 @@ def __init__(self, stream, limit=DEFAULT_LIMIT, *args, **kwargs): except (AttributeError, NotImplementedError): pass + def _check_buffer_size(self): + if self._stream.paused: + if self._buffer_size < self._b_limit: + try: + self._stream.transport.resume_reading() + except (AttributeError, NotImplementedError): + pass + else: + self._stream.paused = False + else: + if self._buffer_size > self._b_limit: + try: + self._stream.transport.pause_reading() + except (AttributeError, NotImplementedError): + pass + else: + self._stream.paused = True + def feed_data(self, data, size=0): has_waiter = self._waiter is not None and not self._waiter.cancelled() @@ -558,21 +566,29 @@ def feed_data(self, data, size=0): self._stream.paused = True @maybe_resume + @asyncio.coroutine def read(self, n=-1): return (yield from super().read(n)) @maybe_resume + @asyncio.coroutine def readline(self): return (yield from super().readline()) @maybe_resume + @asyncio.coroutine def readany(self): return (yield from super().readany()) @maybe_resume + @asyncio.coroutine def readexactly(self, n): return (yield from super().readexactly(n)) + @maybe_resume + def read_nowait(self, n=-1): + return super().read_nowait(n) + class FlowControlDataQueue(DataQueue): """FlowControlDataQueue resumes and pauses an underlying stream. From 02f8234efc6a6482bee3cb0de2e5f9ebc5857a91 Mon Sep 17 00:00:00 2001 From: Aleksandr Danshyn Date: Wed, 21 Sep 2016 13:20:36 +0300 Subject: [PATCH 2/7] Added tests --- aiohttp/streams.py | 2 ++ tests/test_flowcontrol_streams.py | 40 ++++++++++++++++++++++++------- 2 files changed, 34 insertions(+), 8 deletions(-) diff --git a/aiohttp/streams.py b/aiohttp/streams.py index b60d009da39..c47cfb86912 100644 --- a/aiohttp/streams.py +++ b/aiohttp/streams.py @@ -532,6 +532,8 @@ def __init__(self, stream, limit=DEFAULT_LIMIT, *args, **kwargs): self._stream.transport.resume_reading() except (AttributeError, NotImplementedError): pass + else: + self._stream.paused = False def _check_buffer_size(self): if self._stream.paused: diff --git a/tests/test_flowcontrol_streams.py b/tests/test_flowcontrol_streams.py index f305a53ebb3..7f5e32b89a4 100644 --- a/tests/test_flowcontrol_streams.py +++ b/tests/test_flowcontrol_streams.py @@ -8,7 +8,7 @@ class TestFlowControlStreamReader(unittest.TestCase): def setUp(self): - self.stream = mock.Mock() + self.stream = mock.Mock(paused=False) self.transp = self.stream.transport self.loop = asyncio.new_event_loop() asyncio.set_event_loop(None) @@ -22,7 +22,7 @@ def _make_one(self, *args, **kwargs): def test_read(self): r = self._make_one() - r.paused = True + r._stream.paused = True r.feed_data(b'da', 2) res = self.loop.run_until_complete(r.read(1)) self.assertEqual(res, b'd') @@ -30,7 +30,7 @@ def test_read(self): def test_readline(self): r = self._make_one() - r.paused = True + r._stream.paused = True r.feed_data(b'data\n', 5) res = self.loop.run_until_complete(r.readline()) self.assertEqual(res, b'data\n') @@ -38,7 +38,7 @@ def test_readline(self): def test_readany(self): r = self._make_one() - r.paused = True + r._stream.paused = True r.feed_data(b'data', 4) res = self.loop.run_until_complete(r.readany()) self.assertEqual(res, b'data') @@ -46,10 +46,10 @@ def test_readany(self): def test_readexactly(self): r = self._make_one() - r.paused = True - r.feed_data(b'datadata', 8) - res = self.loop.run_until_complete(r.readexactly(2)) - self.assertEqual(res, b'da') + r._stream.paused = True + r.feed_data(b'data', 4) + res = self.loop.run_until_complete(r.readexactly(3)) + self.assertEqual(res, b'dat') self.assertTrue(self.transp.resume_reading.called) def test_feed_data(self): @@ -58,6 +58,30 @@ def test_feed_data(self): r.feed_data(b'datadata', 8) self.assertTrue(self.transp.pause_reading.called) + def test_read_nowait(self): + r = self._make_one() + r._stream.paused = False + r.feed_data(b'data1', 5) + r.feed_data(b'data2', 5) + + res = self.loop.run_until_complete(r.read(5)) + self.assertTrue(res == b'data1') + # _buffer_size > _buffer_limit + self.assertTrue(self.transp.pause_reading.call_count == 1) + self.assertTrue(not self.transp.resume_reading.call_count) + + res = r.read_nowait(5) + self.assertTrue(res == b'data2') + # _buffer_size < _buffer_limit + self.assertTrue(self.transp.pause_reading.call_count == 1) + self.assertTrue(self.transp.resume_reading.call_count == 1) + + res = r.read_nowait(5) + self.assertTrue(res == b'') + # _buffer_size < _buffer_limit + self.assertTrue(self.transp.pause_reading.call_count == 1) + self.assertTrue(self.transp.resume_reading.call_count == 1) + class FlowControlMixin: From a5fa371b79a9d67e58499b153e8e3d285ce43f54 Mon Sep 17 00:00:00 2001 From: Aleksandr Danshyn Date: Thu, 22 Sep 2016 16:38:37 +0300 Subject: [PATCH 3/7] added myself to contributors --- CONTRIBUTORS.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/CONTRIBUTORS.txt b/CONTRIBUTORS.txt index ac2f26da275..4b5f0dd47a9 100644 --- a/CONTRIBUTORS.txt +++ b/CONTRIBUTORS.txt @@ -3,6 +3,7 @@ Contributors A. Jesse Jiryu Davis Alejandro Gómez +Aleksandr Danshyn Aleksey Kutepov Alex Khomchenko Alex Lisovoy From 2e8e833b42d29cafa6b7a17d806fca18e724bebc Mon Sep 17 00:00:00 2001 From: Aleksandr Danshyn Date: Thu, 22 Sep 2016 16:42:42 +0300 Subject: [PATCH 4/7] Added an entry to CHANGES --- CHANGES.rst | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGES.rst b/CHANGES.rst index 2c28ef4c6f6..61a05f55c59 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -6,7 +6,8 @@ CHANGES - Make CookieJar compatible with 32-bit systems #1188 -- +- Fix FlowControlStreamReader.read_nowait so that it checks + whether the transport is paused - From ebbd9dcd9f5135104d443a78dcefcdb5a688a60f Mon Sep 17 00:00:00 2001 From: Aleksandr Danshyn Date: Mon, 3 Oct 2016 23:22:36 +0300 Subject: [PATCH 5/7] Covered remaining lines --- tests/test_flowcontrol_streams.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/tests/test_flowcontrol_streams.py b/tests/test_flowcontrol_streams.py index 7f5e32b89a4..0bc622786b1 100644 --- a/tests/test_flowcontrol_streams.py +++ b/tests/test_flowcontrol_streams.py @@ -82,6 +82,31 @@ def test_read_nowait(self): self.assertTrue(self.transp.pause_reading.call_count == 1) self.assertTrue(self.transp.resume_reading.call_count == 1) + def test_rudimentary_transport(self): + self.transp.resume_reading.side_effect = NotImplementedError() + self.transp.pause_reading.side_effect = NotImplementedError() + self.stream.paused = True + + r = self._make_one() + self.assertTrue(self.transp.pause_reading.call_count == 0) + self.assertTrue(self.transp.resume_reading.call_count == 1) + self.assertTrue(self.stream.paused) + + r.feed_data(b'data', 4) + res = self.loop.run_until_complete(r.read(4)) + self.assertTrue(self.transp.pause_reading.call_count == 0) + self.assertTrue(self.transp.resume_reading.call_count == 2) + self.assertTrue(self.stream.paused) + self.assertTrue(res == b'data') + + self.stream.paused = False + r.feed_data(b'data', 4) + res = self.loop.run_until_complete(r.read(1)) + self.assertTrue(self.transp.pause_reading.call_count == 2) + self.assertTrue(self.transp.resume_reading.call_count == 2) + self.assertTrue(not self.stream.paused) + self.assertTrue(res == b'd') + class FlowControlMixin: From 3c87108971497dbedf8fe614642a22fa4ad58685 Mon Sep 17 00:00:00 2001 From: Aleksandr Danshyn Date: Tue, 4 Oct 2016 00:13:59 +0300 Subject: [PATCH 6/7] Covered last line --- tests/test_flowcontrol_streams.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/tests/test_flowcontrol_streams.py b/tests/test_flowcontrol_streams.py index 0bc622786b1..64e64015b52 100644 --- a/tests/test_flowcontrol_streams.py +++ b/tests/test_flowcontrol_streams.py @@ -63,24 +63,29 @@ def test_read_nowait(self): r._stream.paused = False r.feed_data(b'data1', 5) r.feed_data(b'data2', 5) + self.assertTrue(self.stream.paused) + r._stream.paused = False res = self.loop.run_until_complete(r.read(5)) self.assertTrue(res == b'data1') # _buffer_size > _buffer_limit - self.assertTrue(self.transp.pause_reading.call_count == 1) - self.assertTrue(not self.transp.resume_reading.call_count) + self.assertTrue(self.transp.pause_reading.call_count == 2) + self.assertTrue(self.transp.resume_reading.call_count == 0) + self.assertTrue(self.stream.paused) res = r.read_nowait(5) self.assertTrue(res == b'data2') # _buffer_size < _buffer_limit - self.assertTrue(self.transp.pause_reading.call_count == 1) + self.assertTrue(self.transp.pause_reading.call_count == 2) self.assertTrue(self.transp.resume_reading.call_count == 1) + self.assertTrue(not self.stream.paused) res = r.read_nowait(5) self.assertTrue(res == b'') # _buffer_size < _buffer_limit - self.assertTrue(self.transp.pause_reading.call_count == 1) + self.assertTrue(self.transp.pause_reading.call_count == 2) self.assertTrue(self.transp.resume_reading.call_count == 1) + self.assertTrue(not self.stream.paused) def test_rudimentary_transport(self): self.transp.resume_reading.side_effect = NotImplementedError() From ca5ed93cd896b7dd224eb36ae42ee0c3df23fc6b Mon Sep 17 00:00:00 2001 From: Aleksandr Danshyn Date: Tue, 4 Oct 2016 00:28:51 +0300 Subject: [PATCH 7/7] Covered missed branch --- tests/test_flowcontrol_streams.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/tests/test_flowcontrol_streams.py b/tests/test_flowcontrol_streams.py index 64e64015b52..f8f45cedf4c 100644 --- a/tests/test_flowcontrol_streams.py +++ b/tests/test_flowcontrol_streams.py @@ -63,18 +63,26 @@ def test_read_nowait(self): r._stream.paused = False r.feed_data(b'data1', 5) r.feed_data(b'data2', 5) + r.feed_data(b'data3', 5) self.assertTrue(self.stream.paused) - r._stream.paused = False res = self.loop.run_until_complete(r.read(5)) self.assertTrue(res == b'data1') # _buffer_size > _buffer_limit - self.assertTrue(self.transp.pause_reading.call_count == 2) + self.assertTrue(self.transp.pause_reading.call_count == 1) self.assertTrue(self.transp.resume_reading.call_count == 0) self.assertTrue(self.stream.paused) + r._stream.paused = False res = r.read_nowait(5) self.assertTrue(res == b'data2') + # _buffer_size > _buffer_limit + self.assertTrue(self.transp.pause_reading.call_count == 2) + self.assertTrue(self.transp.resume_reading.call_count == 0) + self.assertTrue(self.stream.paused) + + res = r.read_nowait(5) + self.assertTrue(res == b'data3') # _buffer_size < _buffer_limit self.assertTrue(self.transp.pause_reading.call_count == 2) self.assertTrue(self.transp.resume_reading.call_count == 1)