From 8a0093b778c4369a2d67343ae91722164b20f8bd Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Fri, 27 Apr 2018 09:34:08 -0400 Subject: [PATCH 1/4] Allow reading from any file-like --- fastparquet/api.py | 12 +++++++++++- fastparquet/test/test_api.py | 18 ++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/fastparquet/api.py b/fastparquet/api.py index 5b646299..54979e3e 100644 --- a/fastparquet/api.py +++ b/fastparquet/api.py @@ -33,7 +33,9 @@ class ParquetFile(object): fn: path/URL string or list of paths Location of the data. If a directory, will attempt to read a file "_metadata" within that directory. If a list of paths, will assume - that they make up a single parquet data set. + that they make up a single parquet data set. This parameter can also + be any file-like object, in which case this must be a single-file + dataset. verify: bool [False] test file start/end byte markers open_with: function @@ -87,6 +89,14 @@ def __init__(self, fn, verify=False, open_with=default_open, self.fn = '_metadata' self.fmd = fmd self._set_attrs() + elif hasattr(fn, 'read'): + # file-like + self._parse_header(fn, verify) + if self.file_scheme not in ['simple', 'empty']: + raise ValueError('Cannot use file-like input ' + 'with multi-file data') + open_with = lambda *args, **kwargs: fn + self.fn = None else: try: fn2 = join_path(fn, '_metadata') diff --git a/fastparquet/test/test_api.py b/fastparquet/test/test_api.py index 207acbbe..c4233189 100644 --- a/fastparquet/test/test_api.py +++ b/fastparquet/test/test_api.py @@ -1,5 +1,6 @@ from __future__ import unicode_literals +import io import os import numpy as np @@ -142,6 +143,23 @@ def test_open_standard(tempdir): pd.util.testing.assert_frame_equal(d2, df) +def test_filelike(tempdir): + df = pd.DataFrame({'x': [1, 2, 3, 4], + 'y': [1.0, 2.0, 1.0, 2.0], + 'z': ['a', 'b', 'c', 'd']}) + fn = os.path.join(tempdir, 'foo.parquet') + write(fn, df, row_group_offsets=[0, 2]) + with open(fn, 'rb') as f: + pf = ParquetFile(f, open_with=open) + d2 = pf.to_pandas() + pd.util.testing.assert_frame_equal(d2, df) + + b = io.BytesIO(open(fn, 'rb').read()) + pf = ParquetFile(b, open_with=open) + d2 = pf.to_pandas() + pd.util.testing.assert_frame_equal(d2, df) + + def test_cast_index(tempdir): df = pd.DataFrame({'i8': np.array([1, 2, 3, 4], dtype='uint8'), 'i16': np.array([1, 2, 3, 4], dtype='int16'), From db5c82f32f2cbbbdff1fd293eb8482bb6ba15293 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Fri, 27 Apr 2018 09:46:52 -0400 Subject: [PATCH 2/4] new zstd --- fastparquet/compression.py | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/fastparquet/compression.py b/fastparquet/compression.py index a5e37e96..4fad32f5 100644 --- a/fastparquet/compression.py +++ b/fastparquet/compression.py @@ -74,18 +74,32 @@ def lz4_decompress(data, uncompressed_size): except ImportError: pass try: - import zstd + import zstandard def zstd_compress(data, **kwargs): kwargs['write_content_size'] = False cctx = zstd.ZstdCompressor(**kwargs) - return cctx.compress(data, allow_empty=True) + return cctx.compress(data) def zstd_decompress(data, uncompressed_size): dctx = zstd.ZstdDecompressor() - return dctx.decompress(data, max_output_size=uncompressed_size) + return dctx.decompress(data) compressions['ZSTD'] = zstd_compress decompressions['ZSTD'] = zstd_decompress except ImportError: pass +if 'ZSTD' not in compressions: + try: + import zstd + def zstd_compress(data, **kwargs): + kwargs['write_content_size'] = False + cctx = zstd.ZstdCompressor(**kwargs) + return cctx.compress(data, allow_empty=True) + def zstd_decompress(data, uncompressed_size): + dctx = zstd.ZstdDecompressor() + return dctx.decompress(data, max_output_size=uncompressed_size) + compressions['ZSTD'] = zstd_compress + decompressions['ZSTD'] = zstd_decompress + except ImportError: + pass compressions = {k.upper(): v for k, v in compressions.items()} decompressions = {k.upper(): v for k, v in decompressions.items()} From a823aa714fae3a905e4d76c1d0ea4388ab3c195e Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Fri, 27 Apr 2018 09:51:35 -0400 Subject: [PATCH 3/4] fix zstandard --- fastparquet/compression.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fastparquet/compression.py b/fastparquet/compression.py index 4fad32f5..47248162 100644 --- a/fastparquet/compression.py +++ b/fastparquet/compression.py @@ -77,10 +77,10 @@ def lz4_decompress(data, uncompressed_size): import zstandard def zstd_compress(data, **kwargs): kwargs['write_content_size'] = False - cctx = zstd.ZstdCompressor(**kwargs) + cctx = zstandard.ZstdCompressor(**kwargs) return cctx.compress(data) def zstd_decompress(data, uncompressed_size): - dctx = zstd.ZstdDecompressor() + dctx = zstandard.ZstdDecompressor() return dctx.decompress(data) compressions['ZSTD'] = zstd_compress decompressions['ZSTD'] = zstd_decompress From 09c7d81bf5ba5053af372524eebbe2ea8898ebd4 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Fri, 27 Apr 2018 10:08:57 -0400 Subject: [PATCH 4/4] reinstate option --- fastparquet/compression.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fastparquet/compression.py b/fastparquet/compression.py index 47248162..bf03dfa5 100644 --- a/fastparquet/compression.py +++ b/fastparquet/compression.py @@ -81,7 +81,7 @@ def zstd_compress(data, **kwargs): return cctx.compress(data) def zstd_decompress(data, uncompressed_size): dctx = zstandard.ZstdDecompressor() - return dctx.decompress(data) + return dctx.decompress(data, max_output_size=uncompressed_size) compressions['ZSTD'] = zstd_compress decompressions['ZSTD'] = zstd_decompress except ImportError: