ARROW-2057: [Python] Expose option to configure data page size threshold in parquet.write_table #4597

Closed · wants to merge 3 commits

2 changes: 1 addition & 1 deletion cpp/src/parquet/properties-test.cc
@@ -38,7 +38,7 @@ TEST(TestReaderProperties, Basics) {
TEST(TestWriterProperties, Basics) {
std::shared_ptr<WriterProperties> props = WriterProperties::Builder().build();

ASSERT_EQ(DEFAULT_PAGE_SIZE, props->data_pagesize());
ASSERT_EQ(kDefaultDataPageSize, props->data_pagesize());
ASSERT_EQ(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT, props->dictionary_pagesize_limit());
ASSERT_EQ(DEFAULT_WRITER_VERSION, props->version());
}
6 changes: 3 additions & 3 deletions cpp/src/parquet/properties.h
@@ -68,9 +68,9 @@ class PARQUET_EXPORT ReaderProperties {

ReaderProperties PARQUET_EXPORT default_reader_properties();

static constexpr int64_t DEFAULT_PAGE_SIZE = 1024 * 1024;
static constexpr int64_t kDefaultDataPageSize = 1024 * 1024;
static constexpr bool DEFAULT_IS_DICTIONARY_ENABLED = true;
static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT = DEFAULT_PAGE_SIZE;
static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT = kDefaultDataPageSize;
static constexpr int64_t DEFAULT_WRITE_BATCH_SIZE = 1024;
static constexpr int64_t DEFAULT_MAX_ROW_GROUP_LENGTH = 64 * 1024 * 1024;
static constexpr bool DEFAULT_ARE_STATISTICS_ENABLED = true;
@@ -137,7 +137,7 @@ class PARQUET_EXPORT WriterProperties {
dictionary_pagesize_limit_(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT),
write_batch_size_(DEFAULT_WRITE_BATCH_SIZE),
max_row_group_length_(DEFAULT_MAX_ROW_GROUP_LENGTH),
pagesize_(DEFAULT_PAGE_SIZE),
pagesize_(kDefaultDataPageSize),
version_(DEFAULT_WRITER_VERSION),
created_by_(DEFAULT_CREATED_BY) {}
virtual ~Builder() {}
29 changes: 28 additions & 1 deletion docs/source/python/parquet.rst
@@ -112,6 +112,23 @@ In general, a Python file object will have the worst read performance, while a
string file path or an instance of :class:`~.NativeFile` (especially memory
maps) will perform the best.
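
As an illustration, here is a minimal sketch (the file name ``example.parquet``
is hypothetical) contrasting a plain string path with a memory-mapped
:class:`~.NativeFile`:

.. code-block:: python

   import pyarrow as pa
   import pyarrow.parquet as pq

   # Plain string path: pyarrow opens and reads the file itself
   table = pq.read_table('example.parquet')

   # Memory-mapped NativeFile: typically the fastest option for local files
   with pa.memory_map('example.parquet', 'r') as source:
       table = pq.read_table(source)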

Parquet file writing options
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

:func:`~pyarrow.parquet.write_table()` has a number of options to
control various settings when writing a Parquet file.

* ``version``, the Parquet format version to use, whether ``'1.0'``
for compatibility with older readers, or ``'2.0'`` to unlock more
recent features.
* ``data_page_size``, to control the approximate size of encoded data
pages within a column chunk. This currently defaults to 1MB.
* ``flavor``, to set compatibility options particular to a Parquet
consumer like ``'spark'`` for Apache Spark.

There are some additional data type handling-specific options
described below.
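
For example, a minimal sketch of passing these options (the file name and the
chosen values are illustrative, not defaults):

.. code-block:: python

   import pyarrow as pa
   import pyarrow.parquet as pq

   arr = pa.array([1, 2, 3] * 1000)
   table = pa.Table.from_arrays([arr], names=['f0'])

   # data_page_size is an approximate target for the encoded page size,
   # not a hard limit
   pq.write_table(table, 'example.parquet', version='2.0',
                  data_page_size=64 * 1024)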

Omitting the DataFrame index
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

@@ -214,6 +231,16 @@ an exception will be raised. This can be suppressed by passing
pq.write_table(table, where, coerce_timestamps='ms',
allow_truncated_timestamps=True)

Older Parquet implementations use ``INT96`` based storage of
timestamps, but this is now deprecated. This includes some older
versions of Apache Impala and Apache Spark. To write timestamps in
this format, set the ``use_deprecated_int96_timestamps`` option to
``True`` in ``write_table``.

.. code-block:: python

pq.write_table(table, where, use_deprecated_int96_timestamps=True)

Compression, Encoding, and File Compatibility
---------------------------------------------

@@ -351,7 +378,7 @@ Each of the reading functions by default use multi-threading for reading
columns in parallel. Depending on the speed of IO
and how expensive it is to decode the columns in a particular file
(particularly with GZIP compression), this can yield significantly higher data
throughput.

This can be disabled by specifying ``use_threads=False``.
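
For example (reading a hypothetical local file with parallel column decoding
turned off):

.. code-block:: python

   import pyarrow.parquet as pq

   # Decode columns on a single thread
   table = pq.read_table('example.parquet', use_threads=False)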

2 changes: 2 additions & 0 deletions python/pyarrow/_parquet.pxd
@@ -263,6 +263,8 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
Builder* disable_dictionary()
Builder* enable_dictionary()
Builder* enable_dictionary(const c_string& path)
Builder* data_pagesize(int64_t size)
Builder* write_batch_size(int64_t batch_size)
shared_ptr[WriterProperties] build()


7 changes: 7 additions & 0 deletions python/pyarrow/_parquet.pyx
@@ -1007,12 +1007,14 @@ cdef class ParquetWriter:
object compression
object version
int row_group_size
int64_t data_page_size

def __cinit__(self, where, Schema schema, use_dictionary=None,
compression=None, version=None,
MemoryPool memory_pool=None,
use_deprecated_int96_timestamps=False,
coerce_timestamps=None,
data_page_size=None,
allow_truncated_timestamps=False):
cdef:
shared_ptr[WriterProperties] properties
@@ -1042,12 +1044,17 @@
self._set_version(&properties_builder)
self._set_compression_props(&properties_builder)
self._set_dictionary_props(&properties_builder)

if data_page_size is not None:
properties_builder.data_pagesize(data_page_size)

properties = properties_builder.build()

cdef ArrowWriterProperties.Builder arrow_properties_builder
self._set_int96_support(&arrow_properties_builder)
self._set_coerce_timestamps(&arrow_properties_builder)
self._set_allow_truncated_timestamps(&arrow_properties_builder)

arrow_properties = arrow_properties_builder.build()

pool = maybe_unbox_memory_pool(memory_pool)
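
As context for this change, a hedged sketch of how the new keyword could be
exercised through the streaming writer, assuming the high-level
``pyarrow.parquet.ParquetWriter`` wrapper forwards extra keyword arguments to
this Cython class as it does for the other writer options (the file name and
page size are illustrative):

.. code-block:: python

   import pyarrow as pa
   import pyarrow.parquet as pq

   arr = pa.array([1, 2, 3] * 1000)
   table = pa.Table.from_arrays([arr], names=['f0'])

   # data_page_size is forwarded to WriterProperties::Builder::data_pagesize()
   writer = pq.ParquetWriter('example.parquet', table.schema,
                             data_page_size=512 * 1024)
   writer.write_table(table)
   writer.close()
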
15 changes: 11 additions & 4 deletions python/pyarrow/parquet.py
@@ -318,6 +318,10 @@ def _sanitize_table(table, new_schema, flavor):
coerce_timestamps : string, default None
Cast timestamps to a particular resolution.
Valid values: {None, 'ms', 'us'}
data_page_size : int, default None
Set a target threshold for the approximate encoded size of data
pages within a column chunk. If None, use the default data page
size of 1MByte.
allow_truncated_timestamps : boolean, default False
Allow loss of data when coercing timestamps to a particular
resolution. E.g. if microsecond or nanosecond data is lost when coercing to
@@ -326,7 +330,8 @@ def _sanitize_table(table, new_schema, flavor):
Specify the compression codec, either on a general basis or per-column.
Valid values: {'NONE', 'SNAPPY', 'GZIP', 'LZO', 'BROTLI', 'LZ4', 'ZSTD'}
flavor : {'spark'}, default None
Sanitize schema or set other compatibility options for compatibility
Sanitize schema or set other compatibility options to work with
various target systems
filesystem : FileSystem, default None
If nothing passed, will be inferred from `where` if path-like, else
`where` is already a file-like object so no filesystem is needed."""
@@ -1238,7 +1243,8 @@ def write_table(table, where, row_group_size=None, version='1.0',
use_deprecated_int96_timestamps=None,
coerce_timestamps=None,
allow_truncated_timestamps=False,
flavor=None, filesystem=None, **kwargs):
data_page_size=None, flavor=None,
filesystem=None, **kwargs):
row_group_size = kwargs.pop('chunk_size', row_group_size)
use_int96 = use_deprecated_int96_timestamps
try:
@@ -1249,6 +1255,7 @@ def write_table(table, where, row_group_size=None, version='1.0',
flavor=flavor,
use_dictionary=use_dictionary,
coerce_timestamps=coerce_timestamps,
data_page_size=data_page_size,
allow_truncated_timestamps=allow_truncated_timestamps,
compression=compression,
use_deprecated_int96_timestamps=use_int96,
@@ -1316,8 +1323,8 @@ def write_to_dataset(table, root_path, partition_cols=None, filesystem=None,
**kwargs : dict,
kwargs for write_table function. Using `metadata_collector` in
kwargs allows one to collect the file metadata instances of
dataset pieces. See `ParquetWriter.__doc__` for more
information.
dataset pieces. See docstring for `write_table` or
`ParquetWriter` for more information.
"""
if preserve_index is not None:
warnings.warn('preserve_index argument is deprecated as of 0.13.0 and '
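
A hedged sketch of the ``metadata_collector`` usage mentioned in the docstring
above (the dataset root and column names are illustrative):

.. code-block:: python

   import pyarrow as pa
   import pyarrow.parquet as pq

   values = pa.array([1, 2, 3, 4])
   part = pa.array(['a', 'a', 'b', 'b'])
   table = pa.Table.from_arrays([values, part], names=['values', 'part'])

   # Collects one FileMetaData instance per written dataset piece
   metadata = []
   pq.write_to_dataset(table, 'dataset_root', partition_cols=['part'],
                       metadata_collector=metadata)
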
23 changes: 17 additions & 6 deletions python/pyarrow/tests/test_parquet.py
@@ -73,28 +73,29 @@ def _read_table(*args, **kwargs):


def _roundtrip_table(table, read_table_kwargs=None,
**params):
write_table_kwargs=None):
read_table_kwargs = read_table_kwargs or {}
write_table_kwargs = write_table_kwargs or {}

buf = io.BytesIO()
_write_table(table, buf, **params)
_write_table(table, buf, **write_table_kwargs)
buf.seek(0)
return _read_table(buf, **read_table_kwargs)


def _check_roundtrip(table, expected=None, read_table_kwargs=None,
**params):
**write_table_kwargs):
if expected is None:
expected = table

read_table_kwargs = read_table_kwargs or {}

# intentionally check twice
result = _roundtrip_table(table, read_table_kwargs=read_table_kwargs,
**params)
write_table_kwargs=write_table_kwargs)
assert result.equals(expected)
result = _roundtrip_table(result, read_table_kwargs=read_table_kwargs,
**params)
write_table_kwargs=write_table_kwargs)
assert result.equals(expected)


@@ -174,6 +175,16 @@ def test_pandas_parquet_2_0_rountrip(tempdir, chunk_size):
tm.assert_frame_equal(df, df_read, check_categorical=False)


def test_set_data_page_size():
arr = pa.array([1, 2, 3] * 1000000)
t = pa.Table.from_arrays([arr], names=['f0'])

# 128K, 256K, 512K
page_sizes = [2 << 16, 2 << 17, 2 << 18]
for target_page_size in page_sizes:
_check_roundtrip(t, data_page_size=target_page_size)


@pytest.mark.pandas
def test_chunked_table_write():
# ARROW-232
@@ -1013,7 +1024,7 @@ def test_sanitized_spark_field_names():
name = 'prohib; ,\t{}'
table = pa.Table.from_arrays([a0], [name])

result = _roundtrip_table(table, flavor='spark')
result = _roundtrip_table(table, write_table_kwargs={'flavor': 'spark'})

expected_name = 'prohib______'
assert result.schema[0].name == expected_name