Skip to content

Commit

Permalink
Memory leak in ParserBuffer #579
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Oct 21, 2015
1 parent 1741564 commit 457a25e
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 40 deletions.
2 changes: 1 addition & 1 deletion CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ CHANGES
0.19.0 (XX-XX-XXXX)
-------------------


- Memory leak in ParserBuffer #579
1 change: 1 addition & 0 deletions CONTRIBUTORS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ Igor Pavlov
Ingmar Steen
Jaesung Lee
Jashandeep Sohi
Jeroen van der Heijden
Julien Duponchelle
Junjie Tao
Kay Zheng
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ vtest: flake .develop
cov cover coverage:
tox

cov-dev: develop
cov-dev: .develop
@coverage erase
@coverage run -m pytest -s tests
@mv .coverage .coverage.accel
Expand Down
112 changes: 75 additions & 37 deletions aiohttp/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,47 +263,58 @@ def eof_received(self):
self.reader.feed_eof()


class ParserBuffer(bytearray):
"""ParserBuffer is a bytearray extension.
class _ParserBufferHelper:

__slots__ = ('exception', 'data')

def __init__(self, exception, data):
self.exception = exception
self.data = data


class ParserBuffer:
"""ParserBuffer is NOT a bytearray extension anymore.
ParserBuffer provides helper methods for parsers.
"""
__slots__ = ('_exception', '_writer')
__slots__ = ('_helper', '_writer', '_data')

def __init__(self, *args):
super().__init__(*args)

self._exception = None
self._writer = self._feed_data()
self._data = bytearray(*args)
self._helper = _ParserBufferHelper(None, self._data)
self._writer = self._feed_data(self._helper)
next(self._writer)

def exception(self):
return self._exception
return self._helper.exception

def set_exception(self, exc):
self._exception = exc
self._helper.exception = exc

def _feed_data(self):
@staticmethod
def _feed_data(helper):
while True:
chunk = yield
if chunk:
self.extend(chunk)
helper.data.extend(chunk)

if self._exception:
self._writer = self._feed_data()
next(self._writer)
raise self._exception
if helper.exception:
raise helper.exception

def feed_data(self, data):
self._writer.send(data)
if not self._helper.exception:
self._writer.send(data)

def read(self, size):
"""read() reads specified amount of bytes."""

while True:
if len(self) >= size:
data = self[:size]
del self[:size]
if self._helper.exception:
raise self._helper.exception

if len(self._data) >= size:
data = self._data[:size]
del self._data[:size]
return data

self._writer.send((yield))
Expand All @@ -312,13 +323,16 @@ def readsome(self, size=None):
"""reads size of less amount of bytes."""

while True:
length = len(self)
if self._helper.exception:
raise self._helper.exception

length = len(self._data)
if length > 0:
if size is None or length < size:
size = length

data = self[:size]
del self[:size]
data = self._data[:size]
del self._data[:size]
return data

self._writer.send((yield))
Expand All @@ -330,19 +344,22 @@ def readuntil(self, stop, limit=None):
stop_len = len(stop)

while True:
pos = self.find(stop)
if self._helper.exception:
raise self._helper.exception

pos = self._data.find(stop)
if pos >= 0:
end = pos + stop_len
size = end
if limit is not None and size > limit:
raise errors.LineLimitExceededParserError(
'Line is too long.', limit)

data = self[:size]
del self[:size]
data = self._data[:size]
del self._data[:size]
return data
else:
if limit is not None and len(self) > limit:
if limit is not None and len(self._data) > limit:
raise errors.LineLimitExceededParserError(
'Line is too long.', limit)

Expand All @@ -353,8 +370,11 @@ def wait(self, size):
then returns data without changing internal buffer."""

while True:
if len(self) >= size:
return self[:size]
if self._helper.exception:
raise self._helper.exception

if len(self._data) >= size:
return self._data[:size]

self._writer.send((yield))

Expand All @@ -366,28 +386,34 @@ def waituntil(self, stop, limit=None):
stop_len = len(stop)

while True:
pos = self.find(stop)
if self._helper.exception:
raise self._helper.exception

pos = self._data.find(stop)
if pos >= 0:
size = pos + stop_len
if limit is not None and size > limit:
raise errors.LineLimitExceededParserError(
'Line is too long. %s' % bytes(self), limit)
'Line is too long. %s' % bytes(self._data), limit)

return self[:size]
return self._data[:size]
else:
if limit is not None and len(self) > limit:
if limit is not None and len(self._data) > limit:
raise errors.LineLimitExceededParserError(
'Line is too long. %s' % bytes(self), limit)
'Line is too long. %s' % bytes(self._data), limit)

self._writer.send((yield))

def skip(self, size):
"""skip() skips specified amount of bytes."""

