Skip to content

Commit

Permalink
apacheGH-34053: [C++][Parquet] Write parquet page index
Browse files Browse the repository at this point in the history
  • Loading branch information
wgtmac committed Feb 28, 2023
1 parent 0e752c8 commit 363dcef
Show file tree
Hide file tree
Showing 14 changed files with 1,283 additions and 43 deletions.
109 changes: 106 additions & 3 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
#include "parquet/arrow/writer.h"
#include "parquet/column_writer.h"
#include "parquet/file_writer.h"
#include "parquet/page_index.h"
#include "parquet/test_util.h"

using arrow::Array;
Expand Down Expand Up @@ -5081,10 +5082,21 @@ TEST(TestArrowReadWrite, WriteAndReadRecordBatch) {

// Verify the single record batch has been sliced into two row groups by
// WriterProperties::max_row_group_length().
int num_row_groups = arrow_reader->parquet_reader()->metadata()->num_row_groups();
auto file_metadata = arrow_reader->parquet_reader()->metadata();
int num_row_groups = file_metadata->num_row_groups();
ASSERT_EQ(2, num_row_groups);
ASSERT_EQ(10, arrow_reader->parquet_reader()->metadata()->RowGroup(0)->num_rows());
ASSERT_EQ(2, arrow_reader->parquet_reader()->metadata()->RowGroup(1)->num_rows());
ASSERT_EQ(10, file_metadata->RowGroup(0)->num_rows());
ASSERT_EQ(2, file_metadata->RowGroup(1)->num_rows());

// Verify that page index is not written by default.
for (int i = 0; i < num_row_groups; ++i) {
auto row_group_metadata = file_metadata->RowGroup(i);
for (int j = 0; j < row_group_metadata->num_columns(); ++j) {
auto column_metadata = row_group_metadata->ColumnChunk(j);
EXPECT_FALSE(column_metadata->GetColumnIndexLocation().has_value());
EXPECT_FALSE(column_metadata->GetOffsetIndexLocation().has_value());
}
}

// Verify batch data read via RecordBatch
std::unique_ptr<::arrow::RecordBatchReader> batch_reader;
Expand Down Expand Up @@ -5146,5 +5158,96 @@ TEST(TestArrowReadWrite, FuzzReader) {
}
}

TEST(TestArrowReadWrite, WriteReadPageIndexRoundTrip) {
// Enable page index to the writer.
auto writer_properties = WriterProperties::Builder()
.enable_write_page_index()
->max_row_group_length(4)
->build();
auto arrow_writer_properties = default_arrow_writer_properties();
auto pool = ::arrow::default_memory_pool();
auto sink = CreateOutputStream();
auto schema = ::arrow::schema(
{::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", ::arrow::utf8())});
std::shared_ptr<SchemaDescriptor> parquet_schema;
ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
*arrow_writer_properties, &parquet_schema));
auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());

// Prepare data and each row group contains 4 rows.
auto record_batch = ::arrow::RecordBatchFromJSON(schema, R"([
[1, "a"],
[2, "b"],
[3, "c"],
[null, "d"],
[5, null],
[6, "f"]
])");

// Create writer to write data via RecordBatch.
auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
std::unique_ptr<FileWriter> arrow_writer;
ASSERT_OK(FileWriter::Make(pool, std::move(writer), record_batch->schema(),
arrow_writer_properties, &arrow_writer));
ASSERT_OK_NO_THROW(arrow_writer->WriteRecordBatch(*record_batch));
ASSERT_OK_NO_THROW(arrow_writer->Close());
ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());

// Create reader to read page index.
auto read_properties = default_arrow_reader_properties();
auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
auto metadata = reader->metadata();
ASSERT_EQ(2, metadata->num_row_groups());

// Make sure page index reader is not null.
auto page_index_reader = reader->GetPageIndexReader();
ASSERT_NE(page_index_reader, nullptr);

auto encode_int64 = [=](int64_t value) {
return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t));
};

const std::vector<std::string> c0_min_values = {encode_int64(1), encode_int64(5)};
const std::vector<std::string> c0_max_values = {encode_int64(3), encode_int64(6)};
const std::vector<std::string> c1_min_values = {"a", "f"};
const std::vector<std::string> c1_max_values = {"d", "f"};
const std::vector<int64_t> c0_null_counts = {1, 0};
const std::vector<int64_t> c1_null_counts = {0, 1};

