Skip to content

Commit

Permalink
GH-40592: [C++][Parquet] Implement SizeStatistics
Browse files Browse the repository at this point in the history
  • Loading branch information
wgtmac committed Dec 18, 2024
1 parent 48e11b8 commit 849b196
Show file tree
Hide file tree
Showing 19 changed files with 967 additions and 71 deletions.
8 changes: 8 additions & 0 deletions cpp/src/arrow/util/hashing.h
Original file line number Diff line number Diff line change
Expand Up @@ -843,6 +843,14 @@ class BinaryMemoTable : public MemoTable {
}
}

// Visit the stored value at a specific index in insertion order.
// The visitor function should have the signature `void(std::string_view)`
// or `void(const std::string_view&)`.
template <typename VisitFunc>
void VisitValue(int32_t idx, VisitFunc&& visit) const {
visit(binary_builder_.GetView(idx));
}

protected:
struct Payload {
int32_t memo_index;
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/parquet/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ set(PARQUET_SRCS
printer.cc
properties.cc
schema.cc
size_statistics.cc
statistics.cc
stream_reader.cc
stream_writer.cc
Expand Down Expand Up @@ -373,6 +374,7 @@ add_parquet_test(internals-test
metadata_test.cc
page_index_test.cc
public_api_test.cc
size_statistics_test.cc
types_test.cc)

set_source_files_properties(public_api_test.cc PROPERTIES SKIP_PRECOMPILE_HEADERS ON
Expand Down
22 changes: 15 additions & 7 deletions cpp/src/parquet/column_page.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <optional>
#include <string>

#include "parquet/size_statistics.h"
#include "parquet/statistics.h"
#include "parquet/types.h"

Expand Down Expand Up @@ -69,27 +70,30 @@ class DataPage : public Page {
/// Currently it is only present from data pages created by ColumnWriter in order
/// to collect page index.
std::optional<int64_t> first_row_index() const { return first_row_index_; }
const SizeStatistics& size_statistics() const { return size_statistics_; }

virtual ~DataPage() = default;

protected:
DataPage(PageType::type type, const std::shared_ptr<Buffer>& buffer, int32_t num_values,
Encoding::type encoding, int64_t uncompressed_size,
EncodedStatistics statistics = EncodedStatistics(),
std::optional<int64_t> first_row_index = std::nullopt)
EncodedStatistics statistics, std::optional<int64_t> first_row_index,
SizeStatistics size_statistics)
: Page(buffer, type),
num_values_(num_values),
encoding_(encoding),
uncompressed_size_(uncompressed_size),
statistics_(std::move(statistics)),
first_row_index_(std::move(first_row_index)) {}
first_row_index_(std::move(first_row_index)),
size_statistics_(std::move(size_statistics)) {}

int32_t num_values_;
Encoding::type encoding_;
int64_t uncompressed_size_;
EncodedStatistics statistics_;
/// Row ordinal within the row group to the first row in the data page.
std::optional<int64_t> first_row_index_;
SizeStatistics size_statistics_;
};

class DataPageV1 : public DataPage {
Expand All @@ -98,9 +102,11 @@ class DataPageV1 : public DataPage {
Encoding::type encoding, Encoding::type definition_level_encoding,
Encoding::type repetition_level_encoding, int64_t uncompressed_size,
EncodedStatistics statistics = EncodedStatistics(),
std::optional<int64_t> first_row_index = std::nullopt)
std::optional<int64_t> first_row_index = std::nullopt,
SizeStatistics size_statistics = SizeStatistics())
: DataPage(PageType::DATA_PAGE, buffer, num_values, encoding, uncompressed_size,
std::move(statistics), std::move(first_row_index)),
std::move(statistics), std::move(first_row_index),
std::move(size_statistics)),
definition_level_encoding_(definition_level_encoding),
repetition_level_encoding_(repetition_level_encoding) {}

Expand All @@ -120,9 +126,11 @@ class DataPageV2 : public DataPage {
int32_t definition_levels_byte_length, int32_t repetition_levels_byte_length,
int64_t uncompressed_size, bool is_compressed = false,
EncodedStatistics statistics = EncodedStatistics(),
std::optional<int64_t> first_row_index = std::nullopt)
std::optional<int64_t> first_row_index = std::nullopt,
SizeStatistics size_statistics = SizeStatistics())
: DataPage(PageType::DATA_PAGE_V2, buffer, num_values, encoding, uncompressed_size,
std::move(statistics), std::move(first_row_index)),
std::move(statistics), std::move(first_row_index),
std::move(size_statistics)),
num_nulls_(num_nulls),
num_rows_(num_rows),
definition_levels_byte_length_(definition_levels_byte_length),
Expand Down
123 changes: 103 additions & 20 deletions cpp/src/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
#include "parquet/platform.h"
#include "parquet/properties.h"
#include "parquet/schema.h"
#include "parquet/size_statistics.h"
#include "parquet/statistics.h"
#include "parquet/thrift_internal.h"
#include "parquet/types.h"
Expand Down Expand Up @@ -437,7 +438,7 @@ class SerializedPageWriter : public PageWriter {

/// Collect page index
if (column_index_builder_ != nullptr) {
column_index_builder_->AddPage(page.statistics());
column_index_builder_->AddPage(page.statistics(), page.size_statistics());
}
if (offset_index_builder_ != nullptr) {
const int64_t compressed_size = output_data_len + header_size;
Expand All @@ -451,8 +452,9 @@ class SerializedPageWriter : public PageWriter {
/// start_pos is a relative offset in the buffered mode. It should be
/// adjusted via OffsetIndexBuilder::Finish() after BufferedPageWriter
/// has flushed all data pages.
offset_index_builder_->AddPage(start_pos, static_cast<int32_t>(compressed_size),
*page.first_row_index());
offset_index_builder_->AddPage(
start_pos, static_cast<int32_t>(compressed_size), *page.first_row_index(),
page.size_statistics().unencoded_byte_array_data_bytes);
}

total_uncompressed_size_ += uncompressed_size + header_size;
Expand Down Expand Up @@ -774,11 +776,17 @@ class ColumnWriterImpl {
// Serializes Dictionary Page if enabled
virtual void WriteDictionaryPage() = 0;

// A convenience struct to combine the encoded statistics and size statistics
struct StatisticsPair {
EncodedStatistics encoded_stats;
SizeStatistics size_stats;
};

// Plain-encoded statistics of the current page
virtual EncodedStatistics GetPageStatistics() = 0;
virtual StatisticsPair GetPageStatistics() = 0;

// Plain-encoded statistics of the whole chunk
virtual EncodedStatistics GetChunkStatistics() = 0;
virtual StatisticsPair GetChunkStatistics() = 0;

// Merges page statistics into chunk statistics, then resets the values
virtual void ResetPageStatistics() = 0;
Expand Down Expand Up @@ -981,8 +989,7 @@ void ColumnWriterImpl::BuildDataPageV1(int64_t definition_levels_rle_size,
PARQUET_THROW_NOT_OK(uncompressed_data_->Resize(uncompressed_size, false));
ConcatenateBuffers(definition_levels_rle_size, repetition_levels_rle_size, values,
uncompressed_data_->mutable_data());

EncodedStatistics page_stats = GetPageStatistics();
auto [page_stats, page_size_stats] = GetPageStatistics();
page_stats.ApplyStatSizeLimits(properties_->max_statistics_size(descr_->path()));
page_stats.set_is_signed(SortOrder::SIGNED == descr_->sort_order());
ResetPageStatistics();
Expand All @@ -1006,13 +1013,15 @@ void ColumnWriterImpl::BuildDataPageV1(int64_t definition_levels_rle_size,
compressed_data->CopySlice(0, compressed_data->size(), allocator_));
std::unique_ptr<DataPage> page_ptr = std::make_unique<DataPageV1>(
compressed_data_copy, num_values, encoding_, Encoding::RLE, Encoding::RLE,
uncompressed_size, std::move(page_stats), first_row_index);
uncompressed_size, std::move(page_stats), first_row_index,
std::move(page_size_stats));
total_compressed_bytes_ += page_ptr->size() + sizeof(format::PageHeader);

data_pages_.push_back(std::move(page_ptr));
} else { // Eagerly write pages
DataPageV1 page(compressed_data, num_values, encoding_, Encoding::RLE, Encoding::RLE,
uncompressed_size, std::move(page_stats), first_row_index);
uncompressed_size, std::move(page_stats), first_row_index,
std::move(page_size_stats));
WriteDataPage(page);
}
}
Expand All @@ -1039,7 +1048,7 @@ void ColumnWriterImpl::BuildDataPageV2(int64_t definition_levels_rle_size,
ConcatenateBuffers(definition_levels_rle_size, repetition_levels_rle_size,
compressed_values, combined->mutable_data());

EncodedStatistics page_stats = GetPageStatistics();
auto [page_stats, page_size_stats] = GetPageStatistics();
page_stats.ApplyStatSizeLimits(properties_->max_statistics_size(descr_->path()));
page_stats.set_is_signed(SortOrder::SIGNED == descr_->sort_order());
ResetPageStatistics();
Expand All @@ -1062,14 +1071,15 @@ void ColumnWriterImpl::BuildDataPageV2(int64_t definition_levels_rle_size,
combined->CopySlice(0, combined->size(), allocator_));
std::unique_ptr<DataPage> page_ptr = std::make_unique<DataPageV2>(
combined, num_values, null_count, num_rows, encoding_, def_levels_byte_length,
rep_levels_byte_length, uncompressed_size, pager_->has_compressor(), page_stats,
first_row_index);
rep_levels_byte_length, uncompressed_size, pager_->has_compressor(),
std::move(page_stats), first_row_index, std::move(page_size_stats));
total_compressed_bytes_ += page_ptr->size() + sizeof(format::PageHeader);
data_pages_.push_back(std::move(page_ptr));
} else {
DataPageV2 page(combined, num_values, null_count, num_rows, encoding_,
def_levels_byte_length, rep_levels_byte_length, uncompressed_size,
pager_->has_compressor(), page_stats, first_row_index);
pager_->has_compressor(), std::move(page_stats), first_row_index,
std::move(page_size_stats));
WriteDataPage(page);
}
}
Expand All @@ -1083,7 +1093,7 @@ int64_t ColumnWriterImpl::Close() {

FlushBufferedDataPages();

EncodedStatistics chunk_statistics = GetChunkStatistics();
auto [chunk_statistics, chunk_size_statistics] = GetChunkStatistics();
chunk_statistics.ApplyStatSizeLimits(
properties_->max_statistics_size(descr_->path()));
chunk_statistics.set_is_signed(SortOrder::SIGNED == descr_->sort_order());
Expand All @@ -1092,6 +1102,9 @@ int64_t ColumnWriterImpl::Close() {
if (rows_written_ > 0 && chunk_statistics.is_set()) {
metadata_->SetStatistics(chunk_statistics);
}
if (rows_written_ > 0 && chunk_size_statistics.is_set()) {
metadata_->SetSizeStatistics(chunk_size_statistics);
}
metadata_->SetKeyValueMetadata(key_value_metadata_);
pager_->Close(has_dictionary_, fallback_);
}
Expand Down Expand Up @@ -1217,6 +1230,11 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
page_statistics_ = MakeStatistics<DType>(descr_, allocator_);
chunk_statistics_ = MakeStatistics<DType>(descr_, allocator_);
}
if (properties->size_statistics_level() == SizeStatisticsLevel::ColumnChunk ||
properties->size_statistics_level() == SizeStatisticsLevel::PageAndColumnChunk) {
page_size_statistics_ = SizeStatistics::Make(descr_);
chunk_size_statistics_ = SizeStatistics::Make(descr_);
}
pages_change_on_record_boundaries_ =
properties->data_page_version() == ParquetDataPageVersion::V2 ||
properties->page_index_enabled(descr_->path());
Expand Down Expand Up @@ -1355,15 +1373,26 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
total_bytes_written_ += pager_->WriteDictionaryPage(page);
}

