GH-34053: [C++][Parquet] Write parquet page index (apache#34054)
### Rationale for this change

The Parquet C++ reader already supports reading the page index from a file, but the writer does not yet support writing it.

### What changes are included in this PR?

The Parquet file writer now collects page index data (column index and offset index) from all data pages and serializes it into the file.

### Are these changes tested?

Yes, round-trip tests for the page index are added in `cpp/src/parquet/arrow/arrow_reader_writer_test.cc` (see the diff below).

### Are there any user-facing changes?

`WriterProperties::enable_write_page_index()` and `WriterProperties::disable_write_page_index()` have been added to toggle page index writing on and off; it is disabled by default.
* Closes: apache#34053
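
For illustration, here is a minimal usage sketch (not part of this PR's diff) that pairs the new writer toggle with the existing page index reader. It assumes the public `parquet::arrow::WriteTable` helper and Arrow's in-memory I/O classes, and elides most error handling:

```cpp
#include <iostream>
#include <memory>

#include "arrow/api.h"
#include "arrow/io/api.h"
#include "parquet/arrow/writer.h"
#include "parquet/file_reader.h"
#include "parquet/page_index.h"
#include "parquet/properties.h"

// Write `table` with the page index enabled, then print every page location
// recorded in the offset index.
arrow::Status RoundTripPageIndex(const std::shared_ptr<arrow::Table>& table) {
  // The page index is off by default; opt in through the new builder method.
  std::shared_ptr<parquet::WriterProperties> writer_properties =
      parquet::WriterProperties::Builder().enable_write_page_index()->build();

  // Write the table to an in-memory buffer.
  ARROW_ASSIGN_OR_RAISE(auto sink, arrow::io::BufferOutputStream::Create());
  ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(*table, arrow::default_memory_pool(),
                                                 sink, /*chunk_size=*/1024,
                                                 writer_properties));
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Buffer> buffer, sink->Finish());

  // Re-open the buffer and walk the per-row-group page index.
  std::unique_ptr<parquet::ParquetFileReader> reader = parquet::ParquetFileReader::Open(
      std::make_shared<arrow::io::BufferReader>(buffer));
  std::shared_ptr<parquet::PageIndexReader> page_index_reader =
      reader->GetPageIndexReader();
  for (int rg = 0; rg < reader->metadata()->num_row_groups(); ++rg) {
    auto row_group_index_reader = page_index_reader->RowGroup(rg);
    for (int col = 0; col < reader->metadata()->num_columns(); ++col) {
      auto offset_index = row_group_index_reader->GetOffsetIndex(col);
      if (offset_index == nullptr) continue;
      for (const auto& location : offset_index->page_locations()) {
        std::cout << "row group " << rg << ", column " << col << ": page at offset "
                  << location.offset << ", first_row_index " << location.first_row_index
                  << std::endl;
      }
    }
  }
  return arrow::Status::OK();
}
```

Note that the new property only controls writing; reading the index back goes through the pre-existing `GetPageIndexReader()` path exercised by the tests below.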

Authored-by: Gang Wu <[email protected]>
Signed-off-by: Will Jones <[email protected]>
wgtmac authored and rtpsw committed May 16, 2023
1 parent 2bf6004 commit 251a982
Showing 14 changed files with 1,458 additions and 38 deletions.
285 changes: 282 additions & 3 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -21,6 +21,7 @@
#pragma warning(disable : 4800)
#endif

#include "gmock/gmock.h"
#include "gtest/gtest.h"

#include <cstdint>
@@ -66,6 +67,7 @@
#include "parquet/arrow/writer.h"
#include "parquet/column_writer.h"
#include "parquet/file_writer.h"
#include "parquet/page_index.h"
#include "parquet/test_util.h"

using arrow::Array;
@@ -5081,10 +5083,21 @@ TEST(TestArrowReadWrite, WriteAndReadRecordBatch) {

// Verify the single record batch has been sliced into two row groups by
// WriterProperties::max_row_group_length().
-  int num_row_groups = arrow_reader->parquet_reader()->metadata()->num_row_groups();
+  auto file_metadata = arrow_reader->parquet_reader()->metadata();
+  int num_row_groups = file_metadata->num_row_groups();
ASSERT_EQ(2, num_row_groups);
-  ASSERT_EQ(10, arrow_reader->parquet_reader()->metadata()->RowGroup(0)->num_rows());
-  ASSERT_EQ(2, arrow_reader->parquet_reader()->metadata()->RowGroup(1)->num_rows());
+  ASSERT_EQ(10, file_metadata->RowGroup(0)->num_rows());
+  ASSERT_EQ(2, file_metadata->RowGroup(1)->num_rows());

// Verify that page index is not written by default.
for (int i = 0; i < num_row_groups; ++i) {
auto row_group_metadata = file_metadata->RowGroup(i);
for (int j = 0; j < row_group_metadata->num_columns(); ++j) {
auto column_metadata = row_group_metadata->ColumnChunk(j);
EXPECT_FALSE(column_metadata->GetColumnIndexLocation().has_value());
EXPECT_FALSE(column_metadata->GetOffsetIndexLocation().has_value());
}
}

// Verify batch data read via RecordBatch
std::unique_ptr<::arrow::RecordBatchReader> batch_reader;
@@ -5146,5 +5159,271 @@ TEST(TestArrowReadWrite, FuzzReader) {
}
}

namespace {

struct ColumnIndexObject {
std::vector<bool> null_pages;
std::vector<std::string> min_values;
std::vector<std::string> max_values;
BoundaryOrder::type boundary_order = BoundaryOrder::Unordered;
std::vector<int64_t> null_counts;

ColumnIndexObject() = default;

ColumnIndexObject(const std::vector<bool>& null_pages,
const std::vector<std::string>& min_values,
const std::vector<std::string>& max_values,
BoundaryOrder::type boundary_order,
const std::vector<int64_t>& null_counts)
: null_pages(null_pages),
min_values(min_values),
max_values(max_values),
boundary_order(boundary_order),
null_counts(null_counts) {}

explicit ColumnIndexObject(const ColumnIndex* column_index) {
if (column_index == nullptr) {
return;
}
null_pages = column_index->null_pages();
min_values = column_index->encoded_min_values();
max_values = column_index->encoded_max_values();
boundary_order = column_index->boundary_order();
if (column_index->has_null_counts()) {
null_counts = column_index->null_counts();
}
}

bool operator==(const ColumnIndexObject& b) const {
return null_pages == b.null_pages && min_values == b.min_values &&
max_values == b.max_values && boundary_order == b.boundary_order &&
null_counts == b.null_counts;
}
};

auto encode_int64 = [](int64_t value) {
return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t));
};

auto encode_double = [](double value) {
return std::string(reinterpret_cast<const char*>(&value), sizeof(double));
};

} // namespace

class ParquetPageIndexRoundTripTest : public ::testing::Test {
public:
void WriteFile(const std::shared_ptr<WriterProperties>& writer_properties,
const std::shared_ptr<::arrow::Table>& table) {
// Get schema from table.
auto schema = table->schema();
std::shared_ptr<SchemaDescriptor> parquet_schema;
auto arrow_writer_properties = default_arrow_writer_properties();
ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
*arrow_writer_properties, &parquet_schema));
auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());

// Write table to buffer.
auto sink = CreateOutputStream();
auto pool = ::arrow::default_memory_pool();
auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
std::unique_ptr<FileWriter> arrow_writer;
ASSERT_OK(FileWriter::Make(pool, std::move(writer), schema, arrow_writer_properties,
&arrow_writer));
ASSERT_OK_NO_THROW(arrow_writer->WriteTable(*table));
ASSERT_OK_NO_THROW(arrow_writer->Close());
ASSERT_OK_AND_ASSIGN(buffer_, sink->Finish());
}

void ReadPageIndexes(int expect_num_row_groups, int expect_num_pages) {
auto read_properties = default_arrow_reader_properties();
auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer_));

auto metadata = reader->metadata();
ASSERT_EQ(expect_num_row_groups, metadata->num_row_groups());

auto page_index_reader = reader->GetPageIndexReader();
ASSERT_NE(page_index_reader, nullptr);

int64_t offset_lower_bound = 0;
for (int rg = 0; rg < metadata->num_row_groups(); ++rg) {
auto row_group_index_reader = page_index_reader->RowGroup(rg);
ASSERT_NE(row_group_index_reader, nullptr);

for (int col = 0; col < metadata->num_columns(); ++col) {
auto column_index = row_group_index_reader->GetColumnIndex(col);
column_indexes_.emplace_back(column_index.get());

auto offset_index = row_group_index_reader->GetOffsetIndex(col);
CheckOffsetIndex(offset_index.get(), expect_num_pages, &offset_lower_bound);
}
}
}

private:
void CheckOffsetIndex(const OffsetIndex* offset_index, int expect_num_pages,
int64_t* offset_lower_bound_in_out) {
ASSERT_NE(offset_index, nullptr);
const auto& locations = offset_index->page_locations();
ASSERT_EQ(static_cast<size_t>(expect_num_pages), locations.size());
int64_t prev_first_row_index = -1;
for (const auto& location : locations) {
// Make sure first_row_index is in the ascending order within a row group.
ASSERT_GT(location.first_row_index, prev_first_row_index);
// Make sure page offset is in the ascending order across the file.
ASSERT_GE(location.offset, *offset_lower_bound_in_out);
// Make sure page size is positive.
ASSERT_GT(location.compressed_page_size, 0);
prev_first_row_index = location.first_row_index;
*offset_lower_bound_in_out = location.offset + location.compressed_page_size;
}
}

protected:
std::shared_ptr<Buffer> buffer_;
std::vector<ColumnIndexObject> column_indexes_;
};

TEST_F(ParquetPageIndexRoundTripTest, SimpleRoundTrip) {
auto writer_properties = WriterProperties::Builder()
.enable_write_page_index()
->max_row_group_length(4)
->build();
auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::int64()),
::arrow::field("c1", ::arrow::utf8()),
::arrow::field("c2", ::arrow::list(::arrow::int64()))});
WriteFile(writer_properties, ::arrow::TableFromJSON(schema, {R"([
[1, "a", [1] ],
[2, "b", [1, 2] ],
[3, "c", [null] ],
[null, "d", [] ],
[5, null, [3, 3, 3]],
[6, "f", null ]
])"}));

ReadPageIndexes(/*expect_num_row_groups=*/2, /*expect_num_pages=*/1);

EXPECT_THAT(
column_indexes_,
::testing::ElementsAre(
ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(1)},
/*max_values=*/{encode_int64(3)}, BoundaryOrder::Ascending,
/*null_counts=*/{1}},
ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{"a"},
/*max_values=*/{"d"}, BoundaryOrder::Ascending,
/*null_counts=*/{0}},
ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(1)},
/*max_values=*/{encode_int64(2)}, BoundaryOrder::Ascending,
/*null_counts=*/{2}},
ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(5)},
/*max_values=*/{encode_int64(6)}, BoundaryOrder::Ascending,
/*null_counts=*/{0}},
ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{"f"},
/*max_values=*/{"f"}, BoundaryOrder::Ascending,
/*null_counts=*/{1}},
ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{encode_int64(3)},
/*max_values=*/{encode_int64(3)}, BoundaryOrder::Ascending,
/*null_counts=*/{1}}));
}

TEST_F(ParquetPageIndexRoundTripTest, DropLargeStats) {
auto writer_properties = WriterProperties::Builder()
.enable_write_page_index()
->max_row_group_length(1) /* write single-row row group */
->max_statistics_size(20) /* drop stats larger than it */
->build();
auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::utf8())});
WriteFile(writer_properties, ::arrow::TableFromJSON(schema, {R"([
["short_string"],
["very_large_string_to_drop_stats"]
])"}));

ReadPageIndexes(/*expect_num_row_groups=*/2, /*expect_num_pages=*/1);

EXPECT_THAT(
column_indexes_,
::testing::ElementsAre(
ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{"short_string"},
/*max_values=*/{"short_string"}, BoundaryOrder::Ascending,
/*null_counts=*/{0}},
ColumnIndexObject{}));
}

TEST_F(ParquetPageIndexRoundTripTest, MultiplePages) {
auto writer_properties = WriterProperties::Builder()
.enable_write_page_index()
->data_pagesize(1) /* write multiple pages */
->build();
auto schema = ::arrow::schema(
{::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", ::arrow::utf8())});
WriteFile(
writer_properties,
::arrow::TableFromJSON(
schema, {R"([[1, "a"], [2, "b"]])", R"([[3, "c"], [4, "d"]])",
R"([[null, null], [6, "f"]])", R"([[null, null], [null, null]])"}));

ReadPageIndexes(/*expect_num_row_groups=*/1, /*expect_num_pages=*/4);

EXPECT_THAT(
column_indexes_,
::testing::ElementsAre(
ColumnIndexObject{
/*null_pages=*/{false, false, false, true},
/*min_values=*/{encode_int64(1), encode_int64(3), encode_int64(6), ""},
/*max_values=*/{encode_int64(2), encode_int64(4), encode_int64(6), ""},
BoundaryOrder::Ascending,
/*null_counts=*/{0, 0, 1, 2}},
ColumnIndexObject{/*null_pages=*/{false, false, false, true},
/*min_values=*/{"a", "c", "f", ""},
/*max_values=*/{"b", "d", "f", ""}, BoundaryOrder::Ascending,
/*null_counts=*/{0, 0, 1, 2}}));
}

TEST_F(ParquetPageIndexRoundTripTest, DoubleWithNaNs) {
auto writer_properties = WriterProperties::Builder()
.enable_write_page_index()
->max_row_group_length(3) /* 3 rows per row group */
->build();

// Create table to write with NaNs.
auto vectors = std::vector<std::shared_ptr<Array>>(4);
// NaN will be ignored in min/max stats.
::arrow::ArrayFromVector<::arrow::DoubleType>({1.0, NAN, 0.1}, &vectors[0]);
// Lower bound will use -0.0.
::arrow::ArrayFromVector<::arrow::DoubleType>({+0.0, NAN, +0.0}, &vectors[1]);
// Upper bound will use +0.0.
::arrow::ArrayFromVector<::arrow::DoubleType>({-0.0, NAN, -0.0}, &vectors[2]);
// Pages with all NaNs will not build column index.
::arrow::ArrayFromVector<::arrow::DoubleType>({NAN, NAN, NAN}, &vectors[3]);
ASSERT_OK_AND_ASSIGN(auto chunked_array,
arrow::ChunkedArray::Make(vectors, ::arrow::float64()));

auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::float64())});
auto table = Table::Make(schema, {chunked_array});
WriteFile(writer_properties, table);

ReadPageIndexes(/*expect_num_row_groups=*/4, /*expect_num_pages=*/1);

EXPECT_THAT(
column_indexes_,
::testing::ElementsAre(
ColumnIndexObject{/*null_pages=*/{false},
/*min_values=*/{encode_double(0.1)},
/*max_values=*/{encode_double(1.0)}, BoundaryOrder::Ascending,
/*null_counts=*/{0}},
ColumnIndexObject{/*null_pages=*/{false},
/*min_values=*/{encode_double(-0.0)},
/*max_values=*/{encode_double(+0.0)},
BoundaryOrder::Ascending,
/*null_counts=*/{0}},
ColumnIndexObject{/*null_pages=*/{false},
/*min_values=*/{encode_double(-0.0)},
/*max_values=*/{encode_double(+0.0)},
BoundaryOrder::Ascending,
/*null_counts=*/{0}},
ColumnIndexObject{
/* Page with only NaN values does not have column index built */}));
}

} // namespace arrow
} // namespace parquet
23 changes: 17 additions & 6 deletions cpp/src/parquet/column_page.h
@@ -23,6 +23,7 @@

#include <cstdint>
#include <memory>
#include <optional>
#include <string>

#include "parquet/statistics.h"
Expand Down Expand Up @@ -64,33 +65,42 @@ class DataPage : public Page {
Encoding::type encoding() const { return encoding_; }
int64_t uncompressed_size() const { return uncompressed_size_; }
const EncodedStatistics& statistics() const { return statistics_; }
/// Return the row ordinal within the row group to the first row in the data page.
/// Currently it is only present from data pages created by ColumnWriter in order
/// to collect page index.
std::optional<int64_t> first_row_index() const { return first_row_index_; }

virtual ~DataPage() = default;

protected:
DataPage(PageType::type type, const std::shared_ptr<Buffer>& buffer, int32_t num_values,
Encoding::type encoding, int64_t uncompressed_size,
-           const EncodedStatistics& statistics = EncodedStatistics())
+           const EncodedStatistics& statistics = EncodedStatistics(),
+           std::optional<int64_t> first_row_index = std::nullopt)
: Page(buffer, type),
num_values_(num_values),
encoding_(encoding),
uncompressed_size_(uncompressed_size),
-        statistics_(statistics) {}
+        statistics_(statistics),
+        first_row_index_(std::move(first_row_index)) {}

int32_t num_values_;
Encoding::type encoding_;
int64_t uncompressed_size_;
EncodedStatistics statistics_;
/// Row ordinal within the row group to the first row in the data page.
std::optional<int64_t> first_row_index_;
};

class DataPageV1 : public DataPage {
public:
DataPageV1(const std::shared_ptr<Buffer>& buffer, int32_t num_values,
Encoding::type encoding, Encoding::type definition_level_encoding,
Encoding::type repetition_level_encoding, int64_t uncompressed_size,
-             const EncodedStatistics& statistics = EncodedStatistics())
+             const EncodedStatistics& statistics = EncodedStatistics(),
+             std::optional<int64_t> first_row_index = std::nullopt)
: DataPage(PageType::DATA_PAGE, buffer, num_values, encoding, uncompressed_size,
-                 statistics),
+                 statistics, std::move(first_row_index)),
definition_level_encoding_(definition_level_encoding),
repetition_level_encoding_(repetition_level_encoding) {}

@@ -109,9 +119,10 @@ class DataPageV2 : public DataPage {
int32_t num_rows, Encoding::type encoding,
int32_t definition_levels_byte_length, int32_t repetition_levels_byte_length,
int64_t uncompressed_size, bool is_compressed = false,
-             const EncodedStatistics& statistics = EncodedStatistics())
+             const EncodedStatistics& statistics = EncodedStatistics(),
+             std::optional<int64_t> first_row_index = std::nullopt)
: DataPage(PageType::DATA_PAGE_V2, buffer, num_values, encoding, uncompressed_size,
-                 statistics),
+                 statistics, std::move(first_row_index)),
num_nulls_(num_nulls),
num_rows_(num_rows),
definition_levels_byte_length_(definition_levels_byte_length),
(The remainder of this hunk and the diffs for the other 12 changed files are not shown here.)
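
For context, here is a hypothetical snippet (not taken from this PR) illustrating how the extended `DataPageV2` constructor carries the new field and how it is read back through `first_row_index()`; the payload buffer and counts below are made up:

```cpp
#include <iostream>

#include "arrow/buffer.h"
#include "parquet/column_page.h"
#include "parquet/types.h"

int main() {
  // Fake page payload; in the writer the buffer holds the encoded page data.
  std::shared_ptr<arrow::Buffer> buffer = arrow::Buffer::FromString("fake page payload");

  // Construct a DataPageV2, attaching the ordinal of the page's first row
  // within its row group via the new trailing parameter.
  parquet::DataPageV2 page(buffer,
                           /*num_values=*/100, /*num_nulls=*/0, /*num_rows=*/100,
                           parquet::Encoding::PLAIN,
                           /*definition_levels_byte_length=*/0,
                           /*repetition_levels_byte_length=*/0,
                           /*uncompressed_size=*/buffer->size(),
                           /*is_compressed=*/false,
                           parquet::EncodedStatistics(),
                           /*first_row_index=*/128);

  // The accessor returns std::nullopt for pages that were not created by the
  // ColumnWriter with page index collection enabled.
  if (page.first_row_index().has_value()) {
    std::cout << "first row of this page within its row group: "
              << *page.first_row_index() << std::endl;
  }
  return 0;
}
```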
