diff --git a/cpp/src/parquet/CMakeLists.txt b/cpp/src/parquet/CMakeLists.txt index 8be5a88c33c55..8db67ebf49695 100644 --- a/cpp/src/parquet/CMakeLists.txt +++ b/cpp/src/parquet/CMakeLists.txt @@ -183,6 +183,7 @@ set(PARQUET_SRCS printer.cc properties.cc schema.cc + size_statistics.cc statistics.cc stream_reader.cc stream_writer.cc diff --git a/cpp/src/parquet/column_page.h b/cpp/src/parquet/column_page.h index 905f805b8c9cc..9cc810b5c0c73 100644 --- a/cpp/src/parquet/column_page.h +++ b/cpp/src/parquet/column_page.h @@ -26,6 +26,7 @@ #include #include +#include "parquet/size_statistics.h" #include "parquet/statistics.h" #include "parquet/types.h" @@ -69,20 +70,24 @@ class DataPage : public Page { /// Currently it is only present from data pages created by ColumnWriter in order /// to collect page index. std::optional first_row_index() const { return first_row_index_; } + const std::shared_ptr& size_statistics() const { + return size_statistics_; + } virtual ~DataPage() = default; protected: DataPage(PageType::type type, const std::shared_ptr& buffer, int32_t num_values, Encoding::type encoding, int64_t uncompressed_size, - const EncodedStatistics& statistics = EncodedStatistics(), - std::optional first_row_index = std::nullopt) + const EncodedStatistics& statistics, std::optional first_row_index, + std::shared_ptr size_statistics) : Page(buffer, type), num_values_(num_values), encoding_(encoding), uncompressed_size_(uncompressed_size), statistics_(statistics), - first_row_index_(std::move(first_row_index)) {} + first_row_index_(std::move(first_row_index)), + size_statistics_(std::move(size_statistics)) {} int32_t num_values_; Encoding::type encoding_; @@ -90,6 +95,7 @@ class DataPage : public Page { EncodedStatistics statistics_; /// Row ordinal within the row group to the first row in the data page. std::optional first_row_index_; + std::shared_ptr size_statistics_; }; class DataPageV1 : public DataPage { @@ -98,9 +104,10 @@ class DataPageV1 : public DataPage { Encoding::type encoding, Encoding::type definition_level_encoding, Encoding::type repetition_level_encoding, int64_t uncompressed_size, const EncodedStatistics& statistics = EncodedStatistics(), - std::optional first_row_index = std::nullopt) + std::optional first_row_index = std::nullopt, + std::shared_ptr size_statistics = NULLPTR) : DataPage(PageType::DATA_PAGE, buffer, num_values, encoding, uncompressed_size, - statistics, std::move(first_row_index)), + statistics, std::move(first_row_index), std::move(size_statistics)), definition_level_encoding_(definition_level_encoding), repetition_level_encoding_(repetition_level_encoding) {} @@ -120,9 +127,10 @@ class DataPageV2 : public DataPage { int32_t definition_levels_byte_length, int32_t repetition_levels_byte_length, int64_t uncompressed_size, bool is_compressed = false, const EncodedStatistics& statistics = EncodedStatistics(), - std::optional first_row_index = std::nullopt) + std::optional first_row_index = std::nullopt, + std::shared_ptr size_statistics = NULLPTR) : DataPage(PageType::DATA_PAGE_V2, buffer, num_values, encoding, uncompressed_size, - statistics, std::move(first_row_index)), + statistics, std::move(first_row_index), std::move(size_statistics)), num_nulls_(num_nulls), num_rows_(num_rows), definition_levels_byte_length_(definition_levels_byte_length), diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index eae8fc6125499..2fc230bab9728 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -54,6 +54,7 @@ #include "parquet/platform.h" #include "parquet/properties.h" #include "parquet/schema.h" +#include "parquet/size_statistics.h" #include "parquet/statistics.h" #include "parquet/thrift_internal.h" #include "parquet/types.h" @@ -434,10 +435,11 @@ class SerializedPageWriter : public PageWriter { const int64_t header_size = thrift_serializer_->Serialize(&page_header, sink_.get(), meta_encryptor_.get()); PARQUET_THROW_NOT_OK(sink_->Write(output_data_buffer, output_data_len)); + const auto& page_size_stats = page.size_statistics(); /// Collect page index if (column_index_builder_ != nullptr) { - column_index_builder_->AddPage(page.statistics()); + column_index_builder_->AddPage(page.statistics(), page_size_stats.get()); } if (offset_index_builder_ != nullptr) { const int64_t compressed_size = output_data_len + header_size; @@ -451,8 +453,10 @@ class SerializedPageWriter : public PageWriter { /// start_pos is a relative offset in the buffered mode. It should be /// adjusted via OffsetIndexBuilder::Finish() after BufferedPageWriter /// has flushed all data pages. - offset_index_builder_->AddPage(start_pos, static_cast(compressed_size), - *page.first_row_index()); + offset_index_builder_->AddPage( + start_pos, static_cast(compressed_size), *page.first_row_index(), + page_size_stats ? page_size_stats->unencoded_byte_array_data_bytes() + : std::nullopt); } total_uncompressed_size_ += uncompressed_size + header_size; @@ -789,11 +793,13 @@ class ColumnWriterImpl { // Serializes Dictionary Page if enabled virtual void WriteDictionaryPage() = 0; + using StatisticsPair = std::pair>; + // Plain-encoded statistics of the current page - virtual EncodedStatistics GetPageStatistics() = 0; + virtual StatisticsPair GetPageStatistics() = 0; // Plain-encoded statistics of the whole chunk - virtual EncodedStatistics GetChunkStatistics() = 0; + virtual StatisticsPair GetChunkStatistics() = 0; // Merges page statistics into chunk statistics, then resets the values virtual void ResetPageStatistics() = 0; @@ -994,7 +1000,9 @@ void ColumnWriterImpl::BuildDataPageV1(int64_t definition_levels_rle_size, ConcatenateBuffers(definition_levels_rle_size, repetition_levels_rle_size, values, uncompressed_data_->mutable_data()); - EncodedStatistics page_stats = GetPageStatistics(); + EncodedStatistics page_stats; + std::shared_ptr page_size_stats; + std::tie(page_stats, page_size_stats) = GetPageStatistics(); page_stats.ApplyStatSizeLimits(properties_->max_statistics_size(descr_->path())); page_stats.set_is_signed(SortOrder::SIGNED == descr_->sort_order()); ResetPageStatistics(); @@ -1018,13 +1026,14 @@ void ColumnWriterImpl::BuildDataPageV1(int64_t definition_levels_rle_size, compressed_data->CopySlice(0, compressed_data->size(), allocator_)); std::unique_ptr page_ptr = std::make_unique( compressed_data_copy, num_values, encoding_, Encoding::RLE, Encoding::RLE, - uncompressed_size, page_stats, first_row_index); + uncompressed_size, page_stats, first_row_index, std::move(page_size_stats)); total_compressed_bytes_ += page_ptr->size() + sizeof(format::PageHeader); data_pages_.push_back(std::move(page_ptr)); } else { // Eagerly write pages DataPageV1 page(compressed_data, num_values, encoding_, Encoding::RLE, Encoding::RLE, - uncompressed_size, page_stats, first_row_index); + uncompressed_size, page_stats, first_row_index, + std::move(page_size_stats)); WriteDataPage(page); } } @@ -1051,7 +1060,9 @@ void ColumnWriterImpl::BuildDataPageV2(int64_t definition_levels_rle_size, ConcatenateBuffers(definition_levels_rle_size, repetition_levels_rle_size, compressed_values, combined->mutable_data()); - EncodedStatistics page_stats = GetPageStatistics(); + EncodedStatistics page_stats; + std::shared_ptr page_size_stats; + std::tie(page_stats, page_size_stats) = GetPageStatistics(); page_stats.ApplyStatSizeLimits(properties_->max_statistics_size(descr_->path())); page_stats.set_is_signed(SortOrder::SIGNED == descr_->sort_order()); ResetPageStatistics(); @@ -1075,13 +1086,14 @@ void ColumnWriterImpl::BuildDataPageV2(int64_t definition_levels_rle_size, std::unique_ptr page_ptr = std::make_unique( combined, num_values, null_count, num_rows, encoding_, def_levels_byte_length, rep_levels_byte_length, uncompressed_size, pager_->has_compressor(), page_stats, - first_row_index); + first_row_index, std::move(page_size_stats)); total_compressed_bytes_ += page_ptr->size() + sizeof(format::PageHeader); data_pages_.push_back(std::move(page_ptr)); } else { DataPageV2 page(combined, num_values, null_count, num_rows, encoding_, def_levels_byte_length, rep_levels_byte_length, uncompressed_size, - pager_->has_compressor(), page_stats, first_row_index); + pager_->has_compressor(), page_stats, first_row_index, + std::move(page_size_stats)); WriteDataPage(page); } } @@ -1095,7 +1107,9 @@ int64_t ColumnWriterImpl::Close() { FlushBufferedDataPages(); - EncodedStatistics chunk_statistics = GetChunkStatistics(); + EncodedStatistics chunk_statistics; + std::shared_ptr chunk_size_stats; + std::tie(chunk_statistics, chunk_size_stats) = GetChunkStatistics(); chunk_statistics.ApplyStatSizeLimits( properties_->max_statistics_size(descr_->path())); chunk_statistics.set_is_signed(SortOrder::SIGNED == descr_->sort_order()); @@ -1104,6 +1118,9 @@ int64_t ColumnWriterImpl::Close() { if (rows_written_ > 0 && chunk_statistics.is_set()) { metadata_->SetStatistics(chunk_statistics); } + if (rows_written_ > 0 && chunk_size_stats) { + metadata_->SetSizeStatistics(*chunk_size_stats); + } pager_->Close(has_dictionary_, fallback_); } @@ -1235,6 +1252,11 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< pages_change_on_record_boundaries_ = properties->data_page_version() == ParquetDataPageVersion::V2 || properties->page_index_enabled(descr_->path()); + + if (/*properties->size_statistics_enabled(descr_->path())*/ true) { + page_size_stats_builder_ = SizeStatisticsBuilder::Make(descr_); + chunk_size_stats_ = page_size_stats_builder_->Build(); + } } int64_t Close() override { return ColumnWriterImpl::Close(); } @@ -1366,15 +1388,17 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< total_bytes_written_ += pager_->WriteDictionaryPage(page); } - EncodedStatistics GetPageStatistics() override { - EncodedStatistics result; - if (page_statistics_) result = page_statistics_->Encode(); + StatisticsPair GetPageStatistics() override { + StatisticsPair result; + if (page_statistics_) result.first = page_statistics_->Encode(); + if (page_size_stats_builder_) result.second = page_size_stats_builder_->Build(); return result; } - EncodedStatistics GetChunkStatistics() override { - EncodedStatistics result; - if (chunk_statistics_) result = chunk_statistics_->Encode(); + StatisticsPair GetChunkStatistics() override { + StatisticsPair result; + if (chunk_statistics_) result.first = chunk_statistics_->Encode(); + if (chunk_size_stats_) result.second = chunk_size_stats_; return result; } @@ -1383,6 +1407,11 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< chunk_statistics_->Merge(*page_statistics_); page_statistics_->Reset(); } + if (page_size_stats_builder_ != nullptr) { + auto page_size_stats = page_size_stats_builder_->Build(); + chunk_size_stats_->Merge(*page_size_stats); + page_size_stats_builder_->Reset(); + } } Type::type type() const override { return descr_->physical_type(); } @@ -1425,6 +1454,10 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< // which case we call back to the dense write path) std::shared_ptr<::arrow::Array> preserved_dictionary_; + // Utility to collect and store SizeStatistics of page and chunk. + std::unique_ptr page_size_stats_builder_; + std::shared_ptr chunk_size_stats_; + int64_t WriteLevels(int64_t num_values, const int16_t* def_levels, const int16_t* rep_levels) { int64_t values_to_write = 0; @@ -1459,6 +1492,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< rows_written_ += num_values; num_buffered_rows_ += num_values; } + + CollectLevelHistogram(num_values, def_levels, rep_levels); return values_to_write; } @@ -1550,6 +1585,27 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< rows_written_ += num_levels; num_buffered_rows_ += num_levels; } + + CollectLevelHistogram(num_levels, def_levels, rep_levels); + } + + void CollectLevelHistogram(int64_t num_levels, const int16_t* def_levels, + const int16_t* rep_levels) { + if (page_size_stats_builder_ == nullptr) { + return; + } + + if (descr_->max_definition_level() > 0) { + page_size_stats_builder_->WriteDefinitionLevels(num_levels, def_levels); + } else { + page_size_stats_builder_->WriteDefinitionLevel(num_levels, /*def_level=*/0); + } + + if (descr_->max_repetition_level() > 0) { + page_size_stats_builder_->WriteRepetitionLevels(num_levels, rep_levels); + } else { + page_size_stats_builder_->WriteRepetitionLevel(num_levels, /*rep_level=*/0); + } } void CommitWriteAndCheckPageLimit(int64_t num_levels, int64_t num_values, @@ -1603,6 +1659,11 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< if (page_statistics_ != nullptr) { page_statistics_->Update(values, num_values, num_nulls); } + if constexpr (std::is_same_v) { + if (page_size_stats_builder_ != nullptr) { + page_size_stats_builder_->WriteValues(values, num_values); + } + } } /// \brief Write values with spaces and update page statistics accordingly. @@ -1631,6 +1692,12 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< page_statistics_->UpdateSpaced(values, valid_bits, valid_bits_offset, num_spaced_values, num_values, num_nulls); } + if constexpr (std::is_same_v) { + if (page_size_stats_builder_ != nullptr) { + page_size_stats_builder_->WriteValuesSpaced(values, valid_bits, valid_bits_offset, + num_spaced_values); + } + } } }; @@ -1689,8 +1756,14 @@ Status TypedColumnWriterImpl::WriteArrowDictionary( exec_ctx.set_use_threads(false); std::shared_ptr<::arrow::Array> referenced_dictionary; - PARQUET_ASSIGN_OR_THROW(::arrow::Datum referenced_indices, - ::arrow::compute::Unique(*chunk_indices, &exec_ctx)); + ::arrow::Datum referenced_indices; + if (page_size_stats_builder_) { + // SizeStatistics need to compute total bytes, so we cannot extract unique values. + referenced_indices = *chunk_indices; + } else { + PARQUET_ASSIGN_OR_THROW(referenced_indices, + ::arrow::compute::Unique(*chunk_indices, &exec_ctx)); + } // On first run, we might be able to re-use the existing dictionary if (referenced_indices.length() == dictionary->length()) { @@ -1704,10 +1777,15 @@ Status TypedColumnWriterImpl::WriteArrowDictionary( referenced_dictionary = referenced_dictionary_datum.make_array(); } - int64_t non_null_count = chunk_indices->length() - chunk_indices->null_count(); - page_statistics_->IncrementNullCount(num_chunk_levels - non_null_count); - page_statistics_->IncrementNumValues(non_null_count); - page_statistics_->Update(*referenced_dictionary, /*update_counts=*/false); + if (page_statistics_) { + int64_t non_null_count = chunk_indices->length() - chunk_indices->null_count(); + page_statistics_->IncrementNullCount(num_chunk_levels - non_null_count); + page_statistics_->IncrementNumValues(non_null_count); + page_statistics_->Update(*referenced_dictionary, /*update_counts=*/false); + } + if (page_size_stats_builder_) { + page_size_stats_builder_->WriteValues(*referenced_dictionary); + } }; int64_t value_offset = 0; @@ -1724,7 +1802,7 @@ Status TypedColumnWriterImpl::WriteArrowDictionary( AddIfNotNull(rep_levels, offset)); std::shared_ptr writeable_indices = indices->Slice(value_offset, batch_num_spaced_values); - if (page_statistics_) { + if (page_statistics_ || page_size_stats_builder_) { update_stats(/*num_chunk_levels=*/batch_size, writeable_indices); } PARQUET_ASSIGN_OR_THROW( @@ -2211,6 +2289,9 @@ Status TypedColumnWriterImpl::WriteArrowDense( page_statistics_->IncrementNullCount(batch_size - non_null); page_statistics_->IncrementNumValues(non_null); } + if (page_size_stats_builder_ != nullptr) { + page_size_stats_builder_->WriteValues(*data_slice); + } CommitWriteAndCheckPageLimit(batch_size, batch_num_values, batch_size - non_null, check_page); CheckDictionarySizeLimit(); diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc index 3f101b5ae3ac6..afe9fc93356f5 100644 --- a/cpp/src/parquet/metadata.cc +++ b/cpp/src/parquet/metadata.cc @@ -34,6 +34,7 @@ #include "parquet/exception.h" #include "parquet/schema.h" #include "parquet/schema_internal.h" +#include "parquet/size_statistics.h" #include "parquet/thrift_internal.h" namespace parquet { @@ -271,6 +272,13 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl { return is_stats_set() ? possible_stats_ : nullptr; } + inline std::unique_ptr size_statistics() const { + if (!column_metadata_->__isset.size_statistics) { + return nullptr; + } + return SizeStatistics::Make(&column_metadata_->size_statistics, descr_); + } + inline Compression::type compression() const { return LoadEnumSafe(&column_metadata_->codec); } @@ -402,6 +410,10 @@ std::shared_ptr ColumnChunkMetaData::statistics() const { bool ColumnChunkMetaData::is_stats_set() const { return impl_->is_stats_set(); } +std::unique_ptr ColumnChunkMetaData::size_statistics() const { + return impl_->size_statistics(); +} + std::optional ColumnChunkMetaData::bloom_filter_offset() const { return impl_->bloom_filter_offset(); } @@ -1455,6 +1467,10 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl { column_chunk_->meta_data.__set_statistics(ToThrift(val)); } + void SetSizeStatistics(const SizeStatistics& size_stats) { + column_chunk_->meta_data.__set_size_statistics(ToThrift(size_stats)); + } + void Finish(int64_t num_values, int64_t dictionary_page_offset, int64_t index_page_offset, int64_t data_page_offset, int64_t compressed_size, int64_t uncompressed_size, bool has_dictionary, @@ -1655,6 +1671,10 @@ void ColumnChunkMetaDataBuilder::SetStatistics(const EncodedStatistics& result) impl_->SetStatistics(result); } +void ColumnChunkMetaDataBuilder::SetSizeStatistics(const SizeStatistics& size_stats) { + impl_->SetSizeStatistics(size_stats); +} + int64_t ColumnChunkMetaDataBuilder::total_compressed_size() const { return impl_->total_compressed_size(); } diff --git a/cpp/src/parquet/metadata.h b/cpp/src/parquet/metadata.h index 640b898024346..fbbb5db23a43d 100644 --- a/cpp/src/parquet/metadata.h +++ b/cpp/src/parquet/metadata.h @@ -36,6 +36,7 @@ namespace parquet { class ColumnDescriptor; class EncodedStatistics; class FileCryptoMetaData; +class SizeStatistics; class Statistics; class SchemaDescriptor; @@ -159,6 +160,7 @@ class PARQUET_EXPORT ColumnChunkMetaData { std::shared_ptr path_in_schema() const; bool is_stats_set() const; std::shared_ptr statistics() const; + std::unique_ptr size_statistics() const; Compression::type compression() const; // Indicate if the ColumnChunk compression is supported by the current @@ -454,6 +456,7 @@ class PARQUET_EXPORT ColumnChunkMetaDataBuilder { void set_file_path(const std::string& path); // column metadata void SetStatistics(const EncodedStatistics& stats); + void SetSizeStatistics(const SizeStatistics& size_stats); // get the column descriptor const ColumnDescriptor* descr() const; diff --git a/cpp/src/parquet/page_index.cc b/cpp/src/parquet/page_index.cc index afda4c6064b36..f4ace9f9728fe 100644 --- a/cpp/src/parquet/page_index.cc +++ b/cpp/src/parquet/page_index.cc @@ -460,7 +460,8 @@ class ColumnIndexBuilderImpl final : public ColumnIndexBuilder { column_index_.boundary_order = format::BoundaryOrder::UNORDERED; } - void AddPage(const EncodedStatistics& stats) override { + void AddPage(const EncodedStatistics& stats, + const SizeStatistics* size_stats) override { if (state_ == BuilderState::kFinished) { throw ParquetException("Cannot add page to finished ColumnIndexBuilder."); } else if (state_ == BuilderState::kDiscarded) { @@ -493,6 +494,17 @@ class ColumnIndexBuilderImpl final : public ColumnIndexBuilder { column_index_.__isset.null_counts = false; column_index_.null_counts.clear(); } + + if (size_stats) { + const auto& page_ref_level_hist = size_stats->repetition_level_histogram(); + const auto& page_def_level_hist = size_stats->definition_level_histogram(); + column_index_.repetition_level_histograms.insert( + column_index_.repetition_level_histograms.end(), page_ref_level_hist.cbegin(), + page_ref_level_hist.cend()); + column_index_.definition_level_histograms.insert( + column_index_.definition_level_histograms.end(), page_def_level_hist.cbegin(), + page_def_level_hist.cend()); + } } void Finish() override { @@ -533,6 +545,21 @@ class ColumnIndexBuilderImpl final : public ColumnIndexBuilder { /// Decide the boundary order from decoded min/max values. auto boundary_order = DetermineBoundaryOrder(min_values, max_values); column_index_.__set_boundary_order(ToThrift(boundary_order)); + + /// Finalize level histogram. + const int64_t num_pages = column_index_.null_pages.size(); + const int64_t rep_level_hist_size = column_index_.repetition_level_histograms.size(); + const int64_t def_level_hist_size = column_index_.definition_level_histograms.size(); + if (rep_level_hist_size == (descr_->max_repetition_level() + 1) * num_pages) { + column_index_.__isset.repetition_level_histograms = true; + } else { + column_index_.repetition_level_histograms.clear(); + } + if (def_level_hist_size == (descr_->max_definition_level() + 1) * num_pages) { + column_index_.__isset.definition_level_histograms = true; + } else { + column_index_.definition_level_histograms.clear(); + } } void WriteTo(::arrow::io::OutputStream* sink, Encryptor* encryptor) const override { @@ -604,8 +631,8 @@ class OffsetIndexBuilderImpl final : public OffsetIndexBuilder { public: OffsetIndexBuilderImpl() = default; - void AddPage(int64_t offset, int32_t compressed_page_size, - int64_t first_row_index) override { + void AddPage(int64_t offset, int32_t compressed_page_size, int64_t first_row_index, + std::optional unencoded_byte_array_length) override { if (state_ == BuilderState::kFinished) { throw ParquetException("Cannot add page to finished OffsetIndexBuilder."); } else if (state_ == BuilderState::kDiscarded) { @@ -620,6 +647,10 @@ class OffsetIndexBuilderImpl final : public OffsetIndexBuilder { page_location.__set_compressed_page_size(compressed_page_size); page_location.__set_first_row_index(first_row_index); offset_index_.page_locations.emplace_back(std::move(page_location)); + if (unencoded_byte_array_length.has_value()) { + offset_index_.unencoded_byte_array_data_bytes.emplace_back( + unencoded_byte_array_length.value()); + } } void Finish(int64_t final_position) override { @@ -636,6 +667,16 @@ class OffsetIndexBuilderImpl final : public OffsetIndexBuilder { page_location.__set_offset(page_location.offset + final_position); } } + + /// Finalize unencoded_byte_array_data_bytes and make sure page sizes match. + if (offset_index_.page_locations.size() == + offset_index_.unencoded_byte_array_data_bytes.size()) { + offset_index_.__isset.unencoded_byte_array_data_bytes = true; + } else { + /// Discard unencoded_byte_array_data_bytes if its size is abnormal. + offset_index_.unencoded_byte_array_data_bytes.clear(); + } + state_ = BuilderState::kFinished; break; } @@ -813,6 +854,14 @@ class PageIndexBuilderImpl final : public PageIndexBuilder { } // namespace +void OffsetIndexBuilder::AddPage(const PageLocation& page_location, + const SizeStatistics* size_stats) { + this->AddPage( + page_location.offset, page_location.compressed_page_size, + page_location.first_row_index, + size_stats ? size_stats->unencoded_byte_array_data_bytes() : std::nullopt); +} + RowGroupIndexReadRange PageIndexReader::DeterminePageIndexRangesInRowGroup( const RowGroupMetaData& row_group_metadata, const std::vector& columns) { int64_t ci_start = std::numeric_limits::max(); diff --git a/cpp/src/parquet/page_index.h b/cpp/src/parquet/page_index.h index d45c59cab223f..4bfdf4c9bc234 100644 --- a/cpp/src/parquet/page_index.h +++ b/cpp/src/parquet/page_index.h @@ -27,6 +27,7 @@ namespace parquet { class EncodedStatistics; +class SizeStatistics; struct PageIndexLocation; /// \brief ColumnIndex is a proxy around format::ColumnIndex. @@ -76,6 +77,8 @@ class PARQUET_EXPORT ColumnIndex { /// \brief A vector of page indices for non-null pages. virtual const std::vector& non_null_page_indices() const = 0; + + // TODO: read level histograms. }; /// \brief Typed implementation of ColumnIndex. @@ -129,6 +132,8 @@ class PARQUET_EXPORT OffsetIndex { /// \brief A vector of locations for each data page in this column. virtual const std::vector& page_locations() const = 0; + + // TODO: read unencoded byte array data bytes. }; /// \brief Interface for reading the page index for a Parquet row group. @@ -266,7 +271,9 @@ class PARQUET_EXPORT ColumnIndexBuilder { /// not update statistics anymore. /// /// \param stats Page statistics in the encoded form. - virtual void AddPage(const EncodedStatistics& stats) = 0; + /// \param size_stats Size statistics of the page if available. + virtual void AddPage(const EncodedStatistics& stats, + const SizeStatistics* size_stats = NULLPTR) = 0; /// \brief Complete the column index. /// @@ -299,15 +306,14 @@ class PARQUET_EXPORT OffsetIndexBuilder { virtual ~OffsetIndexBuilder() = default; - /// \brief Add page location of a data page. - virtual void AddPage(int64_t offset, int32_t compressed_page_size, - int64_t first_row_index) = 0; + /// \brief Add page location and size stats of a data page. + virtual void AddPage( + int64_t offset, int32_t compressed_page_size, int64_t first_row_index, + std::optional unencoded_byte_array_length = std::nullopt) = 0; - /// \brief Add page location of a data page. - void AddPage(const PageLocation& page_location) { - AddPage(page_location.offset, page_location.compressed_page_size, - page_location.first_row_index); - } + /// \brief Add page location and size stats of a data page. + void AddPage(const PageLocation& page_location, + const SizeStatistics* size_stats = NULLPTR); /// \brief Complete the offset index. /// diff --git a/cpp/src/parquet/size_statistics.cc b/cpp/src/parquet/size_statistics.cc new file mode 100644 index 0000000000000..a7a15b7fad7dc --- /dev/null +++ b/cpp/src/parquet/size_statistics.cc @@ -0,0 +1,265 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliancec +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/size_statistics.h" + +#include + +#include "arrow/type_traits.h" +#include "arrow/util/bit_run_reader.h" +#include "arrow/util/int_util_overflow.h" +#include "arrow/visit_data_inline.h" +#include "parquet/exception.h" +#include "parquet/schema.h" +#include "parquet/thrift_internal.h" +#include "parquet/types.h" + +namespace parquet { + +class SizeStatistics::SizeStatisticsImpl { + public: + SizeStatisticsImpl() = default; + + SizeStatisticsImpl(const format::SizeStatistics* size_stats, + const ColumnDescriptor* descr) + : rep_level_histogram_(size_stats->repetition_level_histogram), + def_level_histogram_(size_stats->definition_level_histogram) { + if (descr->physical_type() == Type::BYTE_ARRAY && + size_stats->__isset.unencoded_byte_array_data_bytes) { + unencoded_byte_array_data_bytes_ = size_stats->unencoded_byte_array_data_bytes; + } + } + + const std::vector& repetition_level_histogram() const { + return rep_level_histogram_; + } + + const std::vector& definition_level_histogram() const { + return def_level_histogram_; + } + + std::optional unencoded_byte_array_data_bytes() const { + return unencoded_byte_array_data_bytes_; + } + + void Merge(const SizeStatistics& other) { + if (rep_level_histogram_.size() != other.repetition_level_histogram().size() || + def_level_histogram_.size() != other.definition_level_histogram().size() || + unencoded_byte_array_data_bytes_.has_value() != + other.unencoded_byte_array_data_bytes().has_value()) { + throw ParquetException("Cannot merge incompatible SizeStatistics"); + } + + std::transform(rep_level_histogram_.begin(), rep_level_histogram_.end(), + other.repetition_level_histogram().begin(), + rep_level_histogram_.begin(), std::plus()); + + std::transform(def_level_histogram_.begin(), def_level_histogram_.end(), + other.definition_level_histogram().begin(), + def_level_histogram_.begin(), std::plus()); + if (unencoded_byte_array_data_bytes_.has_value()) { + unencoded_byte_array_data_bytes_ = unencoded_byte_array_data_bytes_.value() + + other.unencoded_byte_array_data_bytes().value(); + } + } + + private: + friend class SizeStatisticsBuilder; + std::vector rep_level_histogram_; + std::vector def_level_histogram_; + std::optional unencoded_byte_array_data_bytes_; +}; + +const std::vector& SizeStatistics::repetition_level_histogram() const { + return impl_->repetition_level_histogram(); +} + +const std::vector& SizeStatistics::definition_level_histogram() const { + return impl_->definition_level_histogram(); +} + +std::optional SizeStatistics::unencoded_byte_array_data_bytes() const { + return impl_->unencoded_byte_array_data_bytes(); +} + +void SizeStatistics::Merge(const SizeStatistics& other) { return impl_->Merge(other); } + +SizeStatistics::SizeStatistics(const void* size_statistics, const ColumnDescriptor* descr) + : impl_(std::make_unique( + reinterpret_cast(size_statistics), descr)) {} + +SizeStatistics::SizeStatistics() : impl_(std::make_unique()) {} + +SizeStatistics::~SizeStatistics() = default; + +std::unique_ptr SizeStatistics::Make(const void* size_statistics, + const ColumnDescriptor* descr) { + return std::unique_ptr(new SizeStatistics(size_statistics, descr)); +} + +class SizeStatisticsBuilder::SizeStatisticsBuilderImpl { + public: + SizeStatisticsBuilderImpl(const ColumnDescriptor* descr) + : rep_level_histogram_(descr->max_repetition_level() + 1, 0), + def_level_histogram_(descr->max_definition_level() + 1, 0) { + if (descr->physical_type() == Type::BYTE_ARRAY) { + unencoded_byte_array_data_bytes_ = 0; + } + } + + void WriteRepetitionLevels(int64_t num_levels, const int16_t* rep_levels) { + for (int64_t i = 0; i < num_levels; ++i) { + ARROW_DCHECK_LT(rep_levels[i], static_cast(rep_level_histogram_.size())); + rep_level_histogram_[rep_levels[i]]++; + } + } + + void WriteDefinitionLevels(int64_t num_levels, const int16_t* def_levels) { + for (int64_t i = 0; i < num_levels; ++i) { + ARROW_DCHECK_LT(def_levels[i], static_cast(def_level_histogram_.size())); + def_level_histogram_[def_levels[i]]++; + } + } + + void WriteRepetitionLevel(int64_t num_levels, int16_t rep_level) { + ARROW_DCHECK_LT(rep_level, static_cast(rep_level_histogram_.size())); + rep_level_histogram_[rep_level] += num_levels; + } + + void WriteDefinitionLevel(int64_t num_levels, int16_t def_level) { + ARROW_DCHECK_LT(def_level, static_cast(def_level_histogram_.size())); + def_level_histogram_[def_level] += num_levels; + } + + void WriteValuesSpaced(const ByteArray* values, const uint8_t* valid_bits, + int64_t valid_bits_offset, int64_t num_spaced_values) { + int64_t total_bytes = 0; + ::arrow::internal::VisitSetBitRunsVoid(valid_bits, valid_bits_offset, + num_spaced_values, + [&](int64_t pos, int64_t length) { + for (int64_t i = 0; i < length; i++) { + // Don't bother to check unlikely overflow. + total_bytes += values[i + pos].len; + } + }); + IncrementUnencodedByteArrayDataBytes(total_bytes); + } + + void WriteValues(const ByteArray* values, int64_t num_values) { + int64_t total_bytes = 0; + std::for_each(values, values + num_values, + [&](const ByteArray& value) { total_bytes += values->len; }); + IncrementUnencodedByteArrayDataBytes(total_bytes); + } + + void WriteValues(const ::arrow::Array& values) { + int64_t total_bytes = 0; + const auto valid_func = [&](ByteArray val) { total_bytes += val.len; }; + const auto null_func = [&]() {}; + + if (::arrow::is_binary_like(values.type_id())) { + ::arrow::VisitArraySpanInline<::arrow::BinaryType>( + *values.data(), std::move(valid_func), std::move(null_func)); + } else { + DCHECK(::arrow::is_large_binary_like(values.type_id())); + ::arrow::VisitArraySpanInline<::arrow::LargeBinaryType>( + *values.data(), std::move(valid_func), std::move(null_func)); + } + + IncrementUnencodedByteArrayDataBytes(total_bytes); + } + + std::unique_ptr Build() { + auto stats = std::unique_ptr(new SizeStatistics()); + stats->impl_->rep_level_histogram_ = rep_level_histogram_; + stats->impl_->def_level_histogram_ = def_level_histogram_; + stats->impl_->unencoded_byte_array_data_bytes_ = unencoded_byte_array_data_bytes_; + return stats; + } + + void Reset() { + rep_level_histogram_.assign(rep_level_histogram_.size(), 0); + def_level_histogram_.assign(def_level_histogram_.size(), 0); + if (unencoded_byte_array_data_bytes_.has_value()) { + unencoded_byte_array_data_bytes_ = 0; + } + } + + private: + void IncrementUnencodedByteArrayDataBytes(int64_t total_bytes) { + ARROW_DCHECK(unencoded_byte_array_data_bytes_.has_value()); + if (::arrow::internal::AddWithOverflow( + total_bytes, unencoded_byte_array_data_bytes_.value(), &total_bytes)) { + throw ParquetException("unencoded byte array data bytes overflows to INT64_MAX"); + } + unencoded_byte_array_data_bytes_ = total_bytes; + } + + private: + std::vector rep_level_histogram_; + std::vector def_level_histogram_; + std::optional unencoded_byte_array_data_bytes_; +}; + +void SizeStatisticsBuilder::WriteRepetitionLevels(int64_t num_levels, + const int16_t* rep_levels) { + impl_->WriteRepetitionLevels(num_levels, rep_levels); +} + +void SizeStatisticsBuilder::WriteDefinitionLevels(int64_t num_levels, + const int16_t* def_levels) { + impl_->WriteDefinitionLevels(num_levels, def_levels); +} + +void SizeStatisticsBuilder::WriteRepetitionLevel(int64_t num_levels, int16_t rep_level) { + impl_->WriteRepetitionLevel(num_levels, rep_level); +} + +void SizeStatisticsBuilder::WriteDefinitionLevel(int64_t num_levels, int16_t def_level) { + impl_->WriteDefinitionLevel(num_levels, def_level); +} + +void SizeStatisticsBuilder::WriteValuesSpaced(const ByteArray* values, + const uint8_t* valid_bits, + int64_t valid_bits_offset, + int64_t num_spaced_values) { + impl_->WriteValuesSpaced(values, valid_bits, valid_bits_offset, num_spaced_values); +} + +void SizeStatisticsBuilder::WriteValues(const ByteArray* values, int64_t num_values) { + impl_->WriteValues(values, num_values); +} + +void SizeStatisticsBuilder::WriteValues(const ::arrow::Array& values) { + impl_->WriteValues(values); +} + +std::unique_ptr SizeStatisticsBuilder::Build() { return impl_->Build(); } + +void SizeStatisticsBuilder::Reset() { return impl_->Reset(); } + +SizeStatisticsBuilder::SizeStatisticsBuilder(const ColumnDescriptor* descr) + : impl_(std::make_unique(descr)) {} + +SizeStatisticsBuilder::~SizeStatisticsBuilder() = default; + +std::unique_ptr SizeStatisticsBuilder::Make( + const ColumnDescriptor* descr) { + return std::unique_ptr(new SizeStatisticsBuilder(descr)); +} + +} // namespace parquet diff --git a/cpp/src/parquet/size_statistics.h b/cpp/src/parquet/size_statistics.h new file mode 100644 index 0000000000000..8ba6010611a47 --- /dev/null +++ b/cpp/src/parquet/size_statistics.h @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "parquet/platform.h" + +namespace parquet { + +struct ByteArray; +class ColumnDescriptor; + +/// \brief SizeStatistics is a proxy around format::SizeStatistics. +/// +/// A structure for capturing metadata for estimating the unencoded, +/// uncompressed size of data written. This is useful for readers to estimate +/// how much memory is needed to reconstruct data in their memory model and for +/// fine-grained filter push down on nested structures (the histograms contained +/// in this structure can help determine the number of nulls at a particular +/// nesting level and maximum length of lists). +class PARQUET_EXPORT SizeStatistics { + public: + /// \brief API convenience to get a SizeStatistics accessor + static std::unique_ptr Make(const void* size_statistics, + const ColumnDescriptor* descr); + + ~SizeStatistics(); + + /// When present, there is expected to be one element corresponding to each + /// repetition (i.e. size=max repetition_level+1) where each element + /// represents the number of times the repetition level was observed in the + /// data. + /// + /// This field may be omitted if max_repetition_level is 0 without loss + /// of information. + /// + /// \returns repetition level histogram of all levels if not empty. + const std::vector& repetition_level_histogram() const; + + /// Same as repetition_level_histogram except for definition levels. + /// + /// This field may be omitted if max_definition_level is 0 or 1 without + /// loss of information. + /// + /// \returns definition level histogram of all levels if not empty. + const std::vector& definition_level_histogram() const; + + /// The number of physical bytes stored for BYTE_ARRAY data values assuming + /// no encoding. This is exclusive of the bytes needed to store the length of + /// each byte array. In other words, this field is equivalent to the `(size + /// of PLAIN-ENCODING the byte array values) - (4 bytes * number of values + /// written)`. To determine unencoded sizes of other types readers can use + /// schema information multiplied by the number of non-null and null values. + /// The number of null/non-null values can be inferred from the histograms + /// below. + /// + /// For example, if a column chunk is dictionary-encoded with dictionary + /// ["a", "bc", "cde"], and a data page contains the indices [0, 0, 1, 2], + /// then this value for that data page should be 7 (1 + 1 + 2 + 3). + /// + /// This field should only be set for types that use BYTE_ARRAY as their + /// physical type. + /// + /// \returns unencoded and uncompressed byte size of the BYTE_ARRAY column, + /// or std::nullopt for other types. + std::optional unencoded_byte_array_data_bytes() const; + + /// \brief Merge two SizeStatistics of the same column. + /// + /// It is used to merge size statistics from all pages of the same column chunk. + void Merge(const SizeStatistics& other); + + private: + friend class SizeStatisticsBuilder; + SizeStatistics(const void* size_statistics, const ColumnDescriptor* descr); + + // PIMPL Idiom + SizeStatistics(); + class SizeStatisticsImpl; + std::unique_ptr impl_; +}; + +/// \brief Builder to create a SizeStatistics. +class PARQUET_EXPORT SizeStatisticsBuilder { + public: + /// \brief API convenience to get a SizeStatisticsBuilder. + static std::unique_ptr Make(const ColumnDescriptor* descr); + + ~SizeStatisticsBuilder(); + + /// \brief Add repetition levels to the histogram. + /// \param num_levels number of repetition levels to add. + /// \param rep_levels repetition levels to add. + void WriteRepetitionLevels(int64_t num_levels, const int16_t* rep_levels); + + /// \brief Add definition levels to the histogram. + /// \param num_levels number of definition levels to add. + /// \param def_levels definition levels to add. + void WriteDefinitionLevels(int64_t num_levels, const int16_t* def_levels); + + /// \brief Add repeated repetition level to the histogram. + /// \param num_levels number of repetition levels to add. + /// \param rep_level repeated repetition level value. + void WriteRepetitionLevel(int64_t num_levels, int16_t rep_level); + + /// \brief Add repeated definition level to the histogram. + /// \param num_levels number of definition levels to add. + /// \param def_level repeated definition level value. + void WriteDefinitionLevel(int64_t num_levels, int16_t def_level); + + /// \brief Add spaced BYTE_ARRAY values. + /// \param[in] values pointer to values of BYTE_ARRAY type. + /// \param[in] valid_bits pointer to bitmap representing if values are non-null. + /// \param[in] valid_bits_offset offset into valid_bits where the slice of data begins. + /// \param[in] num_spaced_values length of values in values/valid_bits to inspect. + void WriteValuesSpaced(const ByteArray* values, const uint8_t* valid_bits, + int64_t valid_bits_offset, int64_t num_spaced_values); + + /// \brief Add dense BYTE_ARRAY values. + /// \param values pointer to values of BYTE_ARRAY type. + /// \param num_values length of values. + void WriteValues(const ByteArray* values, int64_t num_values); + + /// \brief Add BYTE_ARRAY values in the arrow array. + void WriteValues(const ::arrow::Array& values); + + /// \brief Build a SizeStatistics from collected data. + std::unique_ptr Build(); + + /// \brief Reset all collected data for reuse. + void Reset(); + + private: + // PIMPL Idiom + SizeStatisticsBuilder(const ColumnDescriptor* descr); + class SizeStatisticsBuilderImpl; + std::unique_ptr impl_; +}; + +} // namespace parquet diff --git a/cpp/src/parquet/thrift_internal.h b/cpp/src/parquet/thrift_internal.h index 7491f118d32a0..ffc6680e2a3cc 100644 --- a/cpp/src/parquet/thrift_internal.h +++ b/cpp/src/parquet/thrift_internal.h @@ -43,6 +43,7 @@ #include "parquet/exception.h" #include "parquet/platform.h" #include "parquet/properties.h" +#include "parquet/size_statistics.h" #include "parquet/statistics.h" #include "parquet/types.h" @@ -383,6 +384,19 @@ static inline format::EncryptionAlgorithm ToThrift(EncryptionAlgorithm encryptio return encryption_algorithm; } +static inline format::SizeStatistics ToThrift(const SizeStatistics& size_stats) { + format::SizeStatistics size_statistics; + size_statistics.__set_repetition_level_histogram( + size_stats.repetition_level_histogram()); + size_statistics.__set_definition_level_histogram( + size_stats.definition_level_histogram()); + if (size_stats.unencoded_byte_array_data_bytes().has_value()) { + size_statistics.__set_unencoded_byte_array_data_bytes( + size_stats.unencoded_byte_array_data_bytes().value()); + } + return size_statistics; +} + // ---------------------------------------------------------------------- // Thrift struct serialization / deserialization utilities