GH-36284: [Python][Parquet] Support write page index in Python API (#36290)

### Rationale for this change

Support `write_page_index` in the Parquet Python API.

### What changes are included in this PR?

Support `write_page_index` in the Parquet writer properties.

### Are these changes tested?

Yes, a test exercising the new page index metadata properties is added in `python/pyarrow/tests/parquet/test_metadata.py`.

### Are there any user-facing changes?

Yes. Users can now have the page index written by passing `write_page_index=True` to the Parquet writer APIs.
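A minimal sketch of the new option (the file name is a placeholder):

```python
import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"x": [1, 2, 3]})
# Opt in to writing the column index and offset index (off by default).
pq.write_table(table, "data.parquet", write_page_index=True)
```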

* Closes: #36284

Lead-authored-by: mwish <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Co-authored-by: mwish <[email protected]>
Co-authored-by: Alenka Frim <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
4 people authored Jul 10, 2023
1 parent 087249b commit 12f45ba
Showing 4 changed files with 61 additions and 6 deletions.
11 changes: 10 additions & 1 deletion python/pyarrow/_parquet.pxd
@@ -300,6 +300,10 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
c_bool encrypted_with_footer_key() const
const c_string& key_metadata() const

cdef cppclass ParquetIndexLocation" parquet::IndexLocation":
int64_t offset
int32_t length

cdef cppclass CColumnChunkMetaData" parquet::ColumnChunkMetaData":
int64_t file_offset() const
const c_string& file_path() const
@@ -321,6 +325,8 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
int64_t total_compressed_size() const
int64_t total_uncompressed_size() const
unique_ptr[CColumnCryptoMetaData] crypto_metadata() const
optional[ParquetIndexLocation] GetColumnIndexLocation() const
optional[ParquetIndexLocation] GetOffsetIndexLocation() const

cdef cppclass CRowGroupMetaData" parquet::RowGroupMetaData":
c_bool Equals(const CRowGroupMetaData&) const
@@ -420,6 +426,8 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
Builder* max_row_group_length(int64_t size)
Builder* write_batch_size(int64_t batch_size)
Builder* dictionary_pagesize_limit(int64_t dictionary_pagesize_limit)
Builder* enable_write_page_index()
Builder* disable_write_page_index()
shared_ptr[WriterProperties] build()

cdef cppclass ArrowWriterProperties:
@@ -567,7 +575,8 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
data_page_version=*,
FileEncryptionProperties encryption_properties=*,
write_batch_size=*,
- dictionary_pagesize_limit=*) except *
+ dictionary_pagesize_limit=*,
+ write_page_index=*) except *


cdef shared_ptr[ArrowWriterProperties] _create_arrow_writer_properties(
30 changes: 25 additions & 5 deletions python/pyarrow/_parquet.pyx
@@ -462,15 +462,15 @@ cdef class ColumnChunkMetaData(_Weakrefable):

@property
def dictionary_page_offset(self):
"""Offset of dictionary page reglative to column chunk offset (int)."""
"""Offset of dictionary page relative to column chunk offset (int)."""
if self.has_dictionary_page:
return self.metadata.dictionary_page_offset()
else:
return None

@property
def data_page_offset(self):
"""Offset of data page reglative to column chunk offset (int)."""
"""Offset of data page relative to column chunk offset (int)."""
return self.metadata.data_page_offset()

@property
@@ -493,6 +493,16 @@ cdef class ColumnChunkMetaData(_Weakrefable):
"""Uncompressed size in bytes (int)."""
return self.metadata.total_uncompressed_size()

@property
def has_offset_index(self):
"""Whether the column chunk has an offset index"""
return self.metadata.GetOffsetIndexLocation().has_value()

@property
def has_column_index(self):
"""Whether the column chunk has a column index"""
return self.metadata.GetColumnIndexLocation().has_value()


cdef class RowGroupMetaData(_Weakrefable):
"""Metadata for a single row group."""
@@ -1455,7 +1465,8 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
data_page_version=None,
FileEncryptionProperties encryption_properties=None,
write_batch_size=None,
- dictionary_pagesize_limit=None) except *:
+ dictionary_pagesize_limit=None,
+ write_page_index=False) except *:
"""General writer properties"""
cdef:
shared_ptr[WriterProperties] properties
@@ -1599,6 +1610,13 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
# a size larger than this then it will be latched to this value.
props.max_row_group_length(_MAX_ROW_GROUP_SIZE)

# page index

if write_page_index:
props.enable_write_page_index()
else:
props.disable_write_page_index()

properties = props.build()

return properties
@@ -1710,7 +1728,8 @@ cdef class ParquetWriter(_Weakrefable):
encryption_properties=None,
write_batch_size=None,
dictionary_pagesize_limit=None,
- store_schema=True):
+ store_schema=True,
+ write_page_index=False):
cdef:
shared_ptr[WriterProperties] properties
shared_ptr[ArrowWriterProperties] arrow_properties
@@ -1740,7 +1759,8 @@ cdef class ParquetWriter(_Weakrefable):
data_page_version=data_page_version,
encryption_properties=encryption_properties,
write_batch_size=write_batch_size,
- dictionary_pagesize_limit=dictionary_pagesize_limit
+ dictionary_pagesize_limit=dictionary_pagesize_limit,
+ write_page_index=write_page_index
)
arrow_properties = _create_arrow_writer_properties(
use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
11 changes: 11 additions & 0 deletions python/pyarrow/parquet/core.py
@@ -874,6 +874,13 @@ def _sanitize_table(table, new_schema, flavor):
it will restore the timezone (Parquet only stores the UTC values without
timezone), or columns with duration type will be restored from the int64
Parquet column.
write_page_index : bool, default False
Whether to write a page index in general for all columns.
Writing statistics to the page index disables the old method of writing
statistics to each data page header. The page index makes statistics-based
filtering more efficient than the page header, as it gathers all the
statistics for a Parquet file in a single place, avoiding scattered I/O.
Note that the page index is not yet used on the read side by PyArrow.
"""

_parquet_writer_example_doc = """\
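The same option is also accepted by the `ParquetWriter` class, whose `__init__` gains the keyword below; a minimal sketch (placeholder file name):

```python
import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"x": [1, 2, 3]})
# write_page_index is forwarded to the underlying WriterProperties builder.
with pq.ParquetWriter("data.parquet", table.schema,
                      write_page_index=True) as writer:
    writer.write_table(table)
```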
@@ -966,6 +973,7 @@ def __init__(self, where, schema, filesystem=None,
write_batch_size=None,
dictionary_pagesize_limit=None,
store_schema=True,
write_page_index=False,
**options):
if use_deprecated_int96_timestamps is None:
# Use int96 timestamps for Spark
@@ -1022,6 +1030,7 @@ def __init__(self, where, schema, filesystem=None,
write_batch_size=write_batch_size,
dictionary_pagesize_limit=dictionary_pagesize_limit,
store_schema=store_schema,
write_page_index=write_page_index,
**options)
self.is_open = True

@@ -3084,6 +3093,7 @@ def write_table(table, where, row_group_size=None, version='2.6',
write_batch_size=None,
dictionary_pagesize_limit=None,
store_schema=True,
write_page_index=False,
**kwargs):
# Implementor's note: when adding keywords here / updating defaults, also
# update it in write_to_dataset and _dataset_parquet.pyx ParquetFileWriteOptions
@@ -3111,6 +3121,7 @@ def write_table(table, where, row_group_size=None, version='2.6',
write_batch_size=write_batch_size,
dictionary_pagesize_limit=dictionary_pagesize_limit,
store_schema=store_schema,
write_page_index=write_page_index,
**kwargs) as writer:
writer.write_table(table, row_group_size=row_group_size)
except Exception:
15 changes: 15 additions & 0 deletions python/pyarrow/tests/parquet/test_metadata.py
@@ -357,6 +357,21 @@ def test_field_id_metadata():
assert schema[5].metadata[field_id] == b'-1000'


def test_parquet_file_page_index():
for write_page_index in (False, True):
table = pa.table({'a': [1, 2, 3]})

writer = pa.BufferOutputStream()
_write_table(table, writer, write_page_index=write_page_index)
reader = pa.BufferReader(writer.getvalue())

# Page index locations appear in the metadata only when enabled
metadata = pq.read_metadata(reader)
cc = metadata.row_group(0).column(0)
assert cc.has_offset_index is write_page_index
assert cc.has_column_index is write_page_index


@pytest.mark.pandas
def test_multi_dataset_metadata(tempdir):
filenames = ["ARROW-1983-dataset.0", "ARROW-1983-dataset.1"]
