Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow reading from any file-like #330

Merged
merged 4 commits into from
Apr 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion fastparquet/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ class ParquetFile(object):
fn: path/URL string or list of paths
Location of the data. If a directory, will attempt to read a file
"_metadata" within that directory. If a list of paths, will assume
that they make up a single parquet data set.
that they make up a single parquet data set. Alternatively, this
parameter can be any file-like object, in which case it must contain
a single-file dataset.
verify: bool [False]
test file start/end byte markers
open_with: function
Expand Down Expand Up @@ -87,6 +89,14 @@ def __init__(self, fn, verify=False, open_with=default_open,
self.fn = '_metadata'
self.fmd = fmd
self._set_attrs()
elif hasattr(fn, 'read'):
# file-like
self._parse_header(fn, verify)
if self.file_scheme not in ['simple', 'empty']:
raise ValueError('Cannot use file-like input '
'with multi-file data')
open_with = lambda *args, **kwargs: fn
self.fn = None
else:
try:
fn2 = join_path(fn, '_metadata')
Expand Down
22 changes: 18 additions & 4 deletions fastparquet/compression.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,32 @@ def lz4_decompress(data, uncompressed_size):
except ImportError:
pass
try:
import zstd
import zstandard
def zstd_compress(data, **kwargs):
kwargs['write_content_size'] = False
cctx = zstd.ZstdCompressor(**kwargs)
return cctx.compress(data, allow_empty=True)
cctx = zstandard.ZstdCompressor(**kwargs)
return cctx.compress(data)
def zstd_decompress(data, uncompressed_size):
dctx = zstd.ZstdDecompressor()
dctx = zstandard.ZstdDecompressor()
return dctx.decompress(data, max_output_size=uncompressed_size)
compressions['ZSTD'] = zstd_compress
decompressions['ZSTD'] = zstd_decompress
except ImportError:
pass
if 'ZSTD' not in compressions:
try:
import zstd
def zstd_compress(data, **kwargs):
kwargs['write_content_size'] = False
cctx = zstd.ZstdCompressor(**kwargs)
return cctx.compress(data, allow_empty=True)
def zstd_decompress(data, uncompressed_size):
dctx = zstd.ZstdDecompressor()
return dctx.decompress(data, max_output_size=uncompressed_size)
compressions['ZSTD'] = zstd_compress
decompressions['ZSTD'] = zstd_decompress
except ImportError:
pass

compressions = {k.upper(): v for k, v in compressions.items()}
decompressions = {k.upper(): v for k, v in decompressions.items()}
Expand Down
18 changes: 18 additions & 0 deletions fastparquet/test/test_api.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import unicode_literals

import io
import os

import numpy as np
Expand Down Expand Up @@ -142,6 +143,23 @@ def test_open_standard(tempdir):
pd.util.testing.assert_frame_equal(d2, df)


def test_filelike(tempdir):
    """Round-trip a dataframe through ParquetFile when it is given a
    file-like object (an open file handle, then an in-memory BytesIO)
    instead of a path string."""
    df = pd.DataFrame({'x': [1, 2, 3, 4],
                       'y': [1.0, 2.0, 1.0, 2.0],
                       'z': ['a', 'b', 'c', 'd']})
    fn = os.path.join(tempdir, 'foo.parquet')
    # Two row groups, so the file-like code path is exercised on a
    # single-file dataset that still has multiple row groups.
    write(fn, df, row_group_offsets=[0, 2])

    # Case 1: a real open file handle.
    with open(fn, 'rb') as f:
        pf = ParquetFile(f, open_with=open)
        d2 = pf.to_pandas()
        pd.util.testing.assert_frame_equal(d2, df)

    # Case 2: an in-memory buffer. Read the bytes inside a `with` so
    # the file handle is closed deterministically (the original
    # `io.BytesIO(open(fn, 'rb').read())` leaked the handle to the GC).
    with open(fn, 'rb') as f:
        b = io.BytesIO(f.read())
    pf = ParquetFile(b, open_with=open)
    d2 = pf.to_pandas()
    pd.util.testing.assert_frame_equal(d2, df)


def test_cast_index(tempdir):
df = pd.DataFrame({'i8': np.array([1, 2, 3, 4], dtype='uint8'),
'i16': np.array([1, 2, 3, 4], dtype='int16'),
Expand Down