diff --git a/fastparquet/api.py b/fastparquet/api.py
index 5b646299..54979e3e 100644
--- a/fastparquet/api.py
+++ b/fastparquet/api.py
@@ -33,7 +33,9 @@ class ParquetFile(object):
     fn: path/URL string or list of paths
         Location of the data. If a directory, will attempt to read a file
         "_metadata" within that directory. If a list of paths, will assume
-        that they make up a single parquet data set.
+        that they make up a single parquet data set. This parameter can also
+        be any file-like object, in which case this must be a single-file
+        dataset.
     verify: bool [False]
         test file start/end byte markers
     open_with: function
@@ -87,6 +89,14 @@ def __init__(self, fn, verify=False, open_with=default_open,
             self.fn = '_metadata'
             self.fmd = fmd
             self._set_attrs()
+        elif hasattr(fn, 'read'):
+            # file-like
+            self._parse_header(fn, verify)
+            if self.file_scheme not in ['simple', 'empty']:
+                raise ValueError('Cannot use file-like input '
+                                 'with multi-file data')
+            open_with = lambda *args, **kwargs: fn
+            self.fn = None
         else:
             try:
                 fn2 = join_path(fn, '_metadata')
diff --git a/fastparquet/compression.py b/fastparquet/compression.py
index a5e37e96..bf03dfa5 100644
--- a/fastparquet/compression.py
+++ b/fastparquet/compression.py
@@ -74,18 +74,32 @@ def lz4_decompress(data, uncompressed_size):
 except ImportError:
     pass
 try:
-    import zstd
+    import zstandard
     def zstd_compress(data, **kwargs):
         kwargs['write_content_size'] = False
-        cctx = zstd.ZstdCompressor(**kwargs)
-        return cctx.compress(data, allow_empty=True)
+        cctx = zstandard.ZstdCompressor(**kwargs)
+        return cctx.compress(data)
     def zstd_decompress(data, uncompressed_size):
-        dctx = zstd.ZstdDecompressor()
+        dctx = zstandard.ZstdDecompressor()
         return dctx.decompress(data, max_output_size=uncompressed_size)
     compressions['ZSTD'] = zstd_compress
     decompressions['ZSTD'] = zstd_decompress
 except ImportError:
     pass
+if 'ZSTD' not in compressions:
+    try:
+        import zstd
+        def zstd_compress(data, **kwargs):
+            kwargs['write_content_size'] = False
+            cctx = zstd.ZstdCompressor(**kwargs)
+            return cctx.compress(data, allow_empty=True)
+        def zstd_decompress(data, uncompressed_size):
+            dctx = zstd.ZstdDecompressor()
+            return dctx.decompress(data, max_output_size=uncompressed_size)
+        compressions['ZSTD'] = zstd_compress
+        decompressions['ZSTD'] = zstd_decompress
+    except ImportError:
+        pass
 
 compressions = {k.upper(): v for k, v in compressions.items()}
 decompressions = {k.upper(): v for k, v in decompressions.items()}
diff --git a/fastparquet/test/test_api.py b/fastparquet/test/test_api.py
index 207acbbe..c4233189 100644
--- a/fastparquet/test/test_api.py
+++ b/fastparquet/test/test_api.py
@@ -1,5 +1,6 @@
 from __future__ import unicode_literals
 
+import io
 import os
 
 import numpy as np
@@ -142,6 +143,23 @@ def test_open_standard(tempdir):
     pd.util.testing.assert_frame_equal(d2, df)
 
 
+def test_filelike(tempdir):
+    df = pd.DataFrame({'x': [1, 2, 3, 4],
+                       'y': [1.0, 2.0, 1.0, 2.0],
+                       'z': ['a', 'b', 'c', 'd']})
+    fn = os.path.join(tempdir, 'foo.parquet')
+    write(fn, df, row_group_offsets=[0, 2])
+    with open(fn, 'rb') as f:
+        pf = ParquetFile(f, open_with=open)
+        d2 = pf.to_pandas()
+        pd.util.testing.assert_frame_equal(d2, df)
+
+    b = io.BytesIO(open(fn, 'rb').read())
+    pf = ParquetFile(b, open_with=open)
+    d2 = pf.to_pandas()
+    pd.util.testing.assert_frame_equal(d2, df)
+
+
 def test_cast_index(tempdir):
     df = pd.DataFrame({'i8': np.array([1, 2, 3, 4], dtype='uint8'),
                        'i16': np.array([1, 2, 3, 4], dtype='int16'),