EncodedStatistics GetPageStatistics() override {
EncodedStatistics result;
if (page_statistics_) result = page_statistics_->Encode();
StatisticsPair GetPageStatistics() override {
StatisticsPair result;
if (page_statistics_) {
result.encoded_stats = page_statistics_->Encode();
}
if (properties_->size_statistics_level() == SizeStatisticsLevel::PageAndColumnChunk) {
ARROW_DCHECK(page_size_statistics_ != nullptr);
result.size_stats = *page_size_statistics_;
}
return result;
}

EncodedStatistics GetChunkStatistics() override {
EncodedStatistics result;
if (chunk_statistics_) result = chunk_statistics_->Encode();
StatisticsPair GetChunkStatistics() override {
StatisticsPair result;
if (chunk_statistics_) {
result.encoded_stats = chunk_statistics_->Encode();
}
if (chunk_size_statistics_) {
result.size_stats = *chunk_size_statistics_;
}
return result;
}

Expand All @@ -1372,6 +1401,10 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
chunk_statistics_->Merge(*page_statistics_);
page_statistics_->Reset();
}
if (page_size_statistics_ != nullptr) {
chunk_size_statistics_->Merge(*page_size_statistics_);
page_size_statistics_->Reset();
}
}

Type::type type() const override { return descr_->physical_type(); }
Expand Down Expand Up @@ -1425,6 +1458,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
DictEncoder<DType>* current_dict_encoder_;
std::shared_ptr<TypedStats> page_statistics_;
std::shared_ptr<TypedStats> chunk_statistics_;
std::unique_ptr<SizeStatistics> page_size_statistics_;
std::shared_ptr<SizeStatistics> chunk_size_statistics_;
bool pages_change_on_record_boundaries_;