while len(self) < size:
while len(self._data) < size:
if self._helper.exception:
raise self._helper.exception

self._writer.send((yield))

del self[:size]
del self._data[:size]

def skipuntil(self, stop):
"""skipuntil() reads until `stop` bytes sequence."""
Expand All @@ -397,14 +423,26 @@ def skipuntil(self, stop):
stop_len = len(stop)

while True:
stop_line = self.find(stop)
if self._helper.exception:
raise self._helper.exception

stop_line = self._data.find(stop)
if stop_line >= 0:
size = stop_line + stop_len
del self[:size]
del self._data[:size]
return

self._writer.send((yield))

def extend(self, data):
self._data.extend(data)

def __len__(self):
return len(self._data)

def __bytes__(self):
return bytes(self._data)


class LinesParser:
"""Lines parser.
Expand Down
65 changes: 64 additions & 1 deletion tests/test_parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,14 +407,39 @@ def test_feed_data(self):
self.assertEqual(len(buf), 4)
self.assertEqual(bytes(buf), b'data')

def test_feed_data_after_exception(self):
buf = self._make_one()
buf.feed_data(b'data')

exc = ValueError()
buf.set_exception(exc)
buf.feed_data(b'more')
self.assertEqual(len(buf), 4)
self.assertEqual(bytes(buf), b'data')

def test_read_exc(self):
buf = self._make_one()
p = buf.read(3)
next(p)
p.send(b'1')

exc = ValueError()
buf.set_exception(exc)
self.assertIs(buf.exception(), exc)
self.assertRaises(ValueError, p.send, b'1')

def test_read_exc_multiple(self):
buf = self._make_one()
p = buf.read(3)
next(p)
self.assertRaises(ValueError, p.send, b'1')
p.send(b'1')

exc = ValueError()
buf.set_exception(exc)
self.assertIs(buf.exception(), exc)

p = buf.read(3)
self.assertRaises(ValueError, next, p)

def test_read(self):
buf = self._make_one()
Expand Down Expand Up @@ -448,6 +473,13 @@ def test_readsome(self):
self.assertEqual(res, b'23')
self.assertEqual(b'4', bytes(buf))

def test_readsome_exc(self):
buf = self._make_one()
buf.set_exception(ValueError())

p = buf.readsome(3)
self.assertRaises(ValueError, next, p)

def test_wait(self):
buf = self._make_one()
p = buf.wait(3)
Expand All @@ -461,6 +493,13 @@ def test_wait(self):
self.assertEqual(res, b'123')
self.assertEqual(b'1234', bytes(buf))

def test_wait_exc(self):
buf = self._make_one()
buf.set_exception(ValueError())

p = buf.wait(3)
self.assertRaises(ValueError, next, p)

def test_skip(self):
buf = self._make_one()
p = buf.skip(3)
Expand All @@ -474,6 +513,12 @@ def test_skip(self):
self.assertIsNone(res)
self.assertEqual(b'4', bytes(buf))

def test_skip_exc(self):
buf = self._make_one()
buf.set_exception(ValueError())
p = buf.skip(3)
self.assertRaises(ValueError, next, p)

def test_readuntil_limit(self):
buf = self._make_one()
p = buf.readuntil(b'\n', 4)
Expand Down Expand Up @@ -507,6 +552,12 @@ def test_readuntil(self):
self.assertEqual(res, b'123\n')
self.assertEqual(b'456', bytes(buf))

def test_readuntil_exc(self):
buf = self._make_one()
buf.set_exception(ValueError())
p = buf.readuntil(b'\n', 4)
self.assertRaises(ValueError, next, p)

def test_waituntil_limit(self):
buf = self._make_one()
p = buf.waituntil(b'\n', 4)
Expand Down Expand Up @@ -540,6 +591,12 @@ def test_waituntil(self):
self.assertEqual(res, b'123\n')
self.assertEqual(b'123\n456', bytes(buf))

def test_waituntil_exc(self):
buf = self._make_one()
buf.set_exception(ValueError())
p = buf.waituntil(b'\n', 4)
self.assertRaises(ValueError, next, p)

def test_skipuntil(self):
buf = self._make_one()
p = buf.skipuntil(b'\n')
Expand All @@ -558,6 +615,12 @@ def test_skipuntil(self):
pass
self.assertEqual(b'', bytes(buf))

def test_skipuntil_exc(self):
buf = self._make_one()
buf.set_exception(ValueError())
p = buf.skipuntil(b'\n')
self.assertRaises(ValueError, next, p)

def test_lines_parser(self):
out = parsers.FlowControlDataQueue(self.stream, loop=self.loop)
buf = self._make_one()
Expand Down

0 comments on commit 457a25e

Please sign in to comment.