Skip to content

Commit

Permalink
cffi: consume remaining data in input buffer
Browse files Browse the repository at this point in the history
Before, the CFFI implementation of ZstdDecompressionReader.read()
may skip input data left over in an internal buffer. This would
result in feeding incorrect input into the decompressor, which would
likely manifest as a malformed zstd data error.

As part of fixing this, we added a test that reproduces the failure and
improved the fuzzing coverage of this method.

Closes #71.
  • Loading branch information
indygreg committed Feb 17, 2019
1 parent 5d9af8f commit 8eb4029
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 14 deletions.
10 changes: 10 additions & 0 deletions NEWS.rst
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,19 @@ Backwards Compatibility Notes
``import zstandard`` to cause an appropriate backend module to be loaded
automatically.

Bug Fixes
---------

* CFFI backend could encounter an error when calling
``ZstdDecompressionReader.read()`` if there was data remaining in an
internal buffer. The issue has been fixed. (#71)

Changes
-------

* CFFI's ``ZstdDecompressionReader.read()`` now properly handles data
  remaining in any internal buffer. Before, repeated ``read()`` calls could
  feed incorrect input to the decompressor and fail with a malformed zstd
  data error. (#71)
* Upgraded various Python packages in CI environment.
* Upgrade to hypothesis 4.5.11.
* In the CFFI backend, ``CompressionReader`` and ``DecompressionReader``
Expand Down
14 changes: 12 additions & 2 deletions tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,13 @@ def random_input_data():
except OSError:
pass

# Also add some actual random data.
_source_files.append(os.urandom(100))
_source_files.append(os.urandom(1000))
_source_files.append(os.urandom(10000))
_source_files.append(os.urandom(100000))
_source_files.append(os.urandom(1000000))

return _source_files


Expand All @@ -165,11 +172,14 @@ def generate_samples():


if hypothesis:
default_settings = hypothesis.settings()
default_settings = hypothesis.settings(deadline=1000)
hypothesis.settings.register_profile('default', default_settings)

ci_settings = hypothesis.settings(max_examples=2500)
ci_settings = hypothesis.settings(deadline=10000, max_examples=2500)
hypothesis.settings.register_profile('ci', ci_settings)

expensive_settings = hypothesis.settings(deadline=None, max_examples=10000)
hypothesis.settings.register_profile('expensive', expensive_settings)

hypothesis.settings.load_profile(
os.environ.get('HYPOTHESIS_PROFILE', 'default'))
17 changes: 17 additions & 0 deletions tests/test_decompressor.py
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,23 @@ def test_read_after_error(self):
with self.assertRaisesRegexp(ValueError, 'stream is closed'):
reader.read(100)

def test_partial_read(self):
    # Regression test for
    # https://github.com/indygreg/python-zstandard/issues/71: repeated
    # fixed-size reads used to trip over data left in the CFFI reader's
    # internal input buffer.
    source = io.BytesIO()
    cctx = zstd.ZstdCompressor()
    compressor = cctx.stream_writer(source)
    compressor.write(bytearray(os.urandom(1000000)))
    compressor.flush(zstd.FLUSH_FRAME)
    source.seek(0)

    reader = zstd.ZstdDecompressor().stream_reader(source)

    # Drain the stream in fixed-size chunks; success is simply not raising.
    for _ in iter(lambda: reader.read(8192), b''):
        pass


@make_cffi
class TestDecompressor_decompressobj(unittest.TestCase):
Expand Down
111 changes: 99 additions & 12 deletions tests/test_decompressor_fuzzing.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,29 @@ class TestDecompressor_stream_reader_fuzzing(unittest.TestCase):
suppress_health_check=[hypothesis.HealthCheck.large_base_example])
@hypothesis.given(original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
streaming=strategies.booleans(),
source_read_size=strategies.integers(1, 1048576),
read_sizes=strategies.data())
def test_stream_source_read_variance(self, original, level, source_read_size,
read_sizes):
def test_stream_source_read_variance(self, original, level, streaming,
source_read_size, read_sizes):
cctx = zstd.ZstdCompressor(level=level)
frame = cctx.compress(original)

if streaming:
source = io.BytesIO()
writer = cctx.stream_writer(source)
writer.write(original)
writer.flush(zstd.FLUSH_FRAME)
source.seek(0)
else:
frame = cctx.compress(original)
source = io.BytesIO(frame)

dctx = zstd.ZstdDecompressor()
source = io.BytesIO(frame)

chunks = []
with dctx.stream_reader(source, read_size=source_read_size) as reader:
while True:
read_size = read_sizes.draw(strategies.integers(1, 16384))
read_size = read_sizes.draw(strategies.integers(1, 131072))
chunk = reader.read(read_size)
if not chunk:
break
Expand All @@ -46,23 +55,67 @@ def test_stream_source_read_variance(self, original, level, source_read_size,

self.assertEqual(b''.join(chunks), original)

# Like the variance test above, except every read() uses a constant size.
@hypothesis.settings(
    suppress_health_check=[hypothesis.HealthCheck.large_base_example])
@hypothesis.given(original=strategies.sampled_from(random_input_data()),
                  level=strategies.integers(min_value=1, max_value=5),
                  streaming=strategies.booleans(),
                  source_read_size=strategies.integers(1, 1048576),
                  read_size=strategies.integers(1, 131072))
def test_stream_source_read_size(self, original, level, streaming,
                                 source_read_size, read_size):
    cctx = zstd.ZstdCompressor(level=level)

    source = io.BytesIO()
    if streaming:
        # Produce the frame incrementally via the streaming writer API.
        writer = cctx.stream_writer(source)
        writer.write(original)
        writer.flush(zstd.FLUSH_FRAME)
    else:
        # Produce the frame in a single one-shot compression.
        source.write(cctx.compress(original))
    source.seek(0)

    reader = zstd.ZstdDecompressor().stream_reader(
        source, read_size=source_read_size)

    # Read with a constant size until EOF and verify a lossless round trip.
    chunks = []
    for chunk in iter(lambda: reader.read(read_size), b''):
        chunks.append(chunk)

    self.assertEqual(b''.join(chunks), original)

@hypothesis.settings(
suppress_health_check=[hypothesis.HealthCheck.large_base_example])
@hypothesis.given(original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
streaming=strategies.booleans(),
source_read_size=strategies.integers(1, 1048576),
read_sizes=strategies.data())
def test_buffer_source_read_variance(self, original, level, source_read_size,
read_sizes):
def test_buffer_source_read_variance(self, original, level, streaming,
source_read_size, read_sizes):
cctx = zstd.ZstdCompressor(level=level)
frame = cctx.compress(original)

if streaming:
source = io.BytesIO()
writer = cctx.stream_writer(source)
writer.write(original)
writer.flush(zstd.FLUSH_FRAME)
frame = source.getvalue()
else:
frame = cctx.compress(original)

dctx = zstd.ZstdDecompressor()
chunks = []

with dctx.stream_reader(frame, read_size=source_read_size) as reader:
while True:
read_size = read_sizes.draw(strategies.integers(1, 16384))
read_size = read_sizes.draw(strategies.integers(1, 131072))
chunk = reader.read(read_size)
if not chunk:
break
Expand All @@ -71,12 +124,46 @@ def test_buffer_source_read_variance(self, original, level, source_read_size,

self.assertEqual(b''.join(chunks), original)

# Like the variance test above, except every read() uses a constant size.
@hypothesis.settings(
    suppress_health_check=[hypothesis.HealthCheck.large_base_example])
@hypothesis.given(original=strategies.sampled_from(random_input_data()),
                  level=strategies.integers(min_value=1, max_value=5),
                  streaming=strategies.booleans(),
                  source_read_size=strategies.integers(1, 1048576),
                  read_size=strategies.integers(1, 131072))
def test_buffer_source_constant_read_size(self, original, level, streaming,
                                          source_read_size, read_size):
    cctx = zstd.ZstdCompressor(level=level)

    if streaming:
        # Build the frame incrementally via the streaming writer API.
        sink = io.BytesIO()
        writer = cctx.stream_writer(sink)
        writer.write(original)
        writer.flush(zstd.FLUSH_FRAME)
        frame = sink.getvalue()
    else:
        # Build the frame in a single one-shot compression.
        frame = cctx.compress(original)

    # The reader consumes the raw buffer directly (no file object).
    reader = zstd.ZstdDecompressor().stream_reader(
        frame, read_size=source_read_size)

    # Read with a constant size until EOF and verify a lossless round trip.
    chunks = []
    for chunk in iter(lambda: reader.read(read_size), b''):
        chunks.append(chunk)

    self.assertEqual(b''.join(chunks), original)

@hypothesis.settings(
suppress_health_check=[hypothesis.HealthCheck.large_base_example])
@hypothesis.given(
original=strategies.sampled_from(random_input_data()),
level=strategies.integers(min_value=1, max_value=5),
source_read_size=strategies.integers(1, 16384),
source_read_size=strategies.integers(1, 1048576),
seek_amounts=strategies.data(),
read_sizes=strategies.data())
def test_relative_seeks(self, original, level, source_read_size, seek_amounts,
Expand Down
6 changes: 6 additions & 0 deletions zstandard/cffi.py
Original file line number Diff line number Diff line change
Expand Up @@ -1697,9 +1697,15 @@ def decompress():
return ffi.buffer(out_buffer.dst, out_buffer.pos)[:]

def get_input():
# We have data left over in the input buffer. Use it.
if self._in_buffer.pos < self._in_buffer.size:
return

# All input data exhausted. Nothing to do.
if self._finished_input:
return

# Else populate the input buffer from our source.
if hasattr(self._source, 'read'):
data = self._source.read(self._read_size)

Expand Down

0 comments on commit 8eb4029

Please sign in to comment.