diff --git a/CHANGES.txt b/CHANGES.txt index 07c2961a5b0..d7f7c697e6f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -4,4 +4,4 @@ CHANGES 0.19.0 (XX-XX-XXXX) ------------------- - +- Memory leak in ParserBuffer #579 diff --git a/CONTRIBUTORS.txt b/CONTRIBUTORS.txt index e008ae46d9a..e50d52e8df2 100644 --- a/CONTRIBUTORS.txt +++ b/CONTRIBUTORS.txt @@ -39,6 +39,7 @@ Igor Pavlov Ingmar Steen Jaesung Lee Jashandeep Sohi +Jeroen van der Heijden Julien Duponchelle Junjie Tao Kay Zheng diff --git a/Makefile b/Makefile index 84ed20234a9..fa34381be92 100644 --- a/Makefile +++ b/Makefile @@ -25,7 +25,7 @@ vtest: flake .develop cov cover coverage: tox -cov-dev: develop +cov-dev: .develop @coverage erase @coverage run -m pytest -s tests @mv .coverage .coverage.accel diff --git a/aiohttp/parsers.py b/aiohttp/parsers.py index a1971e11212..0dd5a71ad2d 100644 --- a/aiohttp/parsers.py +++ b/aiohttp/parsers.py @@ -263,47 +263,58 @@ def eof_received(self): self.reader.feed_eof() -class ParserBuffer(bytearray): - """ParserBuffer is a bytearray extension. +class _ParserBufferHelper: + + __slots__ = ('exception', 'data') + + def __init__(self, exception, data): + self.exception = exception + self.data = data + + +class ParserBuffer: + """ParserBuffer is NOT a bytearray extension anymore. ParserBuffer provides helper methods for parsers. """ - __slots__ = ('_exception', '_writer') + __slots__ = ('_helper', '_writer', '_data') def __init__(self, *args): - super().__init__(*args) - - self._exception = None - self._writer = self._feed_data() + self._data = bytearray(*args) + self._helper = _ParserBufferHelper(None, self._data) + self._writer = self._feed_data(self._helper) next(self._writer) def exception(self): - return self._exception + return self._helper.exception def set_exception(self, exc): - self._exception = exc + self._helper.exception = exc - def _feed_data(self): + @staticmethod + def _feed_data(helper): while True: chunk = yield if chunk: - self.extend(chunk) + helper.data.extend(chunk) - if self._exception: - self._writer = self._feed_data() - next(self._writer) - raise self._exception + if helper.exception: + raise helper.exception def feed_data(self, data): - self._writer.send(data) + if not self._helper.exception: + self._writer.send(data) def read(self, size): """read() reads specified amount of bytes.""" while True: - if len(self) >= size: - data = self[:size] - del self[:size] + if self._helper.exception: + raise self._helper.exception + + if len(self._data) >= size: + data = self._data[:size] + del self._data[:size] return data self._writer.send((yield)) @@ -312,13 +323,16 @@ def readsome(self, size=None): """reads size of less amount of bytes.""" while True: - length = len(self) + if self._helper.exception: + raise self._helper.exception + + length = len(self._data) if length > 0: if size is None or length < size: size = length - data = self[:size] - del self[:size] + data = self._data[:size] + del self._data[:size] return data self._writer.send((yield)) @@ -330,7 +344,10 @@ def readuntil(self, stop, limit=None): stop_len = len(stop) while True: - pos = self.find(stop) + if self._helper.exception: + raise self._helper.exception + + pos = self._data.find(stop) if pos >= 0: end = pos + stop_len size = end @@ -338,11 +355,11 @@ def readuntil(self, stop, limit=None): raise errors.LineLimitExceededParserError( 'Line is too long.', limit) - data = self[:size] - del self[:size] + data = self._data[:size] + del self._data[:size] return data else: - if limit is not None and len(self) > limit: + if limit is not None and len(self._data) > limit: raise errors.LineLimitExceededParserError( 'Line is too long.', limit) @@ -353,8 +370,11 @@ def wait(self, size): then returns data without changing internal buffer.""" while True: - if len(self) >= size: - return self[:size] + if self._helper.exception: + raise self._helper.exception + + if len(self._data) >= size: + return self._data[:size] self._writer.send((yield)) @@ -366,28 +386,34 @@ def waituntil(self, stop, limit=None): stop_len = len(stop) while True: - pos = self.find(stop) + if self._helper.exception: + raise self._helper.exception + + pos = self._data.find(stop) if pos >= 0: size = pos + stop_len if limit is not None and size > limit: raise errors.LineLimitExceededParserError( - 'Line is too long. %s' % bytes(self), limit) + 'Line is too long. %s' % bytes(self._data), limit) - return self[:size] + return self._data[:size] else: - if limit is not None and len(self) > limit: + if limit is not None and len(self._data) > limit: raise errors.LineLimitExceededParserError( - 'Line is too long. %s' % bytes(self), limit) + 'Line is too long. %s' % bytes(self._data), limit) self._writer.send((yield)) def skip(self, size): """skip() skips specified amount of bytes.""" - while len(self) < size: + while len(self._data) < size: + if self._helper.exception: + raise self._helper.exception + self._writer.send((yield)) - del self[:size] + del self._data[:size] def skipuntil(self, stop): """skipuntil() reads until `stop` bytes sequence.""" @@ -397,14 +423,26 @@ def skipuntil(self, stop): stop_len = len(stop) while True: - stop_line = self.find(stop) + if self._helper.exception: + raise self._helper.exception + + stop_line = self._data.find(stop) if stop_line >= 0: size = stop_line + stop_len - del self[:size] + del self._data[:size] return self._writer.send((yield)) + def extend(self, data): + self._data.extend(data) + + def __len__(self): + return len(self._data) + + def __bytes__(self): + return bytes(self._data) + class LinesParser: """Lines parser. diff --git a/tests/test_parsers.py b/tests/test_parsers.py index 0d30bee7d2d..9e01a2a6b5c 100644 --- a/tests/test_parsers.py +++ b/tests/test_parsers.py @@ -407,14 +407,39 @@ def test_feed_data(self): self.assertEqual(len(buf), 4) self.assertEqual(bytes(buf), b'data') + def test_feed_data_after_exception(self): + buf = self._make_one() + buf.feed_data(b'data') + + exc = ValueError() + buf.set_exception(exc) + buf.feed_data(b'more') + self.assertEqual(len(buf), 4) + self.assertEqual(bytes(buf), b'data') + def test_read_exc(self): buf = self._make_one() + p = buf.read(3) + next(p) + p.send(b'1') + exc = ValueError() buf.set_exception(exc) self.assertIs(buf.exception(), exc) + self.assertRaises(ValueError, p.send, b'1') + + def test_read_exc_multiple(self): + buf = self._make_one() p = buf.read(3) next(p) - self.assertRaises(ValueError, p.send, b'1') + p.send(b'1') + + exc = ValueError() + buf.set_exception(exc) + self.assertIs(buf.exception(), exc) + + p = buf.read(3) + self.assertRaises(ValueError, next, p) def test_read(self): buf = self._make_one() @@ -448,6 +473,13 @@ def test_readsome(self): self.assertEqual(res, b'23') self.assertEqual(b'4', bytes(buf)) + def test_readsome_exc(self): + buf = self._make_one() + buf.set_exception(ValueError()) + + p = buf.readsome(3) + self.assertRaises(ValueError, next, p) + def test_wait(self): buf = self._make_one() p = buf.wait(3) @@ -461,6 +493,13 @@ def test_wait(self): self.assertEqual(res, b'123') self.assertEqual(b'1234', bytes(buf)) + def test_wait_exc(self): + buf = self._make_one() + buf.set_exception(ValueError()) + + p = buf.wait(3) + self.assertRaises(ValueError, next, p) + def test_skip(self): buf = self._make_one() p = buf.skip(3) @@ -474,6 +513,12 @@ def test_skip(self): self.assertIsNone(res) self.assertEqual(b'4', bytes(buf)) + def test_skip_exc(self): + buf = self._make_one() + buf.set_exception(ValueError()) + p = buf.skip(3) + self.assertRaises(ValueError, next, p) + def test_readuntil_limit(self): buf = self._make_one() p = buf.readuntil(b'\n', 4) @@ -507,6 +552,12 @@ def test_readuntil(self): self.assertEqual(res, b'123\n') self.assertEqual(b'456', bytes(buf)) + def test_readuntil_exc(self): + buf = self._make_one() + buf.set_exception(ValueError()) + p = buf.readuntil(b'\n', 4) + self.assertRaises(ValueError, next, p) + def test_waituntil_limit(self): buf = self._make_one() p = buf.waituntil(b'\n', 4) @@ -540,6 +591,12 @@ def test_waituntil(self): self.assertEqual(res, b'123\n') self.assertEqual(b'123\n456', bytes(buf)) + def test_waituntil_exc(self): + buf = self._make_one() + buf.set_exception(ValueError()) + p = buf.waituntil(b'\n', 4) + self.assertRaises(ValueError, next, p) + def test_skipuntil(self): buf = self._make_one() p = buf.skipuntil(b'\n') @@ -558,6 +615,12 @@ def test_skipuntil(self): pass self.assertEqual(b'', bytes(buf)) + def test_skipuntil_exc(self): + buf = self._make_one() + buf.set_exception(ValueError()) + p = buf.skipuntil(b'\n') + self.assertRaises(ValueError, next, p) + def test_lines_parser(self): out = parsers.FlowControlDataQueue(self.stream, loop=self.loop) buf = self._make_one()