From abf6967ba0ff40486adfb571a8859e18fdf70785 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Sat, 16 Mar 2024 23:50:02 +0800 Subject: [PATCH] GH-40592: [C++][Parquet] Implement SizeStatistics --- cpp/src/parquet/CMakeLists.txt | 2 + cpp/src/parquet/column_page.h | 24 ++- cpp/src/parquet/column_writer.cc | 134 +++++++++--- cpp/src/parquet/metadata.cc | 20 ++ cpp/src/parquet/metadata.h | 3 + cpp/src/parquet/page_index.cc | 79 ++++++- cpp/src/parquet/page_index.h | 36 +++- cpp/src/parquet/properties.h | 34 ++- cpp/src/parquet/size_statistics.cc | 266 ++++++++++++++++++++++++ cpp/src/parquet/size_statistics.h | 156 ++++++++++++++ cpp/src/parquet/size_statistics_test.cc | 105 ++++++++++ cpp/src/parquet/thrift_internal.h | 14 ++ cpp/submodules/parquet-testing | 2 +- 13 files changed, 826 insertions(+), 49 deletions(-) create mode 100644 cpp/src/parquet/size_statistics.cc create mode 100644 cpp/src/parquet/size_statistics.h create mode 100644 cpp/src/parquet/size_statistics_test.cc diff --git a/cpp/src/parquet/CMakeLists.txt b/cpp/src/parquet/CMakeLists.txt index b984ef77adbe0..7b2b7ed9f8b8b 100644 --- a/cpp/src/parquet/CMakeLists.txt +++ b/cpp/src/parquet/CMakeLists.txt @@ -184,6 +184,7 @@ set(PARQUET_SRCS printer.cc properties.cc schema.cc + size_statistics.cc statistics.cc stream_reader.cc stream_writer.cc @@ -377,6 +378,7 @@ add_parquet_test(internals-test metadata_test.cc page_index_test.cc public_api_test.cc + size_statistics_test.cc types_test.cc) set_source_files_properties(public_api_test.cc PROPERTIES SKIP_PRECOMPILE_HEADERS ON diff --git a/cpp/src/parquet/column_page.h b/cpp/src/parquet/column_page.h index b389ffd98e6c7..8f9588ea33566 100644 --- a/cpp/src/parquet/column_page.h +++ b/cpp/src/parquet/column_page.h @@ -26,6 +26,7 @@ #include #include +#include "parquet/size_statistics.h" #include "parquet/statistics.h" #include "parquet/types.h" @@ -69,20 +70,24 @@ class DataPage : public Page { /// Currently it is only present from data pages created by ColumnWriter in order /// to collect page index. std::optional first_row_index() const { return first_row_index_; } + const std::shared_ptr& size_statistics() const { + return size_statistics_; + } virtual ~DataPage() = default; protected: DataPage(PageType::type type, const std::shared_ptr& buffer, int32_t num_values, Encoding::type encoding, int64_t uncompressed_size, - EncodedStatistics statistics = EncodedStatistics(), - std::optional first_row_index = std::nullopt) + EncodedStatistics statistics, std::optional first_row_index, + std::shared_ptr size_statistics) : Page(buffer, type), num_values_(num_values), encoding_(encoding), uncompressed_size_(uncompressed_size), statistics_(std::move(statistics)), - first_row_index_(std::move(first_row_index)) {} + first_row_index_(std::move(first_row_index)), + size_statistics_(std::move(size_statistics)) {} int32_t num_values_; Encoding::type encoding_; @@ -90,6 +95,7 @@ class DataPage : public Page { EncodedStatistics statistics_; /// Row ordinal within the row group to the first row in the data page. std::optional first_row_index_; + std::shared_ptr size_statistics_; }; class DataPageV1 : public DataPage { @@ -98,9 +104,11 @@ class DataPageV1 : public DataPage { Encoding::type encoding, Encoding::type definition_level_encoding, Encoding::type repetition_level_encoding, int64_t uncompressed_size, EncodedStatistics statistics = EncodedStatistics(), - std::optional first_row_index = std::nullopt) + std::optional first_row_index = std::nullopt, + std::shared_ptr size_statistics = NULLPTR) : DataPage(PageType::DATA_PAGE, buffer, num_values, encoding, uncompressed_size, - std::move(statistics), std::move(first_row_index)), + std::move(statistics), std::move(first_row_index), + std::move(size_statistics)), definition_level_encoding_(definition_level_encoding), repetition_level_encoding_(repetition_level_encoding) {} @@ -120,9 +128,11 @@ class DataPageV2 : public DataPage { int32_t definition_levels_byte_length, int32_t repetition_levels_byte_length, int64_t uncompressed_size, bool is_compressed = false, EncodedStatistics statistics = EncodedStatistics(), - std::optional first_row_index = std::nullopt) + std::optional first_row_index = std::nullopt, + std::shared_ptr size_statistics = NULLPTR) : DataPage(PageType::DATA_PAGE_V2, buffer, num_values, encoding, uncompressed_size, - std::move(statistics), std::move(first_row_index)), + std::move(statistics), std::move(first_row_index), + std::move(size_statistics)), num_nulls_(num_nulls), num_rows_(num_rows), definition_levels_byte_length_(definition_levels_byte_length), diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index b7ff712abebe9..513ece50f616c 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -55,6 +55,7 @@ #include "parquet/platform.h" #include "parquet/properties.h" #include "parquet/schema.h" +#include "parquet/size_statistics.h" #include "parquet/statistics.h" #include "parquet/thrift_internal.h" #include "parquet/types.h" @@ -434,10 +435,11 @@ class SerializedPageWriter : public PageWriter { const int64_t header_size = thrift_serializer_->Serialize(&page_header, sink_.get(), meta_encryptor_.get()); PARQUET_THROW_NOT_OK(sink_->Write(output_data_buffer, output_data_len)); + const auto& page_size_stats = page.size_statistics(); /// Collect page index if (column_index_builder_ != nullptr) { - column_index_builder_->AddPage(page.statistics()); + column_index_builder_->AddPage(page.statistics(), page_size_stats.get()); } if (offset_index_builder_ != nullptr) { const int64_t compressed_size = output_data_len + header_size; @@ -451,8 +453,10 @@ class SerializedPageWriter : public PageWriter { /// start_pos is a relative offset in the buffered mode. It should be /// adjusted via OffsetIndexBuilder::Finish() after BufferedPageWriter /// has flushed all data pages. - offset_index_builder_->AddPage(start_pos, static_cast(compressed_size), - *page.first_row_index()); + offset_index_builder_->AddPage( + start_pos, static_cast(compressed_size), *page.first_row_index(), + page_size_stats ? page_size_stats->unencoded_byte_array_data_bytes() + : std::nullopt); } total_uncompressed_size_ += uncompressed_size + header_size; @@ -774,11 +778,13 @@ class ColumnWriterImpl { // Serializes Dictionary Page if enabled virtual void WriteDictionaryPage() = 0; + using StatisticsPair = std::pair>; + // Plain-encoded statistics of the current page - virtual EncodedStatistics GetPageStatistics() = 0; + virtual StatisticsPair GetPageStatistics() = 0; // Plain-encoded statistics of the whole chunk - virtual EncodedStatistics GetChunkStatistics() = 0; + virtual StatisticsPair GetChunkStatistics() = 0; // Merges page statistics into chunk statistics, then resets the values virtual void ResetPageStatistics() = 0; @@ -982,7 +988,9 @@ void ColumnWriterImpl::BuildDataPageV1(int64_t definition_levels_rle_size, ConcatenateBuffers(definition_levels_rle_size, repetition_levels_rle_size, values, uncompressed_data_->mutable_data()); - EncodedStatistics page_stats = GetPageStatistics(); + EncodedStatistics page_stats; + std::shared_ptr page_size_stats; + std::tie(page_stats, page_size_stats) = GetPageStatistics(); page_stats.ApplyStatSizeLimits(properties_->max_statistics_size(descr_->path())); page_stats.set_is_signed(SortOrder::SIGNED == descr_->sort_order()); ResetPageStatistics(); @@ -1006,13 +1014,15 @@ void ColumnWriterImpl::BuildDataPageV1(int64_t definition_levels_rle_size, compressed_data->CopySlice(0, compressed_data->size(), allocator_)); std::unique_ptr page_ptr = std::make_unique( compressed_data_copy, num_values, encoding_, Encoding::RLE, Encoding::RLE, - uncompressed_size, std::move(page_stats), first_row_index); + uncompressed_size, std::move(page_stats), first_row_index, + std::move(page_size_stats)); total_compressed_bytes_ += page_ptr->size() + sizeof(format::PageHeader); data_pages_.push_back(std::move(page_ptr)); } else { // Eagerly write pages DataPageV1 page(compressed_data, num_values, encoding_, Encoding::RLE, Encoding::RLE, - uncompressed_size, std::move(page_stats), first_row_index); + uncompressed_size, std::move(page_stats), first_row_index, + std::move(page_size_stats)); WriteDataPage(page); } } @@ -1039,7 +1049,9 @@ void ColumnWriterImpl::BuildDataPageV2(int64_t definition_levels_rle_size, ConcatenateBuffers(definition_levels_rle_size, repetition_levels_rle_size, compressed_values, combined->mutable_data()); - EncodedStatistics page_stats = GetPageStatistics(); + EncodedStatistics page_stats; + std::shared_ptr page_size_stats; + std::tie(page_stats, page_size_stats) = GetPageStatistics(); page_stats.ApplyStatSizeLimits(properties_->max_statistics_size(descr_->path())); page_stats.set_is_signed(SortOrder::SIGNED == descr_->sort_order()); ResetPageStatistics(); @@ -1063,13 +1075,14 @@ void ColumnWriterImpl::BuildDataPageV2(int64_t definition_levels_rle_size, std::unique_ptr page_ptr = std::make_unique( combined, num_values, null_count, num_rows, encoding_, def_levels_byte_length, rep_levels_byte_length, uncompressed_size, pager_->has_compressor(), page_stats, - first_row_index); + first_row_index, std::move(page_size_stats)); total_compressed_bytes_ += page_ptr->size() + sizeof(format::PageHeader); data_pages_.push_back(std::move(page_ptr)); } else { DataPageV2 page(combined, num_values, null_count, num_rows, encoding_, def_levels_byte_length, rep_levels_byte_length, uncompressed_size, - pager_->has_compressor(), page_stats, first_row_index); + pager_->has_compressor(), page_stats, first_row_index, + std::move(page_size_stats)); WriteDataPage(page); } } @@ -1083,7 +1096,9 @@ int64_t ColumnWriterImpl::Close() { FlushBufferedDataPages(); - EncodedStatistics chunk_statistics = GetChunkStatistics(); + EncodedStatistics chunk_statistics; + std::shared_ptr chunk_size_stats; + std::tie(chunk_statistics, chunk_size_stats) = GetChunkStatistics(); chunk_statistics.ApplyStatSizeLimits( properties_->max_statistics_size(descr_->path())); chunk_statistics.set_is_signed(SortOrder::SIGNED == descr_->sort_order()); @@ -1092,6 +1107,9 @@ int64_t ColumnWriterImpl::Close() { if (rows_written_ > 0 && chunk_statistics.is_set()) { metadata_->SetStatistics(chunk_statistics); } + if (rows_written_ > 0 && chunk_size_stats) { + metadata_->SetSizeStatistics(*chunk_size_stats); + } metadata_->SetKeyValueMetadata(key_value_metadata_); pager_->Close(has_dictionary_, fallback_); } @@ -1220,6 +1238,11 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< pages_change_on_record_boundaries_ = properties->data_page_version() == ParquetDataPageVersion::V2 || properties->page_index_enabled(descr_->path()); + + if (properties->size_statistics_level() != SizeStatisticsLevel::NONE) { + page_size_stats_builder_ = SizeStatisticsBuilder::Make(descr_); + chunk_size_stats_ = page_size_stats_builder_->Build(); + } } int64_t Close() override { return ColumnWriterImpl::Close(); } @@ -1351,15 +1374,19 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< total_bytes_written_ += pager_->WriteDictionaryPage(page); } - EncodedStatistics GetPageStatistics() override { - EncodedStatistics result; - if (page_statistics_) result = page_statistics_->Encode(); + StatisticsPair GetPageStatistics() override { + StatisticsPair result; + if (page_statistics_) result.first = page_statistics_->Encode(); + if (properties_->size_statistics_level() == SizeStatisticsLevel::PAGE) { + result.second = page_size_stats_builder_->Build(); + } return result; } - EncodedStatistics GetChunkStatistics() override { - EncodedStatistics result; - if (chunk_statistics_) result = chunk_statistics_->Encode(); + StatisticsPair GetChunkStatistics() override { + StatisticsPair result; + if (chunk_statistics_) result.first = chunk_statistics_->Encode(); + if (chunk_size_stats_) result.second = chunk_size_stats_; return result; } @@ -1368,6 +1395,11 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< chunk_statistics_->Merge(*page_statistics_); page_statistics_->Reset(); } + if (page_size_stats_builder_ != nullptr) { + auto page_size_stats = page_size_stats_builder_->Build(); + chunk_size_stats_->Merge(*page_size_stats); + page_size_stats_builder_->Reset(); + } } Type::type type() const override { return descr_->physical_type(); } @@ -1429,6 +1461,10 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< // which case we call back to the dense write path) std::shared_ptr<::arrow::Array> preserved_dictionary_; + // Utility to collect and store SizeStatistics of page and chunk. + std::unique_ptr page_size_stats_builder_; + std::shared_ptr chunk_size_stats_; + int64_t WriteLevels(int64_t num_values, const int16_t* def_levels, const int16_t* rep_levels) { int64_t values_to_write = 0; @@ -1463,6 +1499,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< rows_written_ += num_values; num_buffered_rows_ += num_values; } + + CollectLevelHistogram(num_values, def_levels, rep_levels); return values_to_write; } @@ -1554,6 +1592,27 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< rows_written_ += num_levels; num_buffered_rows_ += num_levels; } + + CollectLevelHistogram(num_levels, def_levels, rep_levels); + } + + void CollectLevelHistogram(int64_t num_levels, const int16_t* def_levels, + const int16_t* rep_levels) { + if (page_size_stats_builder_ == nullptr) { + return; + } + + if (descr_->max_definition_level() > 0) { + page_size_stats_builder_->WriteDefinitionLevels(num_levels, def_levels); + } else { + page_size_stats_builder_->WriteDefinitionLevel(num_levels, /*def_level=*/0); + } + + if (descr_->max_repetition_level() > 0) { + page_size_stats_builder_->WriteRepetitionLevels(num_levels, rep_levels); + } else { + page_size_stats_builder_->WriteRepetitionLevel(num_levels, /*rep_level=*/0); + } } void CommitWriteAndCheckPageLimit(int64_t num_levels, int64_t num_values, @@ -1607,6 +1666,11 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< if (page_statistics_ != nullptr) { page_statistics_->Update(values, num_values, num_nulls); } + if constexpr (std::is_same_v) { + if (page_size_stats_builder_ != nullptr) { + page_size_stats_builder_->WriteValues(values, num_values); + } + } } /// \brief Write values with spaces and update page statistics accordingly. @@ -1635,6 +1699,12 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< page_statistics_->UpdateSpaced(values, valid_bits, valid_bits_offset, num_spaced_values, num_values, num_nulls); } + if constexpr (std::is_same_v) { + if (page_size_stats_builder_ != nullptr) { + page_size_stats_builder_->WriteValuesSpaced(values, valid_bits, valid_bits_offset, + num_spaced_values); + } + } } }; @@ -1693,8 +1763,14 @@ Status TypedColumnWriterImpl::WriteArrowDictionary( exec_ctx.set_use_threads(false); std::shared_ptr<::arrow::Array> referenced_dictionary; - PARQUET_ASSIGN_OR_THROW(::arrow::Datum referenced_indices, - ::arrow::compute::Unique(*chunk_indices, &exec_ctx)); + ::arrow::Datum referenced_indices; + if (page_size_stats_builder_) { + // SizeStatistics need to compute total bytes, so we cannot extract unique values. + referenced_indices = *chunk_indices; + } else { + PARQUET_ASSIGN_OR_THROW(referenced_indices, + ::arrow::compute::Unique(*chunk_indices, &exec_ctx)); + } // On first run, we might be able to re-use the existing dictionary if (referenced_indices.length() == dictionary->length()) { @@ -1708,10 +1784,15 @@ Status TypedColumnWriterImpl::WriteArrowDictionary( referenced_dictionary = referenced_dictionary_datum.make_array(); } - int64_t non_null_count = chunk_indices->length() - chunk_indices->null_count(); - page_statistics_->IncrementNullCount(num_chunk_levels - non_null_count); - page_statistics_->IncrementNumValues(non_null_count); - page_statistics_->Update(*referenced_dictionary, /*update_counts=*/false); + if (page_statistics_) { + int64_t non_null_count = chunk_indices->length() - chunk_indices->null_count(); + page_statistics_->IncrementNullCount(num_chunk_levels - non_null_count); + page_statistics_->IncrementNumValues(non_null_count); + page_statistics_->Update(*referenced_dictionary, /*update_counts=*/false); + } + if (page_size_stats_builder_) { + page_size_stats_builder_->WriteValues(*referenced_dictionary); + } }; int64_t value_offset = 0; @@ -1728,7 +1809,7 @@ Status TypedColumnWriterImpl::WriteArrowDictionary( AddIfNotNull(rep_levels, offset)); std::shared_ptr writeable_indices = indices->Slice(value_offset, batch_num_spaced_values); - if (page_statistics_) { + if (page_statistics_ || page_size_stats_builder_) { update_stats(/*num_chunk_levels=*/batch_size, writeable_indices); } PARQUET_ASSIGN_OR_THROW( @@ -2215,6 +2296,9 @@ Status TypedColumnWriterImpl::WriteArrowDense( page_statistics_->IncrementNullCount(batch_size - non_null); page_statistics_->IncrementNumValues(non_null); } + if (page_size_stats_builder_ != nullptr) { + page_size_stats_builder_->WriteValues(*data_slice); + } CommitWriteAndCheckPageLimit(batch_size, batch_num_values, batch_size - non_null, check_page); CheckDictionarySizeLimit(); diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc index 8f577be45b96d..e4009e15ebb40 100644 --- a/cpp/src/parquet/metadata.cc +++ b/cpp/src/parquet/metadata.cc @@ -37,6 +37,7 @@ #include "parquet/exception.h" #include "parquet/schema.h" #include "parquet/schema_internal.h" +#include "parquet/size_statistics.h" #include "parquet/thrift_internal.h" namespace parquet { @@ -308,6 +309,13 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl { return is_stats_set() ? possible_stats_ : nullptr; } + inline std::unique_ptr size_statistics() const { + if (!column_metadata_->__isset.size_statistics) { + return nullptr; + } + return SizeStatistics::Make(&column_metadata_->size_statistics, descr_); + } + inline Compression::type compression() const { return LoadEnumSafe(&column_metadata_->codec); } @@ -439,6 +447,10 @@ std::shared_ptr ColumnChunkMetaData::statistics() const { bool ColumnChunkMetaData::is_stats_set() const { return impl_->is_stats_set(); } +std::unique_ptr ColumnChunkMetaData::size_statistics() const { + return impl_->size_statistics(); +} + std::optional ColumnChunkMetaData::bloom_filter_offset() const { return impl_->bloom_filter_offset(); } @@ -1543,6 +1555,10 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl { column_chunk_->meta_data.__set_statistics(ToThrift(val)); } + void SetSizeStatistics(const SizeStatistics& size_stats) { + column_chunk_->meta_data.__set_size_statistics(ToThrift(size_stats)); + } + void Finish(int64_t num_values, int64_t dictionary_page_offset, int64_t index_page_offset, int64_t data_page_offset, int64_t compressed_size, int64_t uncompressed_size, bool has_dictionary, @@ -1752,6 +1768,10 @@ void ColumnChunkMetaDataBuilder::SetStatistics(const EncodedStatistics& result) impl_->SetStatistics(result); } +void ColumnChunkMetaDataBuilder::SetSizeStatistics(const SizeStatistics& size_stats) { + impl_->SetSizeStatistics(size_stats); +} + void ColumnChunkMetaDataBuilder::SetKeyValueMetadata( std::shared_ptr key_value_metadata) { impl_->SetKeyValueMetadata(std::move(key_value_metadata)); diff --git a/cpp/src/parquet/metadata.h b/cpp/src/parquet/metadata.h index dc97d816daa74..e330440786599 100644 --- a/cpp/src/parquet/metadata.h +++ b/cpp/src/parquet/metadata.h @@ -36,6 +36,7 @@ namespace parquet { class ColumnDescriptor; class EncodedStatistics; class FileCryptoMetaData; +class SizeStatistics; class Statistics; class SchemaDescriptor; @@ -156,6 +157,7 @@ class PARQUET_EXPORT ColumnChunkMetaData { std::shared_ptr path_in_schema() const; bool is_stats_set() const; std::shared_ptr statistics() const; + std::unique_ptr size_statistics() const; Compression::type compression() const; // Indicate if the ColumnChunk compression is supported by the current @@ -451,6 +453,7 @@ class PARQUET_EXPORT ColumnChunkMetaDataBuilder { // column metadata void SetStatistics(const EncodedStatistics& stats); + void SetSizeStatistics(const SizeStatistics& size_stats); void SetKeyValueMetadata(std::shared_ptr key_value_metadata); diff --git a/cpp/src/parquet/page_index.cc b/cpp/src/parquet/page_index.cc index afda4c6064b36..92e0afb0dbbfb 100644 --- a/cpp/src/parquet/page_index.cc +++ b/cpp/src/parquet/page_index.cc @@ -159,6 +159,22 @@ class TypedColumnIndexImpl : public TypedColumnIndex { const std::vector& max_values() const override { return max_values_; } + bool has_repetition_level_histograms() const override { + return column_index_.__isset.repetition_level_histograms; + } + + bool has_definition_level_histograms() const override { + return column_index_.__isset.definition_level_histograms; + } + + const std::vector& repetition_level_histograms() const override { + return column_index_.repetition_level_histograms; + } + + const std::vector& definition_level_histograms() const override { + return column_index_.definition_level_histograms; + } + private: /// Wrapped thrift column index. const format::ColumnIndex column_index_; @@ -178,14 +194,22 @@ class OffsetIndexImpl : public OffsetIndex { page_location.compressed_page_size, page_location.first_row_index}); } + if (offset_index.__isset.unencoded_byte_array_data_bytes) { + unencoded_byte_array_data_bytes_ = offset_index.unencoded_byte_array_data_bytes; + } } const std::vector& page_locations() const override { return page_locations_; } + const std::vector& unencoded_byte_array_data_bytes() const override { + return unencoded_byte_array_data_bytes_; + } + private: std::vector page_locations_; + std::vector unencoded_byte_array_data_bytes_; }; class RowGroupPageIndexReaderImpl : public RowGroupPageIndexReader { @@ -460,7 +484,8 @@ class ColumnIndexBuilderImpl final : public ColumnIndexBuilder { column_index_.boundary_order = format::BoundaryOrder::UNORDERED; } - void AddPage(const EncodedStatistics& stats) override { + void AddPage(const EncodedStatistics& stats, + const SizeStatistics* size_stats) override { if (state_ == BuilderState::kFinished) { throw ParquetException("Cannot add page to finished ColumnIndexBuilder."); } else if (state_ == BuilderState::kDiscarded) { @@ -493,6 +518,17 @@ class ColumnIndexBuilderImpl final : public ColumnIndexBuilder { column_index_.__isset.null_counts = false; column_index_.null_counts.clear(); } + + if (size_stats) { + const auto& page_ref_level_hist = size_stats->repetition_level_histogram(); + const auto& page_def_level_hist = size_stats->definition_level_histogram(); + column_index_.repetition_level_histograms.insert( + column_index_.repetition_level_histograms.end(), page_ref_level_hist.cbegin(), + page_ref_level_hist.cend()); + column_index_.definition_level_histograms.insert( + column_index_.definition_level_histograms.end(), page_def_level_hist.cbegin(), + page_def_level_hist.cend()); + } } void Finish() override { @@ -533,6 +569,21 @@ class ColumnIndexBuilderImpl final : public ColumnIndexBuilder { /// Decide the boundary order from decoded min/max values. auto boundary_order = DetermineBoundaryOrder(min_values, max_values); column_index_.__set_boundary_order(ToThrift(boundary_order)); + + /// Finalize level histogram. + const int64_t num_pages = column_index_.null_pages.size(); + const int64_t rep_level_hist_size = column_index_.repetition_level_histograms.size(); + const int64_t def_level_hist_size = column_index_.definition_level_histograms.size(); + if (rep_level_hist_size == (descr_->max_repetition_level() + 1) * num_pages) { + column_index_.__isset.repetition_level_histograms = true; + } else { + column_index_.repetition_level_histograms.clear(); + } + if (def_level_hist_size == (descr_->max_definition_level() + 1) * num_pages) { + column_index_.__isset.definition_level_histograms = true; + } else { + column_index_.definition_level_histograms.clear(); + } } void WriteTo(::arrow::io::OutputStream* sink, Encryptor* encryptor) const override { @@ -604,8 +655,8 @@ class OffsetIndexBuilderImpl final : public OffsetIndexBuilder { public: OffsetIndexBuilderImpl() = default; - void AddPage(int64_t offset, int32_t compressed_page_size, - int64_t first_row_index) override { + void AddPage(int64_t offset, int32_t compressed_page_size, int64_t first_row_index, + std::optional unencoded_byte_array_length) override { if (state_ == BuilderState::kFinished) { throw ParquetException("Cannot add page to finished OffsetIndexBuilder."); } else if (state_ == BuilderState::kDiscarded) { @@ -620,6 +671,10 @@ class OffsetIndexBuilderImpl final : public OffsetIndexBuilder { page_location.__set_compressed_page_size(compressed_page_size); page_location.__set_first_row_index(first_row_index); offset_index_.page_locations.emplace_back(std::move(page_location)); + if (unencoded_byte_array_length.has_value()) { + offset_index_.unencoded_byte_array_data_bytes.emplace_back( + unencoded_byte_array_length.value()); + } } void Finish(int64_t final_position) override { @@ -636,6 +691,16 @@ class OffsetIndexBuilderImpl final : public OffsetIndexBuilder { page_location.__set_offset(page_location.offset + final_position); } } + + /// Finalize unencoded_byte_array_data_bytes and make sure page sizes match. + if (offset_index_.page_locations.size() == + offset_index_.unencoded_byte_array_data_bytes.size()) { + offset_index_.__isset.unencoded_byte_array_data_bytes = true; + } else { + /// Discard unencoded_byte_array_data_bytes if its size is abnormal. + offset_index_.unencoded_byte_array_data_bytes.clear(); + } + state_ = BuilderState::kFinished; break; } @@ -813,6 +878,14 @@ class PageIndexBuilderImpl final : public PageIndexBuilder { } // namespace +void OffsetIndexBuilder::AddPage(const PageLocation& page_location, + const SizeStatistics* size_stats) { + this->AddPage( + page_location.offset, page_location.compressed_page_size, + page_location.first_row_index, + size_stats ? size_stats->unencoded_byte_array_data_bytes() : std::nullopt); +} + RowGroupIndexReadRange PageIndexReader::DeterminePageIndexRangesInRowGroup( const RowGroupMetaData& row_group_metadata, const std::vector& columns) { int64_t ci_start = std::numeric_limits::max(); diff --git a/cpp/src/parquet/page_index.h b/cpp/src/parquet/page_index.h index d45c59cab223f..b8291160d5c64 100644 --- a/cpp/src/parquet/page_index.h +++ b/cpp/src/parquet/page_index.h @@ -27,6 +27,7 @@ namespace parquet { class EncodedStatistics; +class SizeStatistics; struct PageIndexLocation; /// \brief ColumnIndex is a proxy around format::ColumnIndex. @@ -76,6 +77,18 @@ class PARQUET_EXPORT ColumnIndex { /// \brief A vector of page indices for non-null pages. virtual const std::vector& non_null_page_indices() const = 0; + + /// \brief Whether repetition level histogram is available. + virtual bool has_repetition_level_histograms() const = 0; + + /// \brief Whether definition level histogram is available. + virtual bool has_definition_level_histograms() const = 0; + + /// \brief List of repetition level histograms for each page concatenated together. + virtual const std::vector& repetition_level_histograms() const = 0; + + /// \brief List of definition level histograms for each page concatenated together. + virtual const std::vector& definition_level_histograms() const = 0; }; /// \brief Typed implementation of ColumnIndex. @@ -129,6 +142,10 @@ class PARQUET_EXPORT OffsetIndex { /// \brief A vector of locations for each data page in this column. virtual const std::vector& page_locations() const = 0; + + /// \brief A vector of unencoded/uncompressed size of each page for BYTE_ARRAY types, + /// or empty for other types. + virtual const std::vector& unencoded_byte_array_data_bytes() const = 0; }; /// \brief Interface for reading the page index for a Parquet row group. @@ -266,7 +283,9 @@ class PARQUET_EXPORT ColumnIndexBuilder { /// not update statistics anymore. /// /// \param stats Page statistics in the encoded form. - virtual void AddPage(const EncodedStatistics& stats) = 0; + /// \param size_stats Size statistics of the page if available. + virtual void AddPage(const EncodedStatistics& stats, + const SizeStatistics* size_stats = NULLPTR) = 0; /// \brief Complete the column index. /// @@ -299,15 +318,14 @@ class PARQUET_EXPORT OffsetIndexBuilder { virtual ~OffsetIndexBuilder() = default; - /// \brief Add page location of a data page. - virtual void AddPage(int64_t offset, int32_t compressed_page_size, - int64_t first_row_index) = 0; + /// \brief Add page location and size stats of a data page. + virtual void AddPage( + int64_t offset, int32_t compressed_page_size, int64_t first_row_index, + std::optional unencoded_byte_array_length = std::nullopt) = 0; - /// \brief Add page location of a data page. - void AddPage(const PageLocation& page_location) { - AddPage(page_location.offset, page_location.compressed_page_size, - page_location.first_row_index); - } + /// \brief Add page location and size stats of a data page. + void AddPage(const PageLocation& page_location, + const SizeStatistics* size_stats = NULLPTR); /// \brief Complete the offset index. /// diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index 7f2e371df66d7..5c96a6e1b2964 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -47,6 +47,13 @@ namespace parquet { /// DataPageV2 at all. enum class ParquetDataPageVersion { V1, V2 }; +/// Controls the level of size statistics that are written to the file. +enum class SizeStatisticsLevel : uint8_t { + NONE = 0, // No size statistics are written. + CHUNK, // Only column chunk size statistics are written. + PAGE // Both size statistics in the column chunk and page index are written. +}; + /// Align the default buffer size to a small multiple of a page size. constexpr int64_t kDefaultBufferSize = 4096 * 4; @@ -237,7 +244,8 @@ class PARQUET_EXPORT WriterProperties { data_page_version_(ParquetDataPageVersion::V1), created_by_(DEFAULT_CREATED_BY), store_decimal_as_integer_(false), - page_checksum_enabled_(false) {} + page_checksum_enabled_(false), + size_statistics_level_(SizeStatisticsLevel::NONE) {} explicit Builder(const WriterProperties& properties) : pool_(properties.memory_pool()), @@ -639,6 +647,16 @@ class PARQUET_EXPORT WriterProperties { return this->disable_write_page_index(path->ToDotString()); } + /// \brief Set the level to write size statistics for all columns. Default is NONE. + /// + /// \param level The level to write size statistics. Note that if page index is not + /// enabled, page level size statistcis will not be written even if the level + /// is set to PAGE. + Builder* set_size_statistics_level(SizeStatisticsLevel level) { + size_statistics_level_ = level; + return this; + } + /// \brief Build the WriterProperties with the builder parameters. /// \return The WriterProperties defined by the builder. std::shared_ptr build() { @@ -665,9 +683,9 @@ class PARQUET_EXPORT WriterProperties { return std::shared_ptr(new WriterProperties( pool_, dictionary_pagesize_limit_, write_batch_size_, max_row_group_length_, pagesize_, version_, created_by_, page_checksum_enabled_, - std::move(file_encryption_properties_), default_column_properties_, - column_properties, data_page_version_, store_decimal_as_integer_, - std::move(sorting_columns_))); + size_statistics_level_, std::move(file_encryption_properties_), + default_column_properties_, column_properties, data_page_version_, + store_decimal_as_integer_, std::move(sorting_columns_))); } private: @@ -681,6 +699,7 @@ class PARQUET_EXPORT WriterProperties { std::string created_by_; bool store_decimal_as_integer_; bool page_checksum_enabled_; + SizeStatisticsLevel size_statistics_level_; std::shared_ptr file_encryption_properties_; @@ -719,6 +738,10 @@ class PARQUET_EXPORT WriterProperties { inline bool page_checksum_enabled() const { return page_checksum_enabled_; } + inline SizeStatisticsLevel size_statistics_level() const { + return size_statistics_level_; + } + inline Encoding::type dictionary_index_encoding() const { if (parquet_version_ == ParquetVersion::PARQUET_1_0) { return Encoding::PLAIN_DICTIONARY; @@ -812,6 +835,7 @@ class PARQUET_EXPORT WriterProperties { MemoryPool* pool, int64_t dictionary_pagesize_limit, int64_t write_batch_size, int64_t max_row_group_length, int64_t pagesize, ParquetVersion::type version, const std::string& created_by, bool page_write_checksum_enabled, + SizeStatisticsLevel size_statistics_level, std::shared_ptr file_encryption_properties, const ColumnProperties& default_column_properties, const std::unordered_map& column_properties, @@ -827,6 +851,7 @@ class PARQUET_EXPORT WriterProperties { parquet_created_by_(created_by), store_decimal_as_integer_(store_short_decimal_as_integer), page_checksum_enabled_(page_write_checksum_enabled), + size_statistics_level_(size_statistics_level), file_encryption_properties_(file_encryption_properties), sorting_columns_(std::move(sorting_columns)), default_column_properties_(default_column_properties), @@ -842,6 +867,7 @@ class PARQUET_EXPORT WriterProperties { std::string parquet_created_by_; bool store_decimal_as_integer_; bool page_checksum_enabled_; + SizeStatisticsLevel size_statistics_level_; std::shared_ptr file_encryption_properties_; diff --git a/cpp/src/parquet/size_statistics.cc b/cpp/src/parquet/size_statistics.cc new file mode 100644 index 0000000000000..86a0cb66902cc --- /dev/null +++ b/cpp/src/parquet/size_statistics.cc @@ -0,0 +1,266 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliancec +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/size_statistics.h" + +#include + +#include "arrow/type_traits.h" +#include "arrow/util/bit_run_reader.h" +#include "arrow/util/int_util_overflow.h" +#include "arrow/visit_data_inline.h" +#include "parquet/exception.h" +#include "parquet/schema.h" +#include "parquet/thrift_internal.h" +#include "parquet/types.h" + +namespace parquet { + +class SizeStatistics::SizeStatisticsImpl { + public: + SizeStatisticsImpl() = default; + + SizeStatisticsImpl(const format::SizeStatistics* size_stats, + const ColumnDescriptor* descr) + : rep_level_histogram_(size_stats->repetition_level_histogram), + def_level_histogram_(size_stats->definition_level_histogram) { + if (descr->physical_type() == Type::BYTE_ARRAY && + size_stats->__isset.unencoded_byte_array_data_bytes) { + unencoded_byte_array_data_bytes_ = size_stats->unencoded_byte_array_data_bytes; + } + } + + const std::vector& repetition_level_histogram() const { + return rep_level_histogram_; + } + + const std::vector& definition_level_histogram() const { + return def_level_histogram_; + } + + std::optional unencoded_byte_array_data_bytes() const { + return unencoded_byte_array_data_bytes_; + } + + void Merge(const SizeStatistics& other) { + if (rep_level_histogram_.size() != other.repetition_level_histogram().size() || + def_level_histogram_.size() != other.definition_level_histogram().size() || + unencoded_byte_array_data_bytes_.has_value() != + other.unencoded_byte_array_data_bytes().has_value()) { + throw ParquetException("Cannot merge incompatible SizeStatistics"); + } + + std::transform(rep_level_histogram_.begin(), rep_level_histogram_.end(), + other.repetition_level_histogram().begin(), + rep_level_histogram_.begin(), std::plus<>()); + + std::transform(def_level_histogram_.begin(), def_level_histogram_.end(), + other.definition_level_histogram().begin(), + def_level_histogram_.begin(), std::plus<>()); + if (unencoded_byte_array_data_bytes_.has_value()) { + unencoded_byte_array_data_bytes_ = unencoded_byte_array_data_bytes_.value() + + other.unencoded_byte_array_data_bytes().value(); + } + } + + private: + friend class SizeStatisticsBuilder; + std::vector rep_level_histogram_; + std::vector def_level_histogram_; + std::optional unencoded_byte_array_data_bytes_; +}; + +const std::vector& SizeStatistics::repetition_level_histogram() const { + return impl_->repetition_level_histogram(); +} + +const std::vector& SizeStatistics::definition_level_histogram() const { + return impl_->definition_level_histogram(); +} + +std::optional SizeStatistics::unencoded_byte_array_data_bytes() const { + return impl_->unencoded_byte_array_data_bytes(); +} + +void SizeStatistics::Merge(const SizeStatistics& other) { return impl_->Merge(other); } + +SizeStatistics::SizeStatistics(const void* size_statistics, const ColumnDescriptor* descr) + : impl_(std::make_unique( + reinterpret_cast(size_statistics), descr)) {} + +SizeStatistics::SizeStatistics() : impl_(std::make_unique()) {} + +SizeStatistics::~SizeStatistics() = default; + +std::unique_ptr SizeStatistics::Make(const void* size_statistics, + const ColumnDescriptor* descr) { + return std::unique_ptr(new SizeStatistics(size_statistics, descr)); +} + +class SizeStatisticsBuilder::SizeStatisticsBuilderImpl { + public: + SizeStatisticsBuilderImpl(const ColumnDescriptor* descr) + : rep_level_histogram_(descr->max_repetition_level() + 1, 0), + def_level_histogram_(descr->max_definition_level() + 1, 0) { + if (descr->physical_type() == Type::BYTE_ARRAY) { + unencoded_byte_array_data_bytes_ = 0; + } + } + + void WriteRepetitionLevels(int64_t num_levels, const int16_t* rep_levels) { + for (int64_t i = 0; i < num_levels; ++i) { + ARROW_DCHECK_LT(rep_levels[i], static_cast(rep_level_histogram_.size())); + rep_level_histogram_[rep_levels[i]]++; + } + } + + void WriteDefinitionLevels(int64_t num_levels, const int16_t* def_levels) { + for (int64_t i = 0; i < num_levels; ++i) { + ARROW_DCHECK_LT(def_levels[i], static_cast(def_level_histogram_.size())); + def_level_histogram_[def_levels[i]]++; + } + } + + void WriteRepetitionLevel(int64_t num_levels, int16_t rep_level) { + ARROW_DCHECK_LT(rep_level, static_cast(rep_level_histogram_.size())); + rep_level_histogram_[rep_level] += num_levels; + } + + void WriteDefinitionLevel(int64_t num_levels, int16_t def_level) { + ARROW_DCHECK_LT(def_level, static_cast(def_level_histogram_.size())); + def_level_histogram_[def_level] += num_levels; + } + + void WriteValuesSpaced(const ByteArray* values, const uint8_t* valid_bits, + int64_t valid_bits_offset, int64_t num_spaced_values) { + int64_t total_bytes = 0; + ::arrow::internal::VisitSetBitRunsVoid(valid_bits, valid_bits_offset, + num_spaced_values, + [&](int64_t pos, int64_t length) { + for (int64_t i = 0; i < length; i++) { + // Don't bother to check unlikely overflow. + total_bytes += values[i + pos].len; + } + }); + IncrementUnencodedByteArrayDataBytes(total_bytes); + } + + void WriteValues(const ByteArray* values, int64_t num_values) { + int64_t total_bytes = 0; + std::for_each(values, values + num_values, + [&](const ByteArray& value) { total_bytes += values->len; }); + IncrementUnencodedByteArrayDataBytes(total_bytes); + } + + void WriteValues(const ::arrow::Array& values) { + int64_t total_bytes = 0; + const auto valid_func = [&](ByteArray val) { total_bytes += val.len; }; + const auto null_func = [&]() {}; + + if (::arrow::is_binary_like(values.type_id())) { + ::arrow::VisitArraySpanInline<::arrow::BinaryType>( + *values.data(), std::move(valid_func), std::move(null_func)); + } else if (::arrow::is_large_binary_like(values.type_id())) { + ::arrow::VisitArraySpanInline<::arrow::LargeBinaryType>( + *values.data(), std::move(valid_func), std::move(null_func)); + } else { + throw ParquetException("Unsupported type: " + values.type()->ToString()); + } + + IncrementUnencodedByteArrayDataBytes(total_bytes); + } + + std::unique_ptr Build() { + auto stats = std::unique_ptr(new SizeStatistics()); + stats->impl_->rep_level_histogram_ = rep_level_histogram_; + stats->impl_->def_level_histogram_ = def_level_histogram_; + stats->impl_->unencoded_byte_array_data_bytes_ = unencoded_byte_array_data_bytes_; + return stats; + } + + void Reset() { + rep_level_histogram_.assign(rep_level_histogram_.size(), 0); + def_level_histogram_.assign(def_level_histogram_.size(), 0); + if (unencoded_byte_array_data_bytes_.has_value()) { + unencoded_byte_array_data_bytes_ = 0; + } + } + + private: + void IncrementUnencodedByteArrayDataBytes(int64_t total_bytes) { + ARROW_DCHECK(unencoded_byte_array_data_bytes_.has_value()); + if (::arrow::internal::AddWithOverflow( + total_bytes, unencoded_byte_array_data_bytes_.value(), &total_bytes)) { + throw ParquetException("unencoded byte array data bytes overflows to INT64_MAX"); + } + unencoded_byte_array_data_bytes_ = total_bytes; + } + + private: + std::vector rep_level_histogram_; + std::vector def_level_histogram_; + std::optional unencoded_byte_array_data_bytes_; +}; + +void SizeStatisticsBuilder::WriteRepetitionLevels(int64_t num_levels, + const int16_t* rep_levels) { + impl_->WriteRepetitionLevels(num_levels, rep_levels); +} + +void SizeStatisticsBuilder::WriteDefinitionLevels(int64_t num_levels, + const int16_t* def_levels) { + impl_->WriteDefinitionLevels(num_levels, def_levels); +} + +void SizeStatisticsBuilder::WriteRepetitionLevel(int64_t num_levels, int16_t rep_level) { + impl_->WriteRepetitionLevel(num_levels, rep_level); +} + +void SizeStatisticsBuilder::WriteDefinitionLevel(int64_t num_levels, int16_t def_level) { + impl_->WriteDefinitionLevel(num_levels, def_level); +} + +void SizeStatisticsBuilder::WriteValuesSpaced(const ByteArray* values, + const uint8_t* valid_bits, + int64_t valid_bits_offset, + int64_t num_spaced_values) { + impl_->WriteValuesSpaced(values, valid_bits, valid_bits_offset, num_spaced_values); +} + +void SizeStatisticsBuilder::WriteValues(const ByteArray* values, int64_t num_values) { + impl_->WriteValues(values, num_values); +} + +void SizeStatisticsBuilder::WriteValues(const ::arrow::Array& values) { + impl_->WriteValues(values); +} + +std::unique_ptr SizeStatisticsBuilder::Build() { return impl_->Build(); } + +void SizeStatisticsBuilder::Reset() { return impl_->Reset(); } + +SizeStatisticsBuilder::SizeStatisticsBuilder(const ColumnDescriptor* descr) + : impl_(std::make_unique(descr)) {} + +SizeStatisticsBuilder::~SizeStatisticsBuilder() = default; + +std::unique_ptr SizeStatisticsBuilder::Make( + const ColumnDescriptor* descr) { + return std::unique_ptr(new SizeStatisticsBuilder(descr)); +} + +} // namespace parquet diff --git a/cpp/src/parquet/size_statistics.h b/cpp/src/parquet/size_statistics.h new file mode 100644 index 0000000000000..8ba6010611a47 --- /dev/null +++ b/cpp/src/parquet/size_statistics.h @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "parquet/platform.h" + +namespace parquet { + +struct ByteArray; +class ColumnDescriptor; + +/// \brief SizeStatistics is a proxy around format::SizeStatistics. +/// +/// A structure for capturing metadata for estimating the unencoded, +/// uncompressed size of data written. This is useful for readers to estimate +/// how much memory is needed to reconstruct data in their memory model and for +/// fine-grained filter push down on nested structures (the histograms contained +/// in this structure can help determine the number of nulls at a particular +/// nesting level and maximum length of lists). +class PARQUET_EXPORT SizeStatistics { + public: + /// \brief API convenience to get a SizeStatistics accessor + static std::unique_ptr Make(const void* size_statistics, + const ColumnDescriptor* descr); + + ~SizeStatistics(); + + /// When present, there is expected to be one element corresponding to each + /// repetition (i.e. size=max repetition_level+1) where each element + /// represents the number of times the repetition level was observed in the + /// data. + /// + /// This field may be omitted if max_repetition_level is 0 without loss + /// of information. + /// + /// \returns repetition level histogram of all levels if not empty. + const std::vector& repetition_level_histogram() const; + + /// Same as repetition_level_histogram except for definition levels. + /// + /// This field may be omitted if max_definition_level is 0 or 1 without + /// loss of information. + /// + /// \returns definition level histogram of all levels if not empty. + const std::vector& definition_level_histogram() const; + + /// The number of physical bytes stored for BYTE_ARRAY data values assuming + /// no encoding. This is exclusive of the bytes needed to store the length of + /// each byte array. In other words, this field is equivalent to the `(size + /// of PLAIN-ENCODING the byte array values) - (4 bytes * number of values + /// written)`. To determine unencoded sizes of other types readers can use + /// schema information multiplied by the number of non-null and null values. + /// The number of null/non-null values can be inferred from the histograms + /// below. + /// + /// For example, if a column chunk is dictionary-encoded with dictionary + /// ["a", "bc", "cde"], and a data page contains the indices [0, 0, 1, 2], + /// then this value for that data page should be 7 (1 + 1 + 2 + 3). + /// + /// This field should only be set for types that use BYTE_ARRAY as their + /// physical type. + /// + /// \returns unencoded and uncompressed byte size of the BYTE_ARRAY column, + /// or std::nullopt for other types. + std::optional unencoded_byte_array_data_bytes() const; + + /// \brief Merge two SizeStatistics of the same column. + /// + /// It is used to merge size statistics from all pages of the same column chunk. + void Merge(const SizeStatistics& other); + + private: + friend class SizeStatisticsBuilder; + SizeStatistics(const void* size_statistics, const ColumnDescriptor* descr); + + // PIMPL Idiom + SizeStatistics(); + class SizeStatisticsImpl; + std::unique_ptr impl_; +}; + +/// \brief Builder to create a SizeStatistics. +class PARQUET_EXPORT SizeStatisticsBuilder { + public: + /// \brief API convenience to get a SizeStatisticsBuilder. + static std::unique_ptr Make(const ColumnDescriptor* descr); + + ~SizeStatisticsBuilder(); + + /// \brief Add repetition levels to the histogram. + /// \param num_levels number of repetition levels to add. + /// \param rep_levels repetition levels to add. + void WriteRepetitionLevels(int64_t num_levels, const int16_t* rep_levels); + + /// \brief Add definition levels to the histogram. + /// \param num_levels number of definition levels to add. + /// \param def_levels definition levels to add. + void WriteDefinitionLevels(int64_t num_levels, const int16_t* def_levels); + + /// \brief Add repeated repetition level to the histogram. + /// \param num_levels number of repetition levels to add. + /// \param rep_level repeated repetition level value. + void WriteRepetitionLevel(int64_t num_levels, int16_t rep_level); + + /// \brief Add repeated definition level to the histogram. + /// \param num_levels number of definition levels to add. + /// \param def_level repeated definition level value. + void WriteDefinitionLevel(int64_t num_levels, int16_t def_level); + + /// \brief Add spaced BYTE_ARRAY values. + /// \param[in] values pointer to values of BYTE_ARRAY type. + /// \param[in] valid_bits pointer to bitmap representing if values are non-null. + /// \param[in] valid_bits_offset offset into valid_bits where the slice of data begins. + /// \param[in] num_spaced_values length of values in values/valid_bits to inspect. + void WriteValuesSpaced(const ByteArray* values, const uint8_t* valid_bits, + int64_t valid_bits_offset, int64_t num_spaced_values); + + /// \brief Add dense BYTE_ARRAY values. + /// \param values pointer to values of BYTE_ARRAY type. + /// \param num_values length of values. + void WriteValues(const ByteArray* values, int64_t num_values); + + /// \brief Add BYTE_ARRAY values in the arrow array. + void WriteValues(const ::arrow::Array& values); + + /// \brief Build a SizeStatistics from collected data. + std::unique_ptr Build(); + + /// \brief Reset all collected data for reuse. + void Reset(); + + private: + // PIMPL Idiom + SizeStatisticsBuilder(const ColumnDescriptor* descr); + class SizeStatisticsBuilderImpl; + std::unique_ptr impl_; +}; + +} // namespace parquet diff --git a/cpp/src/parquet/size_statistics_test.cc b/cpp/src/parquet/size_statistics_test.cc new file mode 100644 index 0000000000000..56bbf7edda816 --- /dev/null +++ b/cpp/src/parquet/size_statistics_test.cc @@ -0,0 +1,105 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/size_statistics.h" + +#include +#include + +#include + +#include "arrow/io/file.h" +#include "arrow/util/float16.h" +#include "parquet/file_reader.h" +#include "parquet/metadata.h" +#include "parquet/schema.h" +#include "parquet/test_util.h" +#include "parquet/thrift_internal.h" + +namespace parquet { + +using namespace parquet::schema; + +TEST(SizeStatistics, WriteBatchLevels) { + std::vector expected_def_level_histogram = {256, 128, 64, 32, 16, 8, 4, 2, 2}; + std::vector expected_rep_level_histogram = {256, 128, 64, 32, 32}; + + const int16_t max_def_level = + static_cast(expected_def_level_histogram.size()) - 1; + const int16_t max_rep_level = + static_cast(expected_rep_level_histogram.size()) - 1; + auto descr = + std::make_unique(Int32("a"), max_def_level, max_rep_level); + auto builder = SizeStatisticsBuilder::Make(descr.get()); + + auto write_batch_levels = + [&](const std::vector& histogram, + const std::function& + write_levels_func) { + std::vector levels; + for (int16_t level = 0; level < static_cast(histogram.size()); level++) { + levels.insert(levels.end(), histogram[level], level); + } + + auto rng = std::default_random_engine{}; + std::shuffle(std::begin(levels), std::end(levels), rng); + + constexpr size_t kBatchSize = 64; + for (size_t i = 0; i < levels.size(); i += kBatchSize) { + auto batch_size = static_cast(std::min(kBatchSize, levels.size() - i)); + write_levels_func(builder.get(), batch_size, levels.data() + i); + } + }; + + write_batch_levels(expected_def_level_histogram, + &SizeStatisticsBuilder::WriteDefinitionLevels); + write_batch_levels(expected_rep_level_histogram, + &SizeStatisticsBuilder::WriteRepetitionLevels); + auto size_statistics = builder->Build(); + EXPECT_EQ(size_statistics->definition_level_histogram(), expected_def_level_histogram); + EXPECT_EQ(size_statistics->repetition_level_histogram(), expected_rep_level_histogram); +} + +TEST(SizeStatistics, WriteRepeatedLevels) { + constexpr int16_t kMaxDefLevel = 2; + constexpr int16_t kMaxRepLevel = 3; + auto descr = std::make_unique(Int32("a"), kMaxDefLevel, kMaxRepLevel); + auto builder = SizeStatisticsBuilder::Make(descr.get()); + + constexpr int64_t kNumRounds = 10; + for (int64_t round = 1; round <= kNumRounds; round++) { + for (int16_t def_level = 0; def_level <= kMaxDefLevel; def_level++) { + builder->WriteDefinitionLevel(/*num_levels=*/round + def_level, def_level); + } + for (int16_t rep_level = 0; rep_level <= kMaxRepLevel; rep_level++) { + builder->WriteRepetitionLevel(/*num_levels=*/round + rep_level * rep_level, + rep_level); + } + } + + auto size_statistics = builder->Build(); + EXPECT_EQ(size_statistics->definition_level_histogram(), + std::vector({55, 65, 75})); + EXPECT_EQ(size_statistics->repetition_level_histogram(), + std::vector({55, 65, 95, 145})); +} + +// TODO: Add tests for write binary variants. +// TODO: Add tests for merge two size statistics. +// TODO: Add tests for thrift serialization. + +} // namespace parquet diff --git a/cpp/src/parquet/thrift_internal.h b/cpp/src/parquet/thrift_internal.h index e7bfd434c81a8..5929eebcbc122 100644 --- a/cpp/src/parquet/thrift_internal.h +++ b/cpp/src/parquet/thrift_internal.h @@ -43,6 +43,7 @@ #include "parquet/exception.h" #include "parquet/platform.h" #include "parquet/properties.h" +#include "parquet/size_statistics.h" #include "parquet/statistics.h" #include "parquet/types.h" @@ -383,6 +384,19 @@ static inline format::EncryptionAlgorithm ToThrift(EncryptionAlgorithm encryptio return encryption_algorithm; } +static inline format::SizeStatistics ToThrift(const SizeStatistics& size_stats) { + format::SizeStatistics size_statistics; + size_statistics.__set_repetition_level_histogram( + size_stats.repetition_level_histogram()); + size_statistics.__set_definition_level_histogram( + size_stats.definition_level_histogram()); + if (size_stats.unencoded_byte_array_data_bytes().has_value()) { + size_statistics.__set_unencoded_byte_array_data_bytes( + size_stats.unencoded_byte_array_data_bytes().value()); + } + return size_statistics; +} + // ---------------------------------------------------------------------- // Thrift struct serialization / deserialization utilities diff --git a/cpp/submodules/parquet-testing b/cpp/submodules/parquet-testing index cb7a9674142c1..74278bc4a1122 160000 --- a/cpp/submodules/parquet-testing +++ b/cpp/submodules/parquet-testing @@ -1 +1 @@ -Subproject commit cb7a9674142c137367bf75a01b79c6e214a73199 +Subproject commit 74278bc4a1122d74945969e6dec405abd1533ec3