GH-36284: [Python][Parquet] Support write page index in Python API #36290

Merged — 10 commits, Jul 10, 2023
11 changes: 10 additions & 1 deletion python/pyarrow/_parquet.pxd
@@ -300,6 +300,10 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
c_bool encrypted_with_footer_key() const
const c_string& key_metadata() const

cdef cppclass ParquetIndexLocation" parquet::IndexLocation":
int64_t offset
int32_t length

cdef cppclass CColumnChunkMetaData" parquet::ColumnChunkMetaData":
int64_t file_offset() const
const c_string& file_path() const
@@ -321,6 +325,8 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
int64_t total_compressed_size() const
int64_t total_uncompressed_size() const
unique_ptr[CColumnCryptoMetaData] crypto_metadata() const
optional[ParquetIndexLocation] GetColumnIndexLocation() const
optional[ParquetIndexLocation] GetOffsetIndexLocation() const

cdef cppclass CRowGroupMetaData" parquet::RowGroupMetaData":
c_bool Equals(const CRowGroupMetaData&) const
@@ -420,6 +426,8 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
Builder* max_row_group_length(int64_t size)
Builder* write_batch_size(int64_t batch_size)
Builder* dictionary_pagesize_limit(int64_t dictionary_pagesize_limit)
Builder* enable_write_page_index()
Builder* disable_write_page_index()
shared_ptr[WriterProperties] build()

cdef cppclass ArrowWriterProperties:
@@ -567,7 +575,8 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
data_page_version=*,
FileEncryptionProperties encryption_properties=*,
write_batch_size=*,
dictionary_pagesize_limit=*) except *
dictionary_pagesize_limit=*,
write_page_index=*) except *


cdef shared_ptr[ArrowWriterProperties] _create_arrow_writer_properties(
30 changes: 25 additions & 5 deletions python/pyarrow/_parquet.pyx
@@ -462,15 +462,15 @@ cdef class ColumnChunkMetaData(_Weakrefable):

@property
def dictionary_page_offset(self):
"""Offset of dictionary page reglative to column chunk offset (int)."""
"""Offset of dictionary page relative to column chunk offset (int)."""
if self.has_dictionary_page:
return self.metadata.dictionary_page_offset()
else:
return None

@property
def data_page_offset(self):
"""Offset of data page reglative to column chunk offset (int)."""
"""Offset of data page relative to column chunk offset (int)."""
return self.metadata.data_page_offset()

@property
@@ -493,6 +493,16 @@ cdef class ColumnChunkMetaData(_Weakrefable):
"""Uncompressed size in bytes (int)."""
return self.metadata.total_uncompressed_size()

@property
def has_offset_index(self):
"""Whether the column chunk has an offset index"""
return self.metadata.GetOffsetIndexLocation().has_value()

@property
def has_column_index(self):
"""Whether the column chunk has a column index"""
return self.metadata.GetColumnIndexLocation().has_value()
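As a rough illustration of the two new ColumnChunkMetaData properties, here is a minimal sketch using only APIs touched by this PR (the column name and values are arbitrary): it writes a small table with the page index enabled and then checks the flags from the file metadata.

```python
import pyarrow as pa
import pyarrow.parquet as pq

# Write a tiny file with the page index enabled (keyword added in this PR).
table = pa.table({'x': [1, 2, 3]})
sink = pa.BufferOutputStream()
pq.write_table(table, sink, write_page_index=True)

# The new properties report whether each column chunk has the indexes.
metadata = pq.read_metadata(pa.BufferReader(sink.getvalue()))
column = metadata.row_group(0).column(0)
assert column.has_column_index and column.has_offset_index
```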


cdef class RowGroupMetaData(_Weakrefable):
"""Metadata for a single row group."""
@@ -1455,7 +1465,8 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
data_page_version=None,
FileEncryptionProperties encryption_properties=None,
write_batch_size=None,
dictionary_pagesize_limit=None) except *:
dictionary_pagesize_limit=None,
write_page_index=False) except *:
"""General writer properties"""
cdef:
shared_ptr[WriterProperties] properties
@@ -1599,6 +1610,13 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
# a size larger than this then it will be latched to this value.
props.max_row_group_length(_MAX_ROW_GROUP_SIZE)

# page index

if write_page_index:
props.enable_write_page_index()
else:
props.disable_write_page_index()

properties = props.build()

return properties
@@ -1710,7 +1728,8 @@ cdef class ParquetWriter(_Weakrefable):
encryption_properties=None,
write_batch_size=None,
dictionary_pagesize_limit=None,
store_schema=True):
store_schema=True,
write_page_index=False):
cdef:
shared_ptr[WriterProperties] properties
shared_ptr[ArrowWriterProperties] arrow_properties
@@ -1740,7 +1759,8 @@ cdef class ParquetWriter(_Weakrefable):
data_page_version=data_page_version,
encryption_properties=encryption_properties,
write_batch_size=write_batch_size,
dictionary_pagesize_limit=dictionary_pagesize_limit
dictionary_pagesize_limit=dictionary_pagesize_limit,
write_page_index=write_page_index
)
arrow_properties = _create_arrow_writer_properties(
use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
11 changes: 11 additions & 0 deletions python/pyarrow/parquet/core.py
@@ -874,6 +874,13 @@ def _sanitize_table(table, new_schema, flavor):
it will restore the timezone (Parquet only stores the UTC values without
timezone), or columns with duration type will be restored from the int64
Parquet column.
write_page_index : bool, default False
Review discussion on this parameter:

Member:
Side question: should we consider turning this on by default at some point?

Member Author:
Currently not; I found it hard to implement page index pruning in the current implementation. If we implement it, maybe we can change the default.

Member:
Even if it's not used already, it would probably be beneficial to write files with the index enabled, for future use.
Is there a performance issue with enabling it?

Member Author:
I guess most of the time there is no performance issue. But when the user has extremely long strings, we might write too much data.

Member:
We are allowed to trim the min/max values, right?

Member Author (@mapleFU, Jul 5, 2023):
Sure. Here it will "discard" statistics that are too long, and discard the page index. I will implement truncation in the future.

Member:
So if I understand correctly, we are currently not yet using the PageIndex when reading files (through the Python APIs) for pruning pages when given a filter?
Should we mention in the docstring that you can already write a PageIndex, but it will not yet be used when reading with pyarrow?

Member Author:
Sure, I will.

Member Author:
@jorisvandenbossche I've done that. By the way, we cannot filter using pyarrow, but parquet-rs and parquet-mr can take advantage of it.

Whether to write a page index in general for all columns.
Writing statistics to the page index disables the old method of writing
statistics to each data page header. The page index makes statistics-based
filtering more efficient than the page header, as it gathers all the
statistics for a Parquet file in a single place, avoiding scattered I/O.
Note that the page index is not yet used on the read side by PyArrow.
"""

_parquet_writer_example_doc = """\
@@ -966,6 +973,7 @@ def __init__(self, where, schema, filesystem=None,
write_batch_size=None,
dictionary_pagesize_limit=None,
store_schema=True,
write_page_index=False,
**options):
if use_deprecated_int96_timestamps is None:
# Use int96 timestamps for Spark
@@ -1022,6 +1030,7 @@ def __init__(self, where, schema, filesystem=None,
write_batch_size=write_batch_size,
dictionary_pagesize_limit=dictionary_pagesize_limit,
store_schema=store_schema,
write_page_index=write_page_index,
**options)
self.is_open = True

@@ -3084,6 +3093,7 @@ def write_table(table, where, row_group_size=None, version='2.6',
write_batch_size=None,
dictionary_pagesize_limit=None,
store_schema=True,
write_page_index=False,
**kwargs):
# Implementor's note: when adding keywords here / updating defaults, also
# update it in write_to_dataset and _dataset_parquet.pyx ParquetFileWriteOptions
@@ -3111,6 +3121,7 @@ def write_table(table, where, row_group_size=None, version='2.6',
write_batch_size=write_batch_size,
dictionary_pagesize_limit=dictionary_pagesize_limit,
store_schema=store_schema,
write_page_index=write_page_index,
**kwargs) as writer:
writer.write_table(table, row_group_size=row_group_size)
except Exception:
15 changes: 15 additions & 0 deletions python/pyarrow/tests/parquet/test_metadata.py
@@ -357,6 +357,21 @@ def test_field_id_metadata():
assert schema[5].metadata[field_id] == b'-1000'


def test_parquet_file_page_index():
for write_page_index in (False, True):
table = pa.table({'a': [1, 2, 3]})

writer = pa.BufferOutputStream()
_write_table(table, writer, write_page_index=write_page_index)
reader = pa.BufferReader(writer.getvalue())

        # The page index presence is reflected in the column chunk metadata
metadata = pq.read_metadata(reader)
cc = metadata.row_group(0).column(0)
assert cc.has_offset_index is write_page_index
assert cc.has_column_index is write_page_index


@pytest.mark.pandas
def test_multi_dataset_metadata(tempdir):
filenames = ["ARROW-1983-dataset.0", "ARROW-1983-dataset.1"]