diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index eba964f9678b6..1ceae90c8f48c 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -66,6 +66,7 @@ #include "parquet/arrow/writer.h" #include "parquet/column_writer.h" #include "parquet/file_writer.h" +#include "parquet/page_index.h" #include "parquet/test_util.h" using arrow::Array; @@ -5081,10 +5082,21 @@ TEST(TestArrowReadWrite, WriteAndReadRecordBatch) { // Verify the single record batch has been sliced into two row groups by // WriterProperties::max_row_group_length(). - int num_row_groups = arrow_reader->parquet_reader()->metadata()->num_row_groups(); + auto file_metadata = arrow_reader->parquet_reader()->metadata(); + int num_row_groups = file_metadata->num_row_groups(); ASSERT_EQ(2, num_row_groups); - ASSERT_EQ(10, arrow_reader->parquet_reader()->metadata()->RowGroup(0)->num_rows()); - ASSERT_EQ(2, arrow_reader->parquet_reader()->metadata()->RowGroup(1)->num_rows()); + ASSERT_EQ(10, file_metadata->RowGroup(0)->num_rows()); + ASSERT_EQ(2, file_metadata->RowGroup(1)->num_rows()); + + // Verify that page index is not written by default. + for (int i = 0; i < num_row_groups; ++i) { + auto row_group_metadata = file_metadata->RowGroup(i); + for (int j = 0; j < row_group_metadata->num_columns(); ++j) { + auto column_metadata = row_group_metadata->ColumnChunk(j); + EXPECT_FALSE(column_metadata->GetColumnIndexLocation().has_value()); + EXPECT_FALSE(column_metadata->GetOffsetIndexLocation().has_value()); + } + } // Verify batch data read via RecordBatch std::unique_ptr<::arrow::RecordBatchReader> batch_reader; @@ -5146,5 +5158,96 @@ TEST(TestArrowReadWrite, FuzzReader) { } } +TEST(TestArrowReadWrite, WriteReadPageIndexRoundTrip) { + // Enable page index to the writer. + auto writer_properties = WriterProperties::Builder() + .enable_write_page_index() + ->max_row_group_length(4) + ->build(); + auto arrow_writer_properties = default_arrow_writer_properties(); + auto pool = ::arrow::default_memory_pool(); + auto sink = CreateOutputStream(); + auto schema = ::arrow::schema( + {::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", ::arrow::utf8())}); + std::shared_ptr parquet_schema; + ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties, + *arrow_writer_properties, &parquet_schema)); + auto schema_node = std::static_pointer_cast(parquet_schema->schema_root()); + + // Prepare data and each row group contains 4 rows. + auto record_batch = ::arrow::RecordBatchFromJSON(schema, R"([ + [1, "a"], + [2, "b"], + [3, "c"], + [null, "d"], + [5, null], + [6, "f"] + ])"); + + // Create writer to write data via RecordBatch. + auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties); + std::unique_ptr arrow_writer; + ASSERT_OK(FileWriter::Make(pool, std::move(writer), record_batch->schema(), + arrow_writer_properties, &arrow_writer)); + ASSERT_OK_NO_THROW(arrow_writer->WriteRecordBatch(*record_batch)); + ASSERT_OK_NO_THROW(arrow_writer->Close()); + ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish()); + + // Create reader to read page index. + auto read_properties = default_arrow_reader_properties(); + auto reader = ParquetFileReader::Open(std::make_shared(buffer)); + auto metadata = reader->metadata(); + ASSERT_EQ(2, metadata->num_row_groups()); + + // Make sure page index reader is not null. + auto page_index_reader = reader->GetPageIndexReader(); + ASSERT_NE(page_index_reader, nullptr); + + auto encode_int64 = [=](int64_t value) { + return std::string(reinterpret_cast(&value), sizeof(int64_t)); + }; + + const std::vector c0_min_values = {encode_int64(1), encode_int64(5)}; + const std::vector c0_max_values = {encode_int64(3), encode_int64(6)}; + const std::vector c1_min_values = {"a", "f"}; + const std::vector c1_max_values = {"d", "f"}; + const std::vector c0_null_counts = {1, 0}; + const std::vector c1_null_counts = {0, 1}; + + const size_t num_pages = 1; + for (int rg = 0; rg < metadata->num_row_groups(); ++rg) { + auto row_group_index_reader = page_index_reader->RowGroup(rg); + ASSERT_NE(row_group_index_reader, nullptr); + + // Verify offset index. + for (int c = 0; c < metadata->num_columns(); ++c) { + auto offset_index = row_group_index_reader->GetOffsetIndex(c); + ASSERT_NE(offset_index, nullptr); + ASSERT_EQ(num_pages, offset_index->page_locations().size()); + ASSERT_EQ(0, offset_index->page_locations()[0].first_row_index); + } + + // Verify column index of c0. + auto c0_column_index = row_group_index_reader->GetColumnIndex(0); + ASSERT_NE(c0_column_index, nullptr); + ASSERT_EQ(num_pages, c0_column_index->null_pages().size()); + ASSERT_EQ(BoundaryOrder::Ascending, c0_column_index->boundary_order()); + ASSERT_EQ(c0_min_values[rg], c0_column_index->encoded_min_values()[0]); + ASSERT_EQ(c0_max_values[rg], c0_column_index->encoded_max_values()[0]); + ASSERT_TRUE(c0_column_index->has_null_counts()); + ASSERT_EQ(c0_null_counts[rg], c0_column_index->null_counts()[0]); + + // Verify column index of c1. + auto c1_column_index = row_group_index_reader->GetColumnIndex(1); + ASSERT_NE(c1_column_index, nullptr); + ASSERT_EQ(num_pages, c1_column_index->null_pages().size()); + ASSERT_EQ(BoundaryOrder::Ascending, c1_column_index->boundary_order()); + ASSERT_EQ(c1_min_values[rg], c1_column_index->encoded_min_values()[0]); + ASSERT_EQ(c1_max_values[rg], c1_column_index->encoded_max_values()[0]); + ASSERT_TRUE(c1_column_index->has_null_counts()); + ASSERT_EQ(c1_null_counts[rg], c1_column_index->null_counts()[0]); + } +} + } // namespace arrow } // namespace parquet diff --git a/cpp/src/parquet/column_page.h b/cpp/src/parquet/column_page.h index 2fab77ed01ab5..2db904acdc8b5 100644 --- a/cpp/src/parquet/column_page.h +++ b/cpp/src/parquet/column_page.h @@ -23,6 +23,7 @@ #include #include +#include #include #include "parquet/statistics.h" @@ -64,23 +65,28 @@ class DataPage : public Page { Encoding::type encoding() const { return encoding_; } int64_t uncompressed_size() const { return uncompressed_size_; } const EncodedStatistics& statistics() const { return statistics_; } + std::optional first_row_index() const { return first_row_index_; } virtual ~DataPage() = default; protected: DataPage(PageType::type type, const std::shared_ptr& buffer, int32_t num_values, Encoding::type encoding, int64_t uncompressed_size, - const EncodedStatistics& statistics = EncodedStatistics()) + const EncodedStatistics& statistics = EncodedStatistics(), + std::optional first_row_index = std::nullopt) : Page(buffer, type), num_values_(num_values), encoding_(encoding), uncompressed_size_(uncompressed_size), - statistics_(statistics) {} + statistics_(statistics), + first_row_index_(std::move(first_row_index)) {} int32_t num_values_; Encoding::type encoding_; int64_t uncompressed_size_; EncodedStatistics statistics_; + /// Row ordinal within the row group to the first row in the data page. + std::optional first_row_index_; }; class DataPageV1 : public DataPage { @@ -88,9 +94,10 @@ class DataPageV1 : public DataPage { DataPageV1(const std::shared_ptr& buffer, int32_t num_values, Encoding::type encoding, Encoding::type definition_level_encoding, Encoding::type repetition_level_encoding, int64_t uncompressed_size, - const EncodedStatistics& statistics = EncodedStatistics()) + const EncodedStatistics& statistics = EncodedStatistics(), + std::optional first_row_index = std::nullopt) : DataPage(PageType::DATA_PAGE, buffer, num_values, encoding, uncompressed_size, - statistics), + statistics, std::move(first_row_index)), definition_level_encoding_(definition_level_encoding), repetition_level_encoding_(repetition_level_encoding) {} @@ -109,9 +116,10 @@ class DataPageV2 : public DataPage { int32_t num_rows, Encoding::type encoding, int32_t definition_levels_byte_length, int32_t repetition_levels_byte_length, int64_t uncompressed_size, bool is_compressed = false, - const EncodedStatistics& statistics = EncodedStatistics()) + const EncodedStatistics& statistics = EncodedStatistics(), + std::optional first_row_index = std::nullopt) : DataPage(PageType::DATA_PAGE_V2, buffer, num_values, encoding, uncompressed_size, - statistics), + statistics, std::move(first_row_index)), num_nulls_(num_nulls), num_rows_(num_rows), definition_levels_byte_length_(definition_levels_byte_length), diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index 829beb5325684..96a684e4ef40f 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -49,6 +49,7 @@ #include "parquet/encryption/internal_file_encryptor.h" #include "parquet/level_conversion.h" #include "parquet/metadata.h" +#include "parquet/page_index.h" #include "parquet/platform.h" #include "parquet/properties.h" #include "parquet/schema.h" @@ -253,7 +254,9 @@ class SerializedPageWriter : public PageWriter { bool use_page_checksum_verification, MemoryPool* pool = ::arrow::default_memory_pool(), std::shared_ptr meta_encryptor = nullptr, - std::shared_ptr data_encryptor = nullptr) + std::shared_ptr data_encryptor = nullptr, + ColumnIndexBuilder* column_index_builder = nullptr, + OffsetIndexBuilder* offset_index_builder = nullptr) : sink_(std::move(sink)), metadata_(metadata), pool_(pool), @@ -268,7 +271,9 @@ class SerializedPageWriter : public PageWriter { page_checksum_verification_(use_page_checksum_verification), meta_encryptor_(std::move(meta_encryptor)), data_encryptor_(std::move(data_encryptor)), - encryption_buffer_(AllocateBuffer(pool, 0)) { + encryption_buffer_(AllocateBuffer(pool, 0)), + column_index_builder_(column_index_builder), + offset_index_builder_(offset_index_builder) { if (data_encryptor_ != nullptr || meta_encryptor_ != nullptr) { InitEncryption(); } @@ -335,6 +340,10 @@ class SerializedPageWriter : public PageWriter { if (meta_encryptor_ != nullptr) { UpdateEncryption(encryption::kColumnMetaData); } + + // Serialized page writer does not need to adjust page offsets. + FinishPageIndex(/*final_position=*/0); + // index_page_offset = -1 since they are not supported metadata_->Finish(num_values_, dictionary_page_offset_, -1, data_page_offset_, total_compressed_size_, total_uncompressed_size_, has_dictionary, @@ -413,6 +422,25 @@ class SerializedPageWriter : public PageWriter { thrift_serializer_->Serialize(&page_header, sink_.get(), meta_encryptor_); PARQUET_THROW_NOT_OK(sink_->Write(output_data_buffer, output_data_len)); + /// Collect page index + if (column_index_builder_ != nullptr) { + column_index_builder_->AddPage(page.statistics()); + } + if (offset_index_builder_ != nullptr) { + const int64_t compressed_size = output_data_len + header_size; + if (compressed_size > std::numeric_limits::max()) { + throw ParquetException("Compressed page size overflows to INT32_MAX."); + } + if (!page.first_row_index().has_value()) { + throw ParquetException("First row index is not set in data page."); + } + /// start_pos is a relative offset in the buffered mode. It should be + /// adjusted via OffsetIndexBuilder::Finish() after BufferedPageWriter + /// has flushed all data pages. + offset_index_builder_->AddPage(start_pos, static_cast(compressed_size), + *page.first_row_index()); + } + total_uncompressed_size_ += uncompressed_size + header_size; total_compressed_size_ += output_data_len + header_size; num_values_ += page.num_values(); @@ -454,6 +482,17 @@ class SerializedPageWriter : public PageWriter { page_header.__set_data_page_header_v2(data_page_header); } + /// \brief Finish page index builders and update the stream offset to adjust + /// page offsets. + void FinishPageIndex(int64_t final_position) { + if (column_index_builder_ != nullptr) { + column_index_builder_->Finish(); + } + if (offset_index_builder_ != nullptr) { + offset_index_builder_->Finish(final_position); + } + } + bool has_compressor() override { return (compressor_ != nullptr); } int64_t num_values() { return num_values_; } @@ -559,6 +598,9 @@ class SerializedPageWriter : public PageWriter { std::map dict_encoding_stats_; std::map data_encoding_stats_; + + ColumnIndexBuilder* column_index_builder_; + OffsetIndexBuilder* offset_index_builder_; }; // This implementation of the PageWriter writes to the final sink on Close . @@ -570,13 +612,16 @@ class BufferedPageWriter : public PageWriter { bool use_page_checksum_verification, MemoryPool* pool = ::arrow::default_memory_pool(), std::shared_ptr meta_encryptor = nullptr, - std::shared_ptr data_encryptor = nullptr) + std::shared_ptr data_encryptor = nullptr, + ColumnIndexBuilder* column_index_builder = nullptr, + OffsetIndexBuilder* offset_index_builder = nullptr) : final_sink_(std::move(sink)), metadata_(metadata), has_dictionary_pages_(false) { in_memory_sink_ = CreateOutputStream(pool); pager_ = std::make_unique( in_memory_sink_, codec, compression_level, metadata, row_group_ordinal, current_column_ordinal, use_page_checksum_verification, pool, - std::move(meta_encryptor), std::move(data_encryptor)); + std::move(meta_encryptor), std::move(data_encryptor), column_index_builder, + offset_index_builder); } int64_t WriteDictionaryPage(const DictionaryPage& page) override { @@ -602,6 +647,9 @@ class BufferedPageWriter : public PageWriter { // Write metadata at end of column chunk metadata_->WriteTo(in_memory_sink_.get()); + // Buffered page writer needs to adjust page offsets. + pager_->FinishPageIndex(final_position); + // flush everything to the serialized sink PARQUET_ASSIGN_OR_THROW(auto buffer, in_memory_sink_->Finish()); PARQUET_THROW_NOT_OK(final_sink_->Write(buffer)); @@ -634,17 +682,20 @@ std::unique_ptr PageWriter::Open( int compression_level, ColumnChunkMetaDataBuilder* metadata, int16_t row_group_ordinal, int16_t column_chunk_ordinal, MemoryPool* pool, bool buffered_row_group, std::shared_ptr meta_encryptor, - std::shared_ptr data_encryptor, bool page_write_checksum_enabled) { + std::shared_ptr data_encryptor, bool page_write_checksum_enabled, + ColumnIndexBuilder* column_index_builder, OffsetIndexBuilder* offset_index_builder) { if (buffered_row_group) { return std::unique_ptr(new BufferedPageWriter( std::move(sink), codec, compression_level, metadata, row_group_ordinal, column_chunk_ordinal, page_write_checksum_enabled, pool, - std::move(meta_encryptor), std::move(data_encryptor))); + std::move(meta_encryptor), std::move(data_encryptor), column_index_builder, + offset_index_builder)); } else { return std::unique_ptr(new SerializedPageWriter( std::move(sink), codec, compression_level, metadata, row_group_ordinal, column_chunk_ordinal, page_write_checksum_enabled, pool, - std::move(meta_encryptor), std::move(data_encryptor))); + std::move(meta_encryptor), std::move(data_encryptor), column_index_builder, + offset_index_builder)); } } @@ -916,6 +967,7 @@ void ColumnWriterImpl::BuildDataPageV1(int64_t definition_levels_rle_size, } int32_t num_values = static_cast(num_buffered_values_); + int64_t first_row_index = rows_written_ - num_buffered_rows_; // Write the page to OutputStream eagerly if there is no dictionary or // if dictionary encoding has fallen back to PLAIN @@ -925,13 +977,13 @@ void ColumnWriterImpl::BuildDataPageV1(int64_t definition_levels_rle_size, compressed_data->CopySlice(0, compressed_data->size(), allocator_)); std::unique_ptr page_ptr = std::make_unique( compressed_data_copy, num_values, encoding_, Encoding::RLE, Encoding::RLE, - uncompressed_size, page_stats); + uncompressed_size, page_stats, first_row_index); total_compressed_bytes_ += page_ptr->size() + sizeof(format::PageHeader); data_pages_.push_back(std::move(page_ptr)); } else { // Eagerly write pages DataPageV1 page(compressed_data, num_values, encoding_, Encoding::RLE, Encoding::RLE, - uncompressed_size, page_stats); + uncompressed_size, page_stats, first_row_index); WriteDataPage(page); } } @@ -968,6 +1020,7 @@ void ColumnWriterImpl::BuildDataPageV2(int64_t definition_levels_rle_size, int32_t num_rows = static_cast(num_buffered_rows_); int32_t def_levels_byte_length = static_cast(definition_levels_rle_size); int32_t rep_levels_byte_length = static_cast(repetition_levels_rle_size); + int64_t first_row_index = rows_written_ - num_buffered_rows_; // Write the page to OutputStream eagerly if there is no dictionary or // if dictionary encoding has fallen back to PLAIN @@ -976,13 +1029,14 @@ void ColumnWriterImpl::BuildDataPageV2(int64_t definition_levels_rle_size, combined->CopySlice(0, combined->size(), allocator_)); std::unique_ptr page_ptr = std::make_unique( combined, num_values, null_count, num_rows, encoding_, def_levels_byte_length, - rep_levels_byte_length, uncompressed_size, pager_->has_compressor(), page_stats); + rep_levels_byte_length, uncompressed_size, pager_->has_compressor(), page_stats, + first_row_index); total_compressed_bytes_ += page_ptr->size() + sizeof(format::PageHeader); data_pages_.push_back(std::move(page_ptr)); } else { DataPageV2 page(combined, num_values, null_count, num_rows, encoding_, def_levels_byte_length, rep_levels_byte_length, uncompressed_size, - pager_->has_compressor(), page_stats); + pager_->has_compressor(), page_stats, first_row_index); WriteDataPage(page); } } @@ -1186,11 +1240,12 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< if (bits_buffer_ != nullptr) { WriteValuesSpaced(AddIfNotNull(values, value_offset), batch_num_values, batch_num_spaced_values, bits_buffer_->data(), /*offset=*/0, - /*num_levels=*/batch_size); + /*num_levels=*/batch_size, null_count); } else { WriteValuesSpaced(AddIfNotNull(values, value_offset), batch_num_values, batch_num_spaced_values, valid_bits, - valid_bits_offset + value_offset, /*num_levels=*/batch_size); + valid_bits_offset + value_offset, /*num_levels=*/batch_size, + null_count); } CommitWriteAndCheckPageLimit(batch_size, batch_num_spaced_values, check_page); value_offset += batch_num_spaced_values; @@ -1298,7 +1353,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< const WriterProperties* properties() override { return properties_; } bool pages_change_on_record_boundaries() const { - return properties_->data_page_version() == ParquetDataPageVersion::V2; + return properties_->data_page_version() == ParquetDataPageVersion::V2 || + properties_->write_page_index(); } private: @@ -1381,7 +1437,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< *out_spaced_values_to_write += def_levels[x] >= level_info_.repeated_ancestor_def_level ? 1 : 0; } - *null_count = *out_values_to_write - *out_spaced_values_to_write; + *null_count = batch_size - *out_values_to_write; } return; } @@ -1499,9 +1555,22 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< } } + /// \brief Write values with spaces and update page statistics accordingly. + /// + /// \param values input buffer of values to write, including spaces. + /// \param num_values number of non-null values in the values buffer. + /// \param num_spaced_values length of values buffer, including spaces and does not + /// count some nulls from ancestor (e.g. empty lists). + /// \param valid_bits validity bitmap of values buffer, which does not include some + /// nulls from ancestor (e.g. empty lists). + /// \param valid_bits_offset offset to valid_bits bitmap. + /// \param num_levels number of levels to write, including nulls from values buffer + /// and nulls from ancestor (e.g. empty lists). + /// \param num_nulls number of nulls in the values buffer as well as nulls from the + /// ancestor (e.g. empty lists). void WriteValuesSpaced(const T* values, int64_t num_values, int64_t num_spaced_values, const uint8_t* valid_bits, int64_t valid_bits_offset, - int64_t num_levels) { + int64_t num_levels, int64_t num_nulls) { if (num_values != num_spaced_values) { current_value_encoder_->PutSpaced(values, static_cast(num_spaced_values), valid_bits, valid_bits_offset); @@ -1509,7 +1578,6 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< current_value_encoder_->Put(values, static_cast(num_values)); } if (page_statistics_ != nullptr) { - const int64_t num_nulls = num_levels - num_values; page_statistics_->UpdateSpaced(values, valid_bits, valid_bits_offset, num_spaced_values, num_values, num_nulls); } diff --git a/cpp/src/parquet/column_writer.h b/cpp/src/parquet/column_writer.h index 4dd4b10ccc2bb..fb2675d88e9cf 100644 --- a/cpp/src/parquet/column_writer.h +++ b/cpp/src/parquet/column_writer.h @@ -42,11 +42,13 @@ class RleEncoder; namespace parquet { struct ArrowWriteContext; +class ColumnChunkMetaDataBuilder; class ColumnDescriptor; +class ColumnIndexBuilder; class DataPage; class DictionaryPage; -class ColumnChunkMetaDataBuilder; class Encryptor; +class OffsetIndexBuilder; class WriterProperties; class PARQUET_EXPORT LevelEncoder { @@ -91,7 +93,9 @@ class PARQUET_EXPORT PageWriter { bool buffered_row_group = false, std::shared_ptr header_encryptor = NULLPTR, std::shared_ptr data_encryptor = NULLPTR, - bool page_write_checksum_enabled = false); + bool page_write_checksum_enabled = false, + ColumnIndexBuilder* column_index_builder = NULLPTR, + OffsetIndexBuilder* offset_index_builder = NULLPTR); // The Column Writer decides if dictionary encoding is used if set and // if the dictionary encoding has fallen back to default encoding on reaching dictionary diff --git a/cpp/src/parquet/file_writer.cc b/cpp/src/parquet/file_writer.cc index ec2e0e8a8a304..481d5b6d30dbb 100644 --- a/cpp/src/parquet/file_writer.cc +++ b/cpp/src/parquet/file_writer.cc @@ -28,9 +28,9 @@ #include "parquet/encryption/encryption_internal.h" #include "parquet/encryption/internal_file_encryptor.h" #include "parquet/exception.h" +#include "parquet/page_index.h" #include "parquet/platform.h" #include "parquet/schema.h" -#include "parquet/types.h" using arrow::MemoryPool; @@ -89,7 +89,8 @@ class RowGroupSerializer : public RowGroupWriter::Contents { RowGroupSerializer(std::shared_ptr sink, RowGroupMetaDataBuilder* metadata, int16_t row_group_ordinal, const WriterProperties* properties, bool buffered_row_group = false, - InternalFileEncryptor* file_encryptor = nullptr) + InternalFileEncryptor* file_encryptor = nullptr, + PageIndexBuilder* page_index_builder = nullptr) : sink_(std::move(sink)), metadata_(metadata), properties_(properties), @@ -100,7 +101,8 @@ class RowGroupSerializer : public RowGroupWriter::Contents { next_column_index_(0), num_rows_(0), buffered_row_group_(buffered_row_group), - file_encryptor_(file_encryptor) { + file_encryptor_(file_encryptor), + page_index_builder_(page_index_builder) { if (buffered_row_group) { InitColumns(); } else { @@ -135,8 +137,7 @@ class RowGroupSerializer : public RowGroupWriter::Contents { column_writers_[0]->total_compressed_bytes_written(); } - ++next_column_index_; - + const int32_t column_ordinal = next_column_index_++; const auto& path = col_meta->descr()->path(); auto meta_encryptor = file_encryptor_ ? file_encryptor_->GetColumnMetaEncryptor(path->ToDotString()) @@ -144,11 +145,17 @@ class RowGroupSerializer : public RowGroupWriter::Contents { auto data_encryptor = file_encryptor_ ? file_encryptor_->GetColumnDataEncryptor(path->ToDotString()) : nullptr; + auto ci_builder = page_index_builder_ + ? page_index_builder_->GetColumnIndexBuilder(column_ordinal) + : nullptr; + auto oi_builder = page_index_builder_ + ? page_index_builder_->GetOffsetIndexBuilder(column_ordinal) + : nullptr; std::unique_ptr pager = PageWriter::Open( sink_, properties_->compression(path), properties_->compression_level(path), - col_meta, row_group_ordinal_, static_cast(next_column_index_ - 1), + col_meta, row_group_ordinal_, static_cast(column_ordinal), properties_->memory_pool(), false, meta_encryptor, data_encryptor, - properties_->page_checksum_enabled()); + properties_->page_checksum_enabled(), ci_builder, oi_builder); column_writers_[0] = ColumnWriter::Make(col_meta, std::move(pager), properties_); return column_writers_[0].get(); } @@ -240,6 +247,7 @@ class RowGroupSerializer : public RowGroupWriter::Contents { mutable int64_t num_rows_; bool buffered_row_group_; InternalFileEncryptor* file_encryptor_; + PageIndexBuilder* page_index_builder_; void CheckRowsWritten() const { // verify when only one column is written at a time @@ -267,18 +275,25 @@ class RowGroupSerializer : public RowGroupWriter::Contents { for (int i = 0; i < num_columns(); i++) { auto col_meta = metadata_->NextColumnChunk(); const auto& path = col_meta->descr()->path(); + const int32_t column_ordinal = next_column_index_++; auto meta_encryptor = file_encryptor_ ? file_encryptor_->GetColumnMetaEncryptor(path->ToDotString()) : nullptr; auto data_encryptor = file_encryptor_ ? file_encryptor_->GetColumnDataEncryptor(path->ToDotString()) : nullptr; + auto ci_builder = page_index_builder_ + ? page_index_builder_->GetColumnIndexBuilder(column_ordinal) + : nullptr; + auto oi_builder = page_index_builder_ + ? page_index_builder_->GetOffsetIndexBuilder(column_ordinal) + : nullptr; std::unique_ptr pager = PageWriter::Open( sink_, properties_->compression(path), properties_->compression_level(path), col_meta, static_cast(row_group_ordinal_), - static_cast(next_column_index_++), properties_->memory_pool(), + static_cast(column_ordinal), properties_->memory_pool(), buffered_row_group_, meta_encryptor, data_encryptor, - properties_->page_checksum_enabled()); + properties_->page_checksum_enabled(), ci_builder, oi_builder); column_writers_.push_back( ColumnWriter::Make(col_meta, std::move(pager), properties_)); } @@ -317,6 +332,8 @@ class FileSerializer : public ParquetFileWriter::Contents { } row_group_writer_.reset(); + WritePageIndex(); + // Write magic bytes and metadata auto file_encryption_properties = properties_->file_encryption_properties(); @@ -345,9 +362,12 @@ class FileSerializer : public ParquetFileWriter::Contents { } num_row_groups_++; auto rg_metadata = metadata_->AppendRowGroup(); + if (page_index_builder_) { + page_index_builder_->AppendRowGroup(); + } std::unique_ptr contents(new RowGroupSerializer( sink_, rg_metadata, static_cast(num_row_groups_ - 1), properties_.get(), - buffered_row_group, file_encryptor_.get())); + buffered_row_group, file_encryptor_.get(), page_index_builder_.get())); row_group_writer_ = std::make_unique(std::move(contents)); return row_group_writer_.get(); } @@ -412,6 +432,21 @@ class FileSerializer : public ParquetFileWriter::Contents { } } + void WritePageIndex() { + if (page_index_builder_ != nullptr) { + if (properties_->file_encryption_properties()) { + throw ParquetException("Encryption is not supported with page index"); + } + + // Serialize page index after all row groups have been written and report + // location to the file metadata. + PageIndexLocation page_index_location; + page_index_builder_->Finish(); + page_index_builder_->WriteTo(sink_.get(), &page_index_location); + metadata_->SetPageIndexLocation(page_index_location); + } + } + std::shared_ptr sink_; bool is_open_; const std::shared_ptr properties_; @@ -420,7 +455,7 @@ class FileSerializer : public ParquetFileWriter::Contents { std::unique_ptr metadata_; // Only one of the row group writers is active at a time std::unique_ptr row_group_writer_; - + std::unique_ptr page_index_builder_; std::unique_ptr file_encryptor_; void StartFile() { @@ -459,6 +494,10 @@ class FileSerializer : public ParquetFileWriter::Contents { PARQUET_THROW_NOT_OK(sink_->Write(kParquetMagic, 4)); } } + + if (properties_->write_page_index()) { + page_index_builder_ = PageIndexBuilder::Make(&schema_); + } } }; diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc index e47257bf89cee..b6c240115f532 100644 --- a/cpp/src/parquet/metadata.cc +++ b/cpp/src/parquet/metadata.cc @@ -1763,6 +1763,40 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl { return current_row_group_builder_.get(); } + void SetPageIndexLocation(const PageIndexLocation& location) { + auto set_index_location = + [this](size_t row_group_ordinal, + const PageIndexLocation::FileIndexLocation& file_index_location, + bool column_index) { + auto& row_group_metadata = this->row_groups_.at(row_group_ordinal); + auto iter = file_index_location.find(row_group_ordinal); + if (iter != file_index_location.cend()) { + const auto& row_group_index_location = iter->second; + for (size_t i = 0; i < row_group_index_location.size(); ++i) { + if (i >= row_group_metadata.columns.size()) { + throw ParquetException("Cannot find metadata for column ordinal ", i); + } + auto& column_metadata = row_group_metadata.columns.at(i); + const auto& index_location = row_group_index_location.at(i); + if (index_location.has_value()) { + if (column_index) { + column_metadata.__set_column_index_offset(index_location->offset); + column_metadata.__set_column_index_length(index_location->length); + } else { + column_metadata.__set_offset_index_offset(index_location->offset); + column_metadata.__set_offset_index_length(index_location->length); + } + } + } + } + }; + + for (size_t i = 0; i < row_groups_.size(); ++i) { + set_index_location(i, location.column_index_location, true); + set_index_location(i, location.offset_index_location, false); + } + } + std::unique_ptr Finish() { int64_t total_rows = 0; for (auto row_group : row_groups_) { @@ -1888,6 +1922,10 @@ RowGroupMetaDataBuilder* FileMetaDataBuilder::AppendRowGroup() { return impl_->AppendRowGroup(); } +void FileMetaDataBuilder::SetPageIndexLocation(const PageIndexLocation& location) { + impl_->SetPageIndexLocation(location); +} + std::unique_ptr FileMetaDataBuilder::Finish() { return impl_->Finish(); } std::unique_ptr FileMetaDataBuilder::GetCryptoMetaData() { diff --git a/cpp/src/parquet/metadata.h b/cpp/src/parquet/metadata.h index 277f59a1b6072..210d5fbaed1f9 100644 --- a/cpp/src/parquet/metadata.h +++ b/cpp/src/parquet/metadata.h @@ -506,6 +506,21 @@ class PARQUET_EXPORT RowGroupMetaDataBuilder { std::unique_ptr impl_; }; +/// \brief Public struct for location to all page indexes in a parquet file. +struct PageIndexLocation { + /// Alias type of page index location of a row group. The index location + /// is located by column ordinal. If the column does not have the page index, + /// its value is set to std::nullopt. + using RowGroupIndexLocation = std::vector>; + /// Alis type of page index location of a parquet file. The index location + /// is located by the row group ordinal. + using FileIndexLocation = std::map; + /// Row group column index locations which uses row group ordinal as the key. + FileIndexLocation column_index_location; + /// Row group offset index locations which uses row group ordinal as the key. + FileIndexLocation offset_index_location; +}; + class PARQUET_EXPORT FileMetaDataBuilder { public: // API convenience to get a MetaData reader @@ -518,6 +533,9 @@ class PARQUET_EXPORT FileMetaDataBuilder { // The prior RowGroupMetaDataBuilder (if any) is destroyed RowGroupMetaDataBuilder* AppendRowGroup(); + // Update location to all page indexes in the parquet file + void SetPageIndexLocation(const PageIndexLocation& location); + // Complete the Thrift structure std::unique_ptr Finish(); diff --git a/cpp/src/parquet/page_index.cc b/cpp/src/parquet/page_index.cc index f24cecf43b38e..21175fd6479f8 100644 --- a/cpp/src/parquet/page_index.cc +++ b/cpp/src/parquet/page_index.cc @@ -426,6 +426,354 @@ class PageIndexReaderImpl : public PageIndexReader { std::unordered_map index_read_ranges_; }; +/// \brief Internal state of page index builder. +enum class BuilderState { + /// Created but not yet write any data. + kCreated, + /// Some data are written but not yet finished. + kStarted, + /// All data are written and no more write is allowed. + kFinished, + /// The builder has corrupted data or empty data and therefore discarded. + kDiscarded +}; + +template +class ColumnIndexBuilderImpl final : public ColumnIndexBuilder { + public: + using T = typename DType::c_type; + + explicit ColumnIndexBuilderImpl(const ColumnDescriptor* descr) : descr_(descr) { + /// Initialize the null_counts vector as set. Invalid null_counts vector from + /// any page will invalidate the null_counts vector of the column index. + column_index_.__isset.null_counts = true; + column_index_.boundary_order = format::BoundaryOrder::UNORDERED; + } + + void AddPage(const EncodedStatistics& stats) override { + if (state_ == BuilderState::kFinished) { + throw ParquetException("Cannot add page to finished ColumnIndexBuilder."); + } else if (state_ == BuilderState::kDiscarded) { + /// The offset index is discarded. Do nothing. + return; + } + + state_ = BuilderState::kStarted; + + if (stats.all_null_value) { + column_index_.null_pages.emplace_back(true); + column_index_.min_values.emplace_back(""); + column_index_.max_values.emplace_back(""); + } else if (stats.has_min && stats.has_max) { + const size_t page_ordinal = column_index_.null_pages.size(); + non_null_page_indices_.emplace_back(page_ordinal); + column_index_.min_values.emplace_back(stats.min()); + column_index_.max_values.emplace_back(stats.max()); + column_index_.null_pages.emplace_back(false); + } else { + /// This is a non-null page but it lacks of meaningful min/max values. + /// Discard the column index. + state_ = BuilderState::kDiscarded; + return; + } + + if (column_index_.__isset.null_counts && stats.has_null_count) { + column_index_.null_counts.emplace_back(stats.null_count); + } else { + column_index_.__isset.null_counts = false; + } + } + + void Finish() override { + switch (state_) { + case BuilderState::kCreated: { + /// No page is added. Discard the column index. + state_ = BuilderState::kDiscarded; + return; + } + case BuilderState::kFinished: + throw ParquetException("ColumnIndexBuilder is already finished."); + case BuilderState::kDiscarded: + // The column index is discarded. Do nothing. + return; + case BuilderState::kStarted: + break; + } + + state_ = BuilderState::kFinished; + + /// Clear null_counts vector because at least one page does not provide it. + if (!column_index_.__isset.null_counts) { + column_index_.null_counts.clear(); + } + + /// Decode min/max values according to the data type. + const size_t non_null_page_count = non_null_page_indices_.size(); + std::vector min_values, max_values; + min_values.resize(non_null_page_count); + max_values.resize(non_null_page_count); + auto decoder = MakeTypedDecoder(Encoding::PLAIN, descr_); + for (size_t i = 0; i < non_null_page_count; ++i) { + auto page_ordinal = non_null_page_indices_.at(i); + Decode(decoder, column_index_.min_values.at(page_ordinal), &min_values, i); + Decode(decoder, column_index_.max_values.at(page_ordinal), &max_values, i); + } + + /// Decide the boundary order from decoded min/max values. + auto boundary_order = DetermineBoundaryOrder(min_values, max_values); + column_index_.__set_boundary_order(ToThrift(boundary_order)); + } + + void WriteTo(::arrow::io::OutputStream* sink) const override { + if (state_ == BuilderState::kFinished) { + ThriftSerializer{}.Serialize(&column_index_, sink); + } + } + + std::unique_ptr Build() const override { + if (state_ == BuilderState::kFinished) { + return std::make_unique>(*descr_, column_index_); + } + return nullptr; + } + + private: + BoundaryOrder::type DetermineBoundaryOrder(const std::vector& min_values, + const std::vector& max_values) const { + DCHECK_EQ(min_values.size(), max_values.size()); + if (min_values.empty()) { + return BoundaryOrder::Unordered; + } + + std::shared_ptr> comparator; + try { + comparator = MakeComparator(descr_); + } catch (const ParquetException&) { + /// Simply return unordered for unsupported comparator. + return BoundaryOrder::Unordered; + } + + /// Check if both min_values and max_values are in ascending order. + bool is_ascending = true; + for (size_t i = 1; i < min_values.size(); ++i) { + if (comparator->Compare(min_values[i], min_values[i - 1]) || + comparator->Compare(max_values[i], max_values[i - 1])) { + is_ascending = false; + break; + } + } + if (is_ascending) { + return BoundaryOrder::Ascending; + } + + /// Check if both min_values and max_values are in descending order. + bool is_descending = true; + for (size_t i = 1; i < min_values.size(); ++i) { + if (comparator->Compare(min_values[i - 1], min_values[i]) || + comparator->Compare(max_values[i - 1], max_values[i])) { + is_descending = false; + break; + } + } + if (is_descending) { + return BoundaryOrder::Descending; + } + + /// Neither ascending nor descending is detected. + return BoundaryOrder::Unordered; + } + + const ColumnDescriptor* descr_; + format::ColumnIndex column_index_; + std::vector non_null_page_indices_; + BuilderState state_ = BuilderState::kCreated; +}; + +class OffsetIndexBuilderImpl final : public OffsetIndexBuilder { + public: + OffsetIndexBuilderImpl() = default; + + void AddPage(int64_t offset, int32_t compressed_page_size, + int64_t first_row_index) override { + if (state_ == BuilderState::kFinished) { + throw ParquetException("Cannot add page to finished OffsetIndexBuilder."); + } else if (state_ == BuilderState::kDiscarded) { + /// The offset index is discarded. Do nothing. + return; + } + + state_ = BuilderState::kStarted; + + format::PageLocation page_location; + page_location.__set_offset(offset); + page_location.__set_compressed_page_size(compressed_page_size); + page_location.__set_first_row_index(first_row_index); + offset_index_.page_locations.emplace_back(std::move(page_location)); + } + + void Finish(int64_t final_position) override { + switch (state_) { + case BuilderState::kCreated: { + /// No pages are added. Simply discard the offset index. + state_ = BuilderState::kDiscarded; + break; + } + case BuilderState::kStarted: { + /// Adjust page offsets according the final position. + if (final_position > 0) { + for (auto& page_location : offset_index_.page_locations) { + page_location.__set_offset(page_location.offset + final_position); + } + } + state_ = BuilderState::kFinished; + break; + } + case BuilderState::kFinished: + case BuilderState::kDiscarded: + throw ParquetException("OffsetIndexBuilder is already finished"); + } + } + + void WriteTo(::arrow::io::OutputStream* sink) const override { + if (state_ == BuilderState::kFinished) { + ThriftSerializer{}.Serialize(&offset_index_, sink); + } + } + + std::unique_ptr Build() const override { + if (state_ == BuilderState::kFinished) { + return std::make_unique(offset_index_); + } + return nullptr; + } + + private: + format::OffsetIndex offset_index_; + BuilderState state_ = BuilderState::kCreated; +}; + +class PageIndexBuilderImpl final : public PageIndexBuilder { + public: + explicit PageIndexBuilderImpl(const SchemaDescriptor* schema) : schema_(schema) {} + + void AppendRowGroup() override { + if (finished_) { + throw ParquetException( + "Cannot call AppendRowGroup() to finished PageIndexBuilder."); + } + + // Append new builders of next row group. + const auto num_columns = static_cast(schema_->num_columns()); + column_index_builders_.emplace_back(); + offset_index_builders_.emplace_back(); + column_index_builders_.back().resize(num_columns); + offset_index_builders_.back().resize(num_columns); + + DCHECK_EQ(column_index_builders_.size(), offset_index_builders_.size()); + DCHECK_EQ(column_index_builders_.back().size(), num_columns); + DCHECK_EQ(offset_index_builders_.back().size(), num_columns); + } + + ColumnIndexBuilder* GetColumnIndexBuilder(int32_t i) override { + CheckState(i); + std::unique_ptr& builder = column_index_builders_.back()[i]; + if (builder == nullptr) { + builder = ColumnIndexBuilder::Make(schema_->Column(i)); + } + return builder.get(); + } + + OffsetIndexBuilder* GetOffsetIndexBuilder(int32_t i) override { + CheckState(i); + std::unique_ptr& builder = offset_index_builders_.back()[i]; + if (builder == nullptr) { + builder = OffsetIndexBuilder::Make(); + } + return builder.get(); + } + + void Finish() override { finished_ = true; } + + void WriteTo(::arrow::io::OutputStream* sink, + PageIndexLocation* location) const override { + if (!finished_) { + throw ParquetException("Cannot call WriteTo() to unfinished PageIndexBuilder."); + } + + location->column_index_location.clear(); + location->offset_index_location.clear(); + + /// Serialize column index ordered by row group ordinal and then column ordinal. + SerializeIndex(column_index_builders_, sink, &location->column_index_location); + + /// Serialize offset index ordered by row group ordinal and then column ordinal. + SerializeIndex(offset_index_builders_, sink, &location->offset_index_location); + } + + private: + /// Make sure column ordinal is not out of bound and the builder is in good state. + void CheckState(int32_t column_ordinal) const { + if (finished_) { + throw ParquetException("PageIndexBuilder is already finished."); + } + if (column_ordinal < 0 || column_ordinal >= schema_->num_columns()) { + throw ParquetException("Invalid column ordinal: ", column_ordinal); + } + if (offset_index_builders_.empty() || column_index_builders_.empty()) { + throw ParquetException("No row group appended to PageIndexBuilder."); + } + } + + template + void SerializeIndex( + const std::vector>>& page_index_builders, + ::arrow::io::OutputStream* sink, + std::map>>* location) const { + const auto num_columns = static_cast(schema_->num_columns()); + + /// Serialize the same kind of page index row group by row group. + for (size_t row_group = 0; row_group < page_index_builders.size(); ++row_group) { + const auto& row_group_page_index_builders = page_index_builders[row_group]; + DCHECK_EQ(row_group_page_index_builders.size(), num_columns); + + bool has_valid_index = false; + std::vector> locations(num_columns, std::nullopt); + + /// In the same row group, serialize the same kind of page index column by column. + for (size_t column = 0; column < num_columns; ++column) { + const auto& column_page_index_builder = row_group_page_index_builders[column]; + if (column_page_index_builder != nullptr) { + /// Try serializing the page index. + PARQUET_ASSIGN_OR_THROW(int64_t pos_before_write, sink->Tell()); + column_page_index_builder->WriteTo(sink); + PARQUET_ASSIGN_OR_THROW(int64_t pos_after_write, sink->Tell()); + int64_t len = pos_after_write - pos_before_write; + + /// The page index is not serialized and skip reporting its location + if (len == 0) { + continue; + } + + if (len > std::numeric_limits::max()) { + throw ParquetException("Page index size overflows to INT32_MAX"); + } + locations[column] = {pos_before_write, static_cast(len)}; + has_valid_index = true; + } + } + + if (has_valid_index) { + location->emplace(row_group, std::move(locations)); + } + } + } + + const SchemaDescriptor* schema_; + std::vector>> column_index_builders_; + std::vector>> offset_index_builders_; + bool finished_ = false; +}; + } // namespace RowGroupIndexReadRange PageIndexReader::DeterminePageIndexRangesInRowGroup( @@ -531,4 +879,38 @@ std::shared_ptr PageIndexReader::Make( std::move(file_decryptor)); } +std::unique_ptr ColumnIndexBuilder::Make( + const ColumnDescriptor* descr) { + switch (descr->physical_type()) { + case Type::BOOLEAN: + return std::make_unique>(descr); + case Type::INT32: + return std::make_unique>(descr); + case Type::INT64: + return std::make_unique>(descr); + case Type::INT96: + return std::make_unique>(descr); + case Type::FLOAT: + return std::make_unique>(descr); + case Type::DOUBLE: + return std::make_unique>(descr); + case Type::BYTE_ARRAY: + return std::make_unique>(descr); + case Type::FIXED_LEN_BYTE_ARRAY: + return std::make_unique>(descr); + case Type::UNDEFINED: + return nullptr; + } + ::arrow::Unreachable("Cannot make ColumnIndexBuilder of an unknown type"); + return nullptr; +} + +std::unique_ptr OffsetIndexBuilder::Make() { + return std::make_unique(); +} + +std::unique_ptr PageIndexBuilder::Make(const SchemaDescriptor* schema) { + return std::make_unique(schema); +} + } // namespace parquet diff --git a/cpp/src/parquet/page_index.h b/cpp/src/parquet/page_index.h index 79fbce20ed391..c2351d8c0c6f8 100644 --- a/cpp/src/parquet/page_index.h +++ b/cpp/src/parquet/page_index.h @@ -26,8 +26,10 @@ namespace parquet { class ColumnDescriptor; +class EncodedStatistics; class FileMetaData; class InternalFileDecryptor; +struct PageIndexLocation; class ReaderProperties; class RowGroupMetaData; class RowGroupPageIndexReader; @@ -250,4 +252,116 @@ class PARQUET_EXPORT PageIndexReader { const RowGroupMetaData& row_group_metadata, const std::vector& columns); }; +/// \brief Interface for collecting column index of data pages in a column chunk. +class PARQUET_EXPORT ColumnIndexBuilder { + public: + /// \brief API convenience to create a ColumnIndexBuilder. + static std::unique_ptr Make(const ColumnDescriptor* descr); + + virtual ~ColumnIndexBuilder() = default; + + /// \brief Add statistics of a data page. + /// + /// If the ColumnIndexBuilder has seen any corrupted statistics, it will + /// not update statistics any more. + /// + /// \param stats Page statistics in the encoded form. + virtual void AddPage(const EncodedStatistics& stats) = 0; + + /// \brief Complete the column index. + /// + /// Once called, AddPage() can no longer be called. + /// WriteTo() and Build() can only called after Finish() has been called. + virtual void Finish() = 0; + + /// \brief Serialize the column index thrift message. + /// + /// If the ColumnIndexBuilder has seen any corrupted statistics, it will + /// not write any data to the sink. + /// + /// \param[out] sink output stream to write the serialized message. + virtual void WriteTo(::arrow::io::OutputStream* sink) const = 0; + + /// \brief Create a ColumnIndex directly. + /// + /// \return If the ColumnIndexBuilder has seen any corrupted statistics, it simply + /// returns nullptr. Otherwise the column index is built and returned. + virtual std::unique_ptr Build() const = 0; +}; + +/// \brief Interface for collecting offset index of data pages in a column chunk. +class PARQUET_EXPORT OffsetIndexBuilder { + public: + /// \brief API convenience to create a OffsetIndexBuilder. + static std::unique_ptr Make(); + + virtual ~OffsetIndexBuilder() = default; + + /// \brief Add page location of a data page. + virtual void AddPage(int64_t offset, int32_t compressed_page_size, + int64_t first_row_index) = 0; + + /// \brief Add page location of a data page. + void AddPage(const PageLocation& page_location) { + AddPage(page_location.offset, page_location.compressed_page_size, + page_location.first_row_index); + } + + /// \brief Complete the offset index. + /// + /// In the buffered row group mode, data pages are flushed into memory + /// sink and the OffsetIndexBuilder has only collected the relative offset + /// which requires adjustment once they are flushed to the file. + /// + /// \param final_position Final stream offset to add for page offset adjustment. + virtual void Finish(int64_t final_position) = 0; + + /// \brief Serialize the offset index thrift message. + /// + /// \param[out] sink output stream to write the serialized message. + virtual void WriteTo(::arrow::io::OutputStream* sink) const = 0; + + /// \brief Create an OffsetIndex directly. + virtual std::unique_ptr Build() const = 0; +}; + +/// \brief Interface for collecting page index of a parquet file. +class PARQUET_EXPORT PageIndexBuilder { + public: + /// \brief API convenience to create a PageIndexBuilder. + static std::unique_ptr Make(const SchemaDescriptor* schema); + + virtual ~PageIndexBuilder() = default; + + /// \brief Start a new row group. + virtual void AppendRowGroup() = 0; + + /// \brief Get the ColumnIndexBuilder from column ordinal. + /// + /// \param i Column ordinal. + /// \return ColumnIndexBuilder for the column and its memory ownership belongs to + /// the PageIndexBuilder. + virtual ColumnIndexBuilder* GetColumnIndexBuilder(int32_t i) = 0; + + /// \brief Get the OffsetIndexBuilder from column ordinal. + /// + /// \param i Column ordinal. + /// \return OffsetIndexBuilder for the column and its memory ownership belongs to + /// the PageIndexBuilder. + virtual OffsetIndexBuilder* GetOffsetIndexBuilder(int32_t i) = 0; + + /// \brief Complete the page index builder and no more write is allowed. + virtual void Finish() = 0; + + /// \brief Serialize the page index thrift message. + /// + /// Only valid column indexes and offset indexes are serialized and their locations + /// are set. + /// + /// \param[out] sink The output stream to write the page index. + /// \param[out] location The location of all page index to the start of sink. + virtual void WriteTo(::arrow::io::OutputStream* sink, + PageIndexLocation* location) const = 0; +}; + } // namespace parquet diff --git a/cpp/src/parquet/page_index_test.cc b/cpp/src/parquet/page_index_test.cc index 46599960b8a53..6634d9232ec64 100644 --- a/cpp/src/parquet/page_index_test.cc +++ b/cpp/src/parquet/page_index_test.cc @@ -416,4 +416,420 @@ TEST(PageIndex, DeterminePageIndexRangesInRowGroupWithMissingPageIndex) { -1); } +TEST(PageIndex, WriteOffsetIndex) { + /// Create offset index via the OffsetIndexBuilder interface. + auto builder = OffsetIndexBuilder::Make(); + const size_t num_pages = 5; + const std::vector offsets = {100, 200, 300, 400, 500}; + const std::vector page_sizes = {1024, 2048, 3072, 4096, 8192}; + const std::vector first_row_indices = {0, 10000, 20000, 30000, 40000}; + for (size_t i = 0; i < num_pages; ++i) { + builder->AddPage(offsets[i], page_sizes[i], first_row_indices[i]); + } + const int64_t final_position = 4096; + builder->Finish(final_position); + + std::vector> offset_indexes; + /// 1st element is the offset index just built. + offset_indexes.emplace_back(builder->Build()); + /// 2nd element is the offset index restored by serialize-then-deserialize round trip. + auto sink = CreateOutputStream(); + builder->WriteTo(sink.get()); + PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish()); + offset_indexes.emplace_back(OffsetIndex::Make(buffer->data(), + static_cast(buffer->size()), + default_reader_properties())); + + /// Verify the data of the offset index. + for (const auto& offset_index : offset_indexes) { + ASSERT_EQ(num_pages, offset_index->page_locations().size()); + for (size_t i = 0; i < num_pages; ++i) { + const auto& page_location = offset_index->page_locations().at(i); + ASSERT_EQ(offsets[i] + final_position, page_location.offset); + ASSERT_EQ(page_sizes[i], page_location.compressed_page_size); + ASSERT_EQ(first_row_indices[i], page_location.first_row_index); + } + } +} + +void TestWriteTypedColumnIndex(schema::NodePtr node, + const std::vector& page_stats, + BoundaryOrder::type boundary_order, bool has_null_counts) { + auto descr = std::make_unique(node, /*max_definition_level=*/1, 0); + + auto builder = ColumnIndexBuilder::Make(descr.get()); + for (const auto& stats : page_stats) { + builder->AddPage(stats); + } + ASSERT_NO_THROW(builder->Finish()); + + std::vector> column_indexes; + /// 1st element is the column index just built. + column_indexes.emplace_back(builder->Build()); + /// 2nd element is the column index restored by serialize-then-deserialize round trip. + auto sink = CreateOutputStream(); + builder->WriteTo(sink.get()); + PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish()); + column_indexes.emplace_back(ColumnIndex::Make(*descr, buffer->data(), + static_cast(buffer->size()), + default_reader_properties())); + + /// Verify the data of the column index. + for (const auto& column_index : column_indexes) { + ASSERT_EQ(boundary_order, column_index->boundary_order()); + ASSERT_EQ(has_null_counts, column_index->has_null_counts()); + const size_t num_pages = column_index->null_pages().size(); + for (size_t i = 0; i < num_pages; ++i) { + ASSERT_EQ(page_stats[i].all_null_value, column_index->null_pages()[i]); + ASSERT_EQ(page_stats[i].min(), column_index->encoded_min_values()[i]); + ASSERT_EQ(page_stats[i].max(), column_index->encoded_max_values()[i]); + if (has_null_counts) { + ASSERT_EQ(page_stats[i].null_count, column_index->null_counts()[i]); + } + } + } +} + +TEST(PageIndex, WriteInt32ColumnIndex) { + auto encode = [=](int32_t value) { + return std::string(reinterpret_cast(&value), sizeof(int32_t)); + }; + + // Integer values in the ascending order. + std::vector page_stats(3); + page_stats.at(0).set_null_count(1).set_min(encode(1)).set_max(encode(2)); + page_stats.at(1).set_null_count(2).set_min(encode(2)).set_max(encode(3)); + page_stats.at(2).set_null_count(3).set_min(encode(3)).set_max(encode(4)); + + TestWriteTypedColumnIndex(schema::Int32("c1"), page_stats, BoundaryOrder::Ascending, + /*has_null_counts=*/true); +} + +TEST(PageIndex, WriteInt64ColumnIndex) { + auto encode = [=](int64_t value) { + return std::string(reinterpret_cast(&value), sizeof(int64_t)); + }; + + // Integer values in the descending order. + std::vector page_stats(3); + page_stats.at(0).set_null_count(4).set_min(encode(-1)).set_max(encode(-2)); + page_stats.at(1).set_null_count(0).set_min(encode(-2)).set_max(encode(-3)); + page_stats.at(2).set_null_count(4).set_min(encode(-3)).set_max(encode(-4)); + + TestWriteTypedColumnIndex(schema::Int64("c1"), page_stats, BoundaryOrder::Descending, + /*has_null_counts=*/true); +} + +TEST(PageIndex, WriteFloatColumnIndex) { + auto encode = [=](float value) { + return std::string(reinterpret_cast(&value), sizeof(float)); + }; + + // Float values with no specific order. + std::vector page_stats(3); + page_stats.at(0).set_null_count(0).set_min(encode(2.2F)).set_max(encode(4.4F)); + page_stats.at(1).set_null_count(0).set_min(encode(1.1F)).set_max(encode(5.5F)); + page_stats.at(2).set_null_count(0).set_min(encode(3.3F)).set_max(encode(6.6F)); + + TestWriteTypedColumnIndex(schema::Float("c1"), page_stats, BoundaryOrder::Unordered, + /*has_null_counts=*/true); +} + +TEST(PageIndex, WriteDoubleColumnIndex) { + auto encode = [=](double value) { + return std::string(reinterpret_cast(&value), sizeof(double)); + }; + + // Double values with no specific order and without null count. + std::vector page_stats(3); + page_stats.at(0).set_min(encode(1.2)).set_max(encode(4.4)); + page_stats.at(1).set_min(encode(2.2)).set_max(encode(5.5)); + page_stats.at(2).set_min(encode(3.3)).set_max(encode(-6.6)); + + TestWriteTypedColumnIndex(schema::Double("c1"), page_stats, BoundaryOrder::Unordered, + /*has_null_counts=*/false); +} + +TEST(PageIndex, WriteByteArrayColumnIndex) { + // Byte array values with identical min/max. + std::vector page_stats(3); + page_stats.at(0).set_min("bar").set_max("foo"); + page_stats.at(1).set_min("bar").set_max("foo"); + page_stats.at(2).set_min("bar").set_max("foo"); + + TestWriteTypedColumnIndex(schema::ByteArray("c1"), page_stats, BoundaryOrder::Ascending, + /*has_null_counts=*/false); +} + +TEST(PageIndex, WriteFLBAColumnIndex) { + // FLBA values in the ascending order with some null pages + std::vector page_stats(5); + page_stats.at(0).set_min("abc").set_max("ABC"); + page_stats.at(1).all_null_value = true; + page_stats.at(2).set_min("foo").set_max("FOO"); + page_stats.at(3).all_null_value = true; + page_stats.at(4).set_min("xyz").set_max("XYZ"); + + auto node = + schema::PrimitiveNode::Make("c1", Repetition::OPTIONAL, Type::FIXED_LEN_BYTE_ARRAY, + ConvertedType::NONE, /*length=*/3); + TestWriteTypedColumnIndex(std::move(node), page_stats, BoundaryOrder::Ascending, + /*has_null_counts=*/false); +} + +TEST(PageIndex, WriteColumnIndexWithAllNullPages) { + // All values are null. + std::vector page_stats(3); + page_stats.at(0).set_null_count(100).all_null_value = true; + page_stats.at(1).set_null_count(100).all_null_value = true; + page_stats.at(2).set_null_count(100).all_null_value = true; + + TestWriteTypedColumnIndex(schema::Int32("c1"), page_stats, BoundaryOrder::Unordered, + /*has_null_counts=*/true); +} + +TEST(PageIndex, WriteColumnIndexWithInvalidNullCounts) { + auto encode = [=](int32_t value) { + return std::string(reinterpret_cast(&value), sizeof(int32_t)); + }; + + // Some pages do not provide null_count + std::vector page_stats(3); + page_stats.at(0).set_min(encode(1)).set_max(encode(2)).set_null_count(0); + page_stats.at(1).set_min(encode(1)).set_max(encode(3)); + page_stats.at(2).set_min(encode(2)).set_max(encode(3)).set_null_count(0); + + TestWriteTypedColumnIndex(schema::Int32("c1"), page_stats, BoundaryOrder::Ascending, + /*has_null_counts=*/false); +} + +TEST(PageIndex, WriteColumnIndexWithCorruptedStats) { + auto encode = [=](int32_t value) { + return std::string(reinterpret_cast(&value), sizeof(int32_t)); + }; + + // 2nd page does not set anything + std::vector page_stats(3); + page_stats.at(0).set_min(encode(1)).set_max(encode(2)); + page_stats.at(2).set_min(encode(3)).set_max(encode(4)); + + ColumnDescriptor descr(schema::Int32("c1"), /*max_definition_level=*/1, 0); + auto builder = ColumnIndexBuilder::Make(&descr); + for (const auto& stats : page_stats) { + builder->AddPage(stats); + } + ASSERT_NO_THROW(builder->Finish()); + ASSERT_EQ(nullptr, builder->Build()); + + auto sink = CreateOutputStream(); + builder->WriteTo(sink.get()); + PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish()); + EXPECT_EQ(0, buffer->size()); +} + +TEST(PageIndex, TestPageIndexBuilderWithZeroRowGroup) { + schema::NodeVector fields = {schema::Int32("c1"), schema::ByteArray("c2")}; + schema::NodePtr root = schema::GroupNode::Make("schema", Repetition::REPEATED, fields); + SchemaDescriptor schema; + schema.Init(root); + + auto builder = PageIndexBuilder::Make(&schema); + + // AppendRowGroup() is not called and expect throw. + ASSERT_THROW(builder->GetColumnIndexBuilder(0), ParquetException); + ASSERT_THROW(builder->GetOffsetIndexBuilder(0), ParquetException); + + // Finish the builder without calling AppendRowGroup(). + ASSERT_NO_THROW(builder->Finish()); + + // Verify WriteTo does not write anything. + auto sink = CreateOutputStream(); + PageIndexLocation location; + builder->WriteTo(sink.get(), &location); + PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish()); + ASSERT_EQ(0, buffer->size()); + ASSERT_TRUE(location.column_index_location.empty()); + ASSERT_TRUE(location.offset_index_location.empty()); +} + +TEST(PageIndex, TestPageIndexBuilderWithSingleRowGroup) { + schema::NodePtr root = schema::GroupNode::Make( + "schema", Repetition::REPEATED, + {schema::ByteArray("c1"), schema::ByteArray("c2"), schema::ByteArray("c3")}); + SchemaDescriptor schema; + schema.Init(root); + + // Prepare page stats and page locations. + const std::vector page_stats = { + EncodedStatistics().set_null_count(0).set_min("a").set_max("b"), + EncodedStatistics().set_null_count(0).set_min("A").set_max("B")}; + const std::vector page_locations = { + {/*offset=*/128, /*compressed_page_size=*/512, + /*first_row_index=*/0}, + {/*offset=*/1024, /*compressed_page_size=*/512, + /*first_row_index=*/0}}; + const int64_t final_position = 200; + + // Create builder and add pages of single row group. + // Note that the 3rd column does not add any pages and its page index is disabled. + auto builder = PageIndexBuilder::Make(&schema); + ASSERT_NO_THROW(builder->AppendRowGroup()); + for (int i = 0; i < 2; ++i) { + ASSERT_NO_THROW(builder->GetColumnIndexBuilder(i)->AddPage(page_stats.at(i))); + ASSERT_NO_THROW(builder->GetColumnIndexBuilder(i)->Finish()); + ASSERT_NO_THROW(builder->GetOffsetIndexBuilder(i)->AddPage(page_locations.at(i))); + ASSERT_NO_THROW(builder->GetOffsetIndexBuilder(i)->Finish(final_position)); + } + ASSERT_NO_THROW(builder->Finish()); + + // Verify WriteTo only serializes page index of first two columns. + auto sink = CreateOutputStream(); + PageIndexLocation location; + builder->WriteTo(sink.get(), &location); + PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish()); + + const size_t num_row_groups = 1; + const size_t num_columns = 3; + ASSERT_EQ(num_row_groups, location.column_index_location.size()); + ASSERT_EQ(num_row_groups, location.offset_index_location.size()); + auto column_index_locations = location.column_index_location[0]; + auto offset_index_locations = location.offset_index_location[0]; + ASSERT_EQ(num_columns, column_index_locations.size()); + ASSERT_EQ(num_columns, offset_index_locations.size()); + + auto properties = default_reader_properties(); + for (int i = 0; i < 3; i++) { + if (i < 2) { + ASSERT_TRUE(column_index_locations[i].has_value()); + ASSERT_TRUE(offset_index_locations[i].has_value()); + auto ci_location = column_index_locations[i].value(); + auto oi_location = offset_index_locations[i].value(); + + auto column_index = + ColumnIndex::Make(*schema.Column(i), buffer->data() + ci_location.offset, + static_cast(ci_location.length), properties); + const size_t num_pages = 1; + ASSERT_EQ(num_pages, column_index->null_pages().size()); + ASSERT_EQ(page_stats[i].all_null_value, column_index->null_pages()[0]); + ASSERT_EQ(page_stats[i].min(), column_index->encoded_min_values()[0]); + ASSERT_EQ(page_stats[i].max(), column_index->encoded_max_values()[0]); + ASSERT_TRUE(column_index->has_null_counts()); + ASSERT_EQ(page_stats[i].null_count, column_index->null_counts()[0]); + + auto offset_index = + OffsetIndex::Make(buffer->data() + oi_location.offset, + static_cast(oi_location.length), properties); + ASSERT_EQ(num_pages, offset_index->page_locations().size()); + ASSERT_EQ(page_locations[i].offset + final_position, + offset_index->page_locations()[0].offset); + ASSERT_EQ(page_locations[i].compressed_page_size, + offset_index->page_locations()[0].compressed_page_size); + ASSERT_EQ(page_locations[i].first_row_index, + offset_index->page_locations()[0].first_row_index); + } else { + ASSERT_FALSE(column_index_locations[i].has_value()); + ASSERT_FALSE(offset_index_locations[i].has_value()); + } + } +} + +TEST(PageIndex, TestPageIndexBuilderWithTwoRowGroups) { + schema::NodePtr root = schema::GroupNode::Make( + "schema", Repetition::REPEATED, {schema::ByteArray("c1"), schema::ByteArray("c2")}); + SchemaDescriptor schema; + schema.Init(root); + + // Prepare page stats and page locations for two row groups. + const std::vector> page_stats = { + /* 1st row group */ + {EncodedStatistics().set_min("a").set_max("b"), + EncodedStatistics().set_null_count(0).set_min("A").set_max("B")}, + /* 2nd row group */ + {EncodedStatistics() /* corrupted stats */, + EncodedStatistics().set_null_count(0).set_min("bar").set_max("foo")}}; + const std::vector> page_locations = { + /* 1st row group */ + {{/*offset=*/128, /*compressed_page_size=*/512, + /*first_row_index=*/0}, + {/*offset=*/1024, /*compressed_page_size=*/512, + /*first_row_index=*/0}}, + /* 2nd row group */ + {{/*offset=*/128, /*compressed_page_size=*/512, + /*first_row_index=*/0}, + {/*offset=*/1024, /*compressed_page_size=*/512, + /*first_row_index=*/0}}}; + const std::vector final_positions = {1024, 2048}; + + // Create builder and add pages of two row groups. + const size_t num_row_groups = 2; + const size_t num_columns = 2; + const size_t num_pages = 1; + auto builder = PageIndexBuilder::Make(&schema); + for (size_t rg = 0; rg < num_row_groups; ++rg) { + ASSERT_NO_THROW(builder->AppendRowGroup()); + for (int c = 0; c < static_cast(num_columns); ++c) { + ASSERT_NO_THROW(builder->GetColumnIndexBuilder(c)->AddPage(page_stats[rg][c])); + ASSERT_NO_THROW(builder->GetColumnIndexBuilder(c)->Finish()); + ASSERT_NO_THROW(builder->GetOffsetIndexBuilder(c)->AddPage(page_locations[rg][c])); + ASSERT_NO_THROW(builder->GetOffsetIndexBuilder(c)->Finish(final_positions[rg])); + } + } + ASSERT_NO_THROW(builder->Finish()); + + // Verify WriteTo only serializes valid page index. + auto sink = CreateOutputStream(); + PageIndexLocation location; + builder->WriteTo(sink.get(), &location); + PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish()); + ASSERT_EQ(num_row_groups, location.column_index_location.size()); + ASSERT_EQ(num_row_groups, location.offset_index_location.size()); + + // Verify data of deserialized page index. + auto properties = default_reader_properties(); + for (size_t rg = 0; rg < num_row_groups; ++rg) { + auto column_index_locations = location.column_index_location[rg]; + auto offset_index_locations = location.offset_index_location[rg]; + ASSERT_EQ(num_columns, column_index_locations.size()); + ASSERT_EQ(num_columns, offset_index_locations.size()); + + for (int c = 0; c < static_cast(num_columns); c++) { + ASSERT_TRUE(offset_index_locations[c].has_value()); + auto oi_location = offset_index_locations[c].value(); + + auto offset_index = + OffsetIndex::Make(buffer->data() + oi_location.offset, + static_cast(oi_location.length), properties); + ASSERT_EQ(num_pages, offset_index->page_locations().size()); + ASSERT_EQ(page_locations[rg][c].offset + final_positions[rg], + offset_index->page_locations()[0].offset); + ASSERT_EQ(page_locations[rg][c].compressed_page_size, + offset_index->page_locations()[0].compressed_page_size); + ASSERT_EQ(page_locations[rg][c].first_row_index, + offset_index->page_locations()[0].first_row_index); + + if (rg == 1 && c == 0) { + // Corrupted stats. + ASSERT_FALSE(column_index_locations[c].has_value()); + } else { + ASSERT_TRUE(column_index_locations[c].has_value()); + auto ci_location = column_index_locations[c].value(); + + auto column_index = + ColumnIndex::Make(*schema.Column(c), buffer->data() + ci_location.offset, + static_cast(ci_location.length), properties); + ASSERT_EQ(num_pages, column_index->null_pages().size()); + ASSERT_EQ(page_stats[rg][c].all_null_value, column_index->null_pages()[0]); + ASSERT_EQ(page_stats[rg][c].min(), column_index->encoded_min_values()[0]); + ASSERT_EQ(page_stats[rg][c].max(), column_index->encoded_max_values()[0]); + if (c == 1) { + ASSERT_TRUE(column_index->has_null_counts()); + ASSERT_EQ(page_stats[rg][c].null_count, column_index->null_counts()[0]); + } else { + ASSERT_FALSE(column_index->has_null_counts()); + } + } + } + } +} + } // namespace parquet diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index 572194f4ee39e..d89278896050d 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -208,7 +208,8 @@ class PARQUET_EXPORT WriterProperties { data_page_version_(ParquetDataPageVersion::V1), created_by_(DEFAULT_CREATED_BY), store_decimal_as_integer_(false), - page_checksum_enabled_(false) {} + page_checksum_enabled_(false), + write_page_index_(false) {} virtual ~Builder() {} /// Specify the memory pool for the writer. Default default_memory_pool. @@ -501,6 +502,28 @@ class PARQUET_EXPORT WriterProperties { return this; } + /// Enable writing page index. + /// + /// Page index contains statistics for data pages and can be used to skip pages + /// when scanning data in ordered and unordered columns. + /// + /// Please check the link below for more details: + /// https://github.com/apache/parquet-format/blob/master/PageIndex.md + /// + /// Default disabled. + Builder* enable_write_page_index() { + write_page_index_ = true; + return this; + } + + /// Disable writing page index. + /// + /// Default disabled. + Builder* disable_write_page_index() { + write_page_index_ = false; + return this; + } + /// \brief Build the WriterProperties with the builder parameters. /// \return The WriterProperties defined by the builder. std::shared_ptr build() { @@ -526,7 +549,8 @@ class PARQUET_EXPORT WriterProperties { pool_, dictionary_pagesize_limit_, write_batch_size_, max_row_group_length_, pagesize_, version_, created_by_, page_checksum_enabled_, std::move(file_encryption_properties_), default_column_properties_, - column_properties, data_page_version_, store_decimal_as_integer_)); + column_properties, data_page_version_, store_decimal_as_integer_, + write_page_index_)); } private: @@ -540,6 +564,7 @@ class PARQUET_EXPORT WriterProperties { std::string created_by_; bool store_decimal_as_integer_; bool page_checksum_enabled_; + bool write_page_index_; std::shared_ptr file_encryption_properties_; @@ -574,6 +599,8 @@ class PARQUET_EXPORT WriterProperties { inline bool page_checksum_enabled() const { return page_checksum_enabled_; } + inline bool write_page_index() const { return write_page_index_; } + inline Encoding::type dictionary_index_encoding() const { if (parquet_version_ == ParquetVersion::PARQUET_1_0) { return Encoding::PLAIN_DICTIONARY; @@ -642,7 +669,8 @@ class PARQUET_EXPORT WriterProperties { std::shared_ptr file_encryption_properties, const ColumnProperties& default_column_properties, const std::unordered_map& column_properties, - ParquetDataPageVersion data_page_version, bool store_short_decimal_as_integer) + ParquetDataPageVersion data_page_version, bool store_short_decimal_as_integer, + bool write_page_index) : pool_(pool), dictionary_pagesize_limit_(dictionary_pagesize_limit), write_batch_size_(write_batch_size), @@ -653,6 +681,7 @@ class PARQUET_EXPORT WriterProperties { parquet_created_by_(created_by), store_decimal_as_integer_(store_short_decimal_as_integer), page_checksum_enabled_(page_write_checksum_enabled), + write_page_index_(write_page_index), file_encryption_properties_(file_encryption_properties), default_column_properties_(default_column_properties), column_properties_(column_properties) {} @@ -667,6 +696,7 @@ class PARQUET_EXPORT WriterProperties { std::string parquet_created_by_; bool store_decimal_as_integer_; bool page_checksum_enabled_; + bool write_page_index_; std::shared_ptr file_encryption_properties_; diff --git a/cpp/src/parquet/statistics.cc b/cpp/src/parquet/statistics.cc index d0130605cd887..aa6df7e32a98c 100644 --- a/cpp/src/parquet/statistics.cc +++ b/cpp/src/parquet/statistics.cc @@ -610,6 +610,8 @@ class TypedStatisticsImpl : public TypedStatistics { if (HasNullCount()) { s.set_null_count(this->null_count()); } + // num_values_ is reliable and it means number of non-null values. + s.all_null_value = num_values_ == 0; return s; } @@ -625,7 +627,7 @@ class TypedStatisticsImpl : public TypedStatistics { T min_; T max_; ::arrow::MemoryPool* pool_; - int64_t num_values_ = 0; + int64_t num_values_ = 0; // # of non-null values. EncodedStatistics statistics_; std::shared_ptr> comparator_; std::shared_ptr min_buffer_, max_buffer_; diff --git a/cpp/src/parquet/statistics.h b/cpp/src/parquet/statistics.h index 71d9b662baa40..3f168a938ffbe 100644 --- a/cpp/src/parquet/statistics.h +++ b/cpp/src/parquet/statistics.h @@ -137,6 +137,12 @@ class PARQUET_EXPORT EncodedStatistics { bool has_null_count = false; bool has_distinct_count = false; + // When all values in the statistics are null, it is set to true. + // Otherwise, at least one value is not null, or we are not sure at all. + // Page index requires this information to decide whether a data page + // is a null page or not. + bool all_null_value = false; + // From parquet-mr // Don't write stats larger than the max size rather than truncating. The // rationale is that some engines may use the minimum value in the page as diff --git a/cpp/src/parquet/thrift_internal.h b/cpp/src/parquet/thrift_internal.h index 9cc702dfcdda4..56e2a67c8abd6 100644 --- a/cpp/src/parquet/thrift_internal.h +++ b/cpp/src/parquet/thrift_internal.h @@ -295,6 +295,18 @@ static inline format::CompressionCodec::type ToThrift(Compression::type type) { } } +static inline format::BoundaryOrder::type ToThrift(BoundaryOrder::type type) { + switch (type) { + case BoundaryOrder::Unordered: + case BoundaryOrder::Ascending: + case BoundaryOrder::Descending: + return static_cast(type); + default: + DCHECK(false) << "Cannot reach here"; + return format::BoundaryOrder::UNORDERED; + } +} + static inline format::Statistics ToThrift(const EncodedStatistics& stats) { format::Statistics statistics; if (stats.has_min) {