// If writing a sequence of ::arrow::DictionaryArray to the writer, we keep the
Expand Down Expand Up @@ -1467,6 +1502,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
rows_written_ += num_values;
num_buffered_rows_ += num_values;
}

UpdateLevelHistogram(num_values, def_levels, rep_levels);
return values_to_write;
}

Expand Down Expand Up @@ -1558,6 +1595,47 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
rows_written_ += num_levels;
num_buffered_rows_ += num_levels;
}

UpdateLevelHistogram(num_levels, def_levels, rep_levels);
}

void UpdateLevelHistogram(int64_t num_levels, const int16_t* def_levels,
const int16_t* rep_levels) const {
if (page_size_statistics_ == nullptr) {
return;
}

auto add_levels = [](std::vector<int64_t>& level_histogram,
::arrow::util::span<const int16_t> levels) {
for (int16_t level : levels) {
ARROW_DCHECK_LT(level, static_cast<int16_t>(level_histogram.size()));
++level_histogram[level];
}
};

if (descr_->max_definition_level() > 0) {
add_levels(page_size_statistics_->definition_level_histogram,
{def_levels, static_cast<size_t>(num_levels)});
} else {
page_size_statistics_->definition_level_histogram[0] += num_levels;
}

if (descr_->max_repetition_level() > 0) {
add_levels(page_size_statistics_->repetition_level_histogram,
{rep_levels, static_cast<size_t>(num_levels)});
} else {
page_size_statistics_->repetition_level_histogram[0] += num_levels;
}
}