const size_t num_pages = 1;
for (int rg = 0; rg < metadata->num_row_groups(); ++rg) {
auto row_group_index_reader = page_index_reader->RowGroup(rg);
ASSERT_NE(row_group_index_reader, nullptr);

// Verify offset index.
for (int c = 0; c < metadata->num_columns(); ++c) {
auto offset_index = row_group_index_reader->GetOffsetIndex(c);
ASSERT_NE(offset_index, nullptr);
ASSERT_EQ(num_pages, offset_index->page_locations().size());
ASSERT_EQ(0, offset_index->page_locations()[0].first_row_index);
}

// Verify column index of c0.
auto c0_column_index = row_group_index_reader->GetColumnIndex(0);
ASSERT_NE(c0_column_index, nullptr);
ASSERT_EQ(num_pages, c0_column_index->null_pages().size());
ASSERT_EQ(BoundaryOrder::Ascending, c0_column_index->boundary_order());
ASSERT_EQ(c0_min_values[rg], c0_column_index->encoded_min_values()[0]);
ASSERT_EQ(c0_max_values[rg], c0_column_index->encoded_max_values()[0]);
ASSERT_TRUE(c0_column_index->has_null_counts());
ASSERT_EQ(c0_null_counts[rg], c0_column_index->null_counts()[0]);

// Verify column index of c1.
auto c1_column_index = row_group_index_reader->GetColumnIndex(1);
ASSERT_NE(c1_column_index, nullptr);
ASSERT_EQ(num_pages, c1_column_index->null_pages().size());
ASSERT_EQ(BoundaryOrder::Ascending, c1_column_index->boundary_order());
ASSERT_EQ(c1_min_values[rg], c1_column_index->encoded_min_values()[0]);
ASSERT_EQ(c1_max_values[rg], c1_column_index->encoded_max_values()[0]);
ASSERT_TRUE(c1_column_index->has_null_counts());
ASSERT_EQ(c1_null_counts[rg], c1_column_index->null_counts()[0]);
}
}

} // namespace arrow
} // namespace parquet
20 changes: 14 additions & 6 deletions cpp/src/parquet/column_page.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include <cstdint>
#include <memory>
#include <optional>
#include <string>

#include "parquet/statistics.h"
Expand Down Expand Up @@ -64,33 +65,39 @@ class DataPage : public Page {
Encoding::type encoding() const { return encoding_; }
int64_t uncompressed_size() const { return uncompressed_size_; }
const EncodedStatistics& statistics() const { return statistics_; }
std::optional<int64_t> first_row_index() const { return first_row_index_; }

virtual ~DataPage() = default;

protected:
DataPage(PageType::type type, const std::shared_ptr<Buffer>& buffer, int32_t num_values,
Encoding::type encoding, int64_t uncompressed_size,
const EncodedStatistics& statistics = EncodedStatistics())
const EncodedStatistics& statistics = EncodedStatistics(),
std::optional<int64_t> first_row_index = std::nullopt)
: Page(buffer, type),
num_values_(num_values),
encoding_(encoding),
uncompressed_size_(uncompressed_size),
statistics_(statistics) {}
statistics_(statistics),
first_row_index_(std::move(first_row_index)) {}

int32_t num_values_;
Encoding::type encoding_;
int64_t uncompressed_size_;
EncodedStatistics statistics_;
/// Row ordinal within the row group to the first row in the data page.
std::optional<int64_t> first_row_index_;
};

class DataPageV1 : public DataPage {
public:
DataPageV1(const std::shared_ptr<Buffer>& buffer, int32_t num_values,
Encoding::type encoding, Encoding::type definition_level_encoding,
Encoding::type repetition_level_encoding, int64_t uncompressed_size,
const EncodedStatistics& statistics = EncodedStatistics())
const EncodedStatistics& statistics = EncodedStatistics(),
std::optional<int64_t> first_row_index = std::nullopt)
: DataPage(PageType::DATA_PAGE, buffer, num_values, encoding, uncompressed_size,
statistics),
statistics, std::move(first_row_index)),
definition_level_encoding_(definition_level_encoding),
repetition_level_encoding_(repetition_level_encoding) {}

Expand All @@ -109,9 +116,10 @@ class DataPageV2 : public DataPage {
int32_t num_rows, Encoding::type encoding,
int32_t definition_levels_byte_length, int32_t repetition_levels_byte_length,
int64_t uncompressed_size, bool is_compressed = false,
const EncodedStatistics& statistics = EncodedStatistics())
const EncodedStatistics& statistics = EncodedStatistics(),
std::optional<int64_t> first_row_index = std::nullopt)
: DataPage(PageType::DATA_PAGE_V2, buffer, num_values, encoding, uncompressed_size,
statistics),
statistics, std::move(first_row_index)),
num_nulls_(num_nulls),
num_rows_(num_rows),
definition_levels_byte_length_(definition_levels_byte_length),
Expand Down
Loading

0 comments on commit 363dcef

Please sign in to comment.