// Update the unencoded data bytes for ByteArray only per the specification.
void UpdateUnencodedDataBytes() const {
if constexpr (std::is_same_v<T, ByteArray>) {
if (page_size_statistics_ != nullptr) {
page_size_statistics_->IncrementUnencodedByteArrayDataBytes(
current_encoder_->ReportUnencodedDataBytes());
}
}
}

void CommitWriteAndCheckPageLimit(int64_t num_levels, int64_t num_values,
Expand Down Expand Up @@ -1611,6 +1689,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
if (page_statistics_ != nullptr) {
page_statistics_->Update(values, num_values, num_nulls);
}
UpdateUnencodedDataBytes();
}

/// \brief Write values with spaces and update page statistics accordingly.
Expand Down Expand Up @@ -1639,6 +1718,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
page_statistics_->UpdateSpaced(values, valid_bits, valid_bits_offset,
num_spaced_values, num_values, num_nulls);
}
UpdateUnencodedDataBytes();
}
};

Expand Down Expand Up @@ -1739,6 +1819,8 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
writeable_indices,
MaybeReplaceValidity(writeable_indices, null_count, ctx->memory_pool));
dict_encoder->PutIndices(*writeable_indices);
// Update unencoded byte array data size to size statistics
UpdateUnencodedDataBytes();
CommitWriteAndCheckPageLimit(batch_size, batch_num_values, null_count, check_page);
value_offset += batch_num_spaced_values;
};
Expand Down Expand Up @@ -2219,6 +2301,7 @@ Status TypedColumnWriterImpl<ByteArrayType>::WriteArrowDense(
page_statistics_->IncrementNullCount(batch_size - non_null);
page_statistics_->IncrementNumValues(non_null);
}
UpdateUnencodedDataBytes();
CommitWriteAndCheckPageLimit(batch_size, batch_num_values, batch_size - non_null,
check_page);
CheckDictionarySizeLimit();
Expand Down
Loading

0 comments on commit 849b196

Please sign in to comment.