Skip to content

Commit

Permalink
apacheGH-34053: [C++][Parquet] Write parquet page index
Browse files Browse the repository at this point in the history
  • Loading branch information
wgtmac committed Feb 6, 2023
1 parent 1aea20b commit 7fa4e81
Show file tree
Hide file tree
Showing 10 changed files with 597 additions and 14 deletions.
5 changes: 5 additions & 0 deletions cpp/src/parquet/column_page.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ class DataPage : public Page {
int64_t uncompressed_size() const { return uncompressed_size_; }
const EncodedStatistics& statistics() const { return statistics_; }

void SetFirstRowIndex(int64_t first_row_index) { first_row_index_ = first_row_index; }
std::optional<int64_t> GetFirstRowIndex() const { return first_row_index_; }

virtual ~DataPage() = default;

protected:
Expand All @@ -81,6 +84,8 @@ class DataPage : public Page {
Encoding::type encoding_;
int64_t uncompressed_size_;
EncodedStatistics statistics_;
/// Row ordinal within the row group to the first row in the data page.
std::optional<int64_t> first_row_index_ = std::nullopt;
};

class DataPageV1 : public DataPage {
Expand Down
80 changes: 73 additions & 7 deletions cpp/src/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
#include "parquet/encryption/internal_file_encryptor.h"
#include "parquet/level_conversion.h"
#include "parquet/metadata.h"
#include "parquet/page_index.h"
#include "parquet/platform.h"
#include "parquet/properties.h"
#include "parquet/schema.h"
Expand Down Expand Up @@ -250,7 +251,9 @@ class SerializedPageWriter : public PageWriter {
int16_t row_group_ordinal, int16_t column_chunk_ordinal,
MemoryPool* pool = ::arrow::default_memory_pool(),
std::shared_ptr<Encryptor> meta_encryptor = nullptr,
std::shared_ptr<Encryptor> data_encryptor = nullptr)
std::shared_ptr<Encryptor> data_encryptor = nullptr,
ColumnIndexBuilder* column_index_builder = nullptr,
OffsetIndexBuilder* offset_index_builder = nullptr)
: sink_(std::move(sink)),
metadata_(metadata),
pool_(pool),
Expand All @@ -264,7 +267,9 @@ class SerializedPageWriter : public PageWriter {
column_ordinal_(column_chunk_ordinal),
meta_encryptor_(std::move(meta_encryptor)),
data_encryptor_(std::move(data_encryptor)),
encryption_buffer_(AllocateBuffer(pool, 0)) {
encryption_buffer_(AllocateBuffer(pool, 0)),
column_index_builder_(column_index_builder),
offset_index_builder_(offset_index_builder) {
if (data_encryptor_ != nullptr || meta_encryptor_ != nullptr) {
InitEncryption();
}
Expand Down Expand Up @@ -331,6 +336,10 @@ class SerializedPageWriter : public PageWriter {
if (meta_encryptor_ != nullptr) {
UpdateEncryption(encryption::kColumnMetaData);
}

// Serialized page writer does not need to adjust page offsets.
FinishPageIndex(/*final_position=*/0);

// index_page_offset = -1 since they are not supported
metadata_->Finish(num_values_, dictionary_page_offset_, -1, data_page_offset_,
total_compressed_size_, total_uncompressed_size_, has_dictionary,
Expand Down Expand Up @@ -403,6 +412,25 @@ class SerializedPageWriter : public PageWriter {
thrift_serializer_->Serialize(&page_header, sink_.get(), meta_encryptor_);
PARQUET_THROW_NOT_OK(sink_->Write(output_data_buffer, output_data_len));

/// Collect page index
if (column_index_builder_ != nullptr) {
column_index_builder_->AddPage(page.statistics());
}
if (offset_index_builder_ != nullptr) {
const int64_t compressed_size = output_data_len + header_size;
if (compressed_size > std::numeric_limits<int32_t>::max()) {
throw ParquetException("Compressed page size overflows to INT32_MAX.");
}
if (!page.GetFirstRowIndex().has_value()) {
throw ParquetException("First row index is not set in data page.");
}
/// start_pos is a relative offset in the buffered mode. It should be
/// adjusted via OffsetIndexBuilder::Finish() after BufferedPageWriter
/// has flushed all data pages.
offset_index_builder_->AddPage(start_pos, static_cast<int32_t>(compressed_size),
*page.GetFirstRowIndex());
}

total_uncompressed_size_ += uncompressed_size + header_size;
total_compressed_size_ += output_data_len + header_size;
num_values_ += page.num_values();
Expand Down Expand Up @@ -444,6 +472,17 @@ class SerializedPageWriter : public PageWriter {
page_header.__set_data_page_header_v2(data_page_header);
}

/// \brief Finish page index builders and update the stream offset to adjust
/// page offsets.
void FinishPageIndex(int64_t final_position) {
if (column_index_builder_ != nullptr) {
column_index_builder_->Finish();
}
if (offset_index_builder_ != nullptr) {
offset_index_builder_->Finish(final_position);
}
}

bool has_compressor() override { return (compressor_ != nullptr); }

int64_t num_values() { return num_values_; }
Expand Down Expand Up @@ -536,6 +575,9 @@ class SerializedPageWriter : public PageWriter {

std::map<Encoding::type, int32_t> dict_encoding_stats_;
std::map<Encoding::type, int32_t> data_encoding_stats_;

ColumnIndexBuilder* column_index_builder_;
OffsetIndexBuilder* offset_index_builder_;
};

// This implementation of the PageWriter writes to the final sink on Close .
Expand All @@ -546,13 +588,15 @@ class BufferedPageWriter : public PageWriter {
int16_t row_group_ordinal, int16_t current_column_ordinal,
MemoryPool* pool = ::arrow::default_memory_pool(),
std::shared_ptr<Encryptor> meta_encryptor = nullptr,
std::shared_ptr<Encryptor> data_encryptor = nullptr)
std::shared_ptr<Encryptor> data_encryptor = nullptr,
ColumnIndexBuilder* column_index_builder = nullptr,
OffsetIndexBuilder* offset_index_builder = nullptr)
: final_sink_(std::move(sink)), metadata_(metadata), has_dictionary_pages_(false) {
in_memory_sink_ = CreateOutputStream(pool);
pager_ = std::make_unique<SerializedPageWriter>(
in_memory_sink_, codec, compression_level, metadata, row_group_ordinal,
current_column_ordinal, pool, std::move(meta_encryptor),
std::move(data_encryptor));
std::move(data_encryptor), column_index_builder, offset_index_builder);
}

int64_t WriteDictionaryPage(const DictionaryPage& page) override {
Expand All @@ -578,6 +622,9 @@ class BufferedPageWriter : public PageWriter {
// Write metadata at end of column chunk
metadata_->WriteTo(in_memory_sink_.get());

// Buffered page writer needs to adjust page offsets.
pager_->FinishPageIndex(final_position);

// flush everything to the serialized sink
PARQUET_ASSIGN_OR_THROW(auto buffer, in_memory_sink_->Finish());
PARQUET_THROW_NOT_OK(final_sink_->Write(buffer));
Expand Down Expand Up @@ -606,15 +653,18 @@ std::unique_ptr<PageWriter> PageWriter::Open(
int compression_level, ColumnChunkMetaDataBuilder* metadata,
int16_t row_group_ordinal, int16_t column_chunk_ordinal, MemoryPool* pool,
bool buffered_row_group, std::shared_ptr<Encryptor> meta_encryptor,
std::shared_ptr<Encryptor> data_encryptor) {
std::shared_ptr<Encryptor> data_encryptor, ColumnIndexBuilder* column_index_builder,
OffsetIndexBuilder* offset_index_builder) {
if (buffered_row_group) {
return std::make_unique<BufferedPageWriter>(
std::move(sink), codec, compression_level, metadata, row_group_ordinal,
column_chunk_ordinal, pool, std::move(meta_encryptor), std::move(data_encryptor));
column_chunk_ordinal, pool, std::move(meta_encryptor), std::move(data_encryptor),
column_index_builder, offset_index_builder);
} else {
return std::make_unique<SerializedPageWriter>(
std::move(sink), codec, compression_level, metadata, row_group_ordinal,
column_chunk_ordinal, pool, std::move(meta_encryptor), std::move(data_encryptor));
column_chunk_ordinal, pool, std::move(meta_encryptor), std::move(data_encryptor),
column_index_builder, offset_index_builder);
}
}

Expand Down Expand Up @@ -643,6 +693,7 @@ class ColumnWriterImpl {
num_buffered_values_(0),
num_buffered_encoded_values_(0),
rows_written_(0),
page_rows_written_(0),
total_bytes_written_(0),
total_compressed_bytes_(0),
closed_(false),
Expand Down Expand Up @@ -718,6 +769,9 @@ class ColumnWriterImpl {
// Serialize the buffered Data Pages
void FlushBufferedDataPages();

// Return number of rows in the current data page
int64_t num_page_rows() const { return rows_written_ - page_rows_written_; }

ColumnChunkMetaDataBuilder* metadata_;
const ColumnDescriptor* descr_;
// scratch buffer if validity bits need to be recalculated.
Expand Down Expand Up @@ -749,6 +803,9 @@ class ColumnWriterImpl {
// Total number of rows written with this ColumnWriter
int64_t rows_written_;

// Total number of rows written in the current data page
int64_t page_rows_written_;

// Records the total number of uncompressed bytes written by the serializer
int64_t total_bytes_written_;

Expand Down Expand Up @@ -854,6 +911,7 @@ void ColumnWriterImpl::AddDataPage() {
InitSinks();
num_buffered_values_ = 0;
num_buffered_encoded_values_ = 0;
page_rows_written_ = 0;
}

void ColumnWriterImpl::BuildDataPageV1(int64_t definition_levels_rle_size,
Expand Down Expand Up @@ -888,13 +946,15 @@ void ColumnWriterImpl::BuildDataPageV1(int64_t definition_levels_rle_size,
std::unique_ptr<DataPage> page_ptr = std::make_unique<DataPageV1>(
compressed_data_copy, static_cast<int32_t>(num_buffered_values_), encoding_,
Encoding::RLE, Encoding::RLE, uncompressed_size, page_stats);
page_ptr->SetFirstRowIndex(num_page_rows());
total_compressed_bytes_ += page_ptr->size() + sizeof(format::PageHeader);

data_pages_.push_back(std::move(page_ptr));
} else { // Eagerly write pages
DataPageV1 page(compressed_data, static_cast<int32_t>(num_buffered_values_),
encoding_, Encoding::RLE, Encoding::RLE, uncompressed_size,
page_stats);
page.SetFirstRowIndex(num_page_rows());
WriteDataPage(page);
}
}
Expand Down Expand Up @@ -939,12 +999,14 @@ void ColumnWriterImpl::BuildDataPageV2(int64_t definition_levels_rle_size,
std::unique_ptr<DataPage> page_ptr = std::make_unique<DataPageV2>(
combined, num_values, null_count, num_values, encoding_, def_levels_byte_length,
rep_levels_byte_length, uncompressed_size, pager_->has_compressor(), page_stats);
page_ptr->SetFirstRowIndex(num_page_rows());
total_compressed_bytes_ += page_ptr->size() + sizeof(format::PageHeader);
data_pages_.push_back(std::move(page_ptr));
} else {
DataPageV2 page(combined, num_values, null_count, num_values, encoding_,
def_levels_byte_length, rep_levels_byte_length, uncompressed_size,
pager_->has_compressor(), page_stats);
page.SetFirstRowIndex(num_page_rows());
WriteDataPage(page);
}
}
Expand Down Expand Up @@ -1248,13 +1310,15 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
for (int64_t i = 0; i < num_values; ++i) {
if (rep_levels[i] == 0) {
rows_written_++;
page_rows_written_++;
}
}

WriteRepetitionLevels(num_values, rep_levels);
} else {
// Each value is exactly one row
rows_written_ += num_values;
page_rows_written_ += num_values;
}
return values_to_write;
}
Expand Down Expand Up @@ -1338,12 +1402,14 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
for (int64_t i = 0; i < num_levels; ++i) {
if (rep_levels[i] == 0) {
rows_written_++;
page_rows_written_++;
}
}
WriteRepetitionLevels(num_levels, rep_levels);
} else {
// Each value is exactly one row
rows_written_ += num_levels;
page_rows_written_ += num_levels;
}
}

Expand Down
8 changes: 6 additions & 2 deletions cpp/src/parquet/column_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@ class RleEncoder;
namespace parquet {

struct ArrowWriteContext;
class ColumnChunkMetaDataBuilder;
class ColumnDescriptor;
class ColumnIndexBuilder;
class DataPage;
class DictionaryPage;
class ColumnChunkMetaDataBuilder;
class Encryptor;
class OffsetIndexBuilder;
class WriterProperties;

class PARQUET_EXPORT LevelEncoder {
Expand Down Expand Up @@ -90,7 +92,9 @@ class PARQUET_EXPORT PageWriter {
::arrow::MemoryPool* pool = ::arrow::default_memory_pool(),
bool buffered_row_group = false,
std::shared_ptr<Encryptor> header_encryptor = NULLPTR,
std::shared_ptr<Encryptor> data_encryptor = NULLPTR);
std::shared_ptr<Encryptor> data_encryptor = NULLPTR,
ColumnIndexBuilder* column_index_builder = NULLPTR,
OffsetIndexBuilder* offset_index_builder = NULLPTR);

// The Column Writer decides if dictionary encoding is used if set and
// if the dictionary encoding has fallen back to default encoding on reaching dictionary
Expand Down
21 changes: 19 additions & 2 deletions cpp/src/parquet/file_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
#include "parquet/encryption/encryption_internal.h"
#include "parquet/encryption/internal_file_encryptor.h"
#include "parquet/exception.h"
#include "parquet/page_index.h"
#include "parquet/platform.h"
#include "parquet/schema.h"
#include "parquet/types.h"

using arrow::MemoryPool;

Expand Down Expand Up @@ -291,6 +291,19 @@ class FileSerializer : public ParquetFileWriter::Contents {
// Write magic bytes and metadata
auto file_encryption_properties = properties_->file_encryption_properties();

if (page_index_builder_ != nullptr) {
if (file_encryption_properties != nullptr) {
throw ParquetException("Encryption is not supported with page index");
}

// Serialize page index after all row groups have been written and report
// location to the file metadata.
PageIndexLocation page_index_location;
page_index_builder_->Finish();
page_index_builder_->WriteTo(sink_.get(), &page_index_location);
metadata_->SetPageIndexLocation(page_index_location);
}

if (file_encryption_properties == nullptr) { // Non encrypted file.
file_metadata_ = metadata_->Finish();
WriteFileMetaData(*file_metadata_, sink_.get());
Expand Down Expand Up @@ -391,7 +404,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
std::unique_ptr<FileMetaDataBuilder> metadata_;
// Only one of the row group writers is active at a time
std::unique_ptr<RowGroupWriter> row_group_writer_;

std::unique_ptr<PageIndexBuilder> page_index_builder_;
std::unique_ptr<InternalFileEncryptor> file_encryptor_;

void StartFile() {
Expand Down Expand Up @@ -430,6 +443,10 @@ class FileSerializer : public ParquetFileWriter::Contents {
PARQUET_THROW_NOT_OK(sink_->Write(kParquetMagic, 4));
}
}

if (properties_->write_page_index()) {
page_index_builder_ = PageIndexBuilder::Make(&schema_);
}
}
};

Expand Down
39 changes: 39 additions & 0 deletions cpp/src/parquet/metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1763,6 +1763,41 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
return current_row_group_builder_.get();
}

void SetPageIndexLocation(const PageIndexLocation& location) {
auto set_index_location =
[this](int32_t row_group_ordinal,
const PageIndexLocation::FileIndexLocation& file_index_location,
bool column_index) {
auto& row_group_metadata = this->row_groups_.at(row_group_ordinal);
auto iter = file_index_location.find(row_group_ordinal);
if (iter != file_index_location.cend()) {
const auto& row_group_index_location = iter->second;
for (size_t i = 0; i < row_group_index_location.size(); ++i) {
if (i >= row_group_metadata.columns.size()) {
throw ParquetException("Cannot find metadata for column ordinal ", i);
}
auto& column_metadata = row_group_metadata.columns.at(i);
const auto& index_location = row_group_index_location.at(i);
if (index_location.has_value()) {
if (column_index) {
column_metadata.__set_column_index_offset(index_location->offset);
column_metadata.__set_column_index_length(index_location->length);
} else {
column_metadata.__set_offset_index_offset(index_location->offset);
column_metadata.__set_offset_index_length(index_location->length);
}
}
}
}
};

for (size_t i = 0; i < row_groups_.size(); ++i) {
int32_t row_group_ordinal = static_cast<int32_t>(i);
set_index_location(row_group_ordinal, location.column_index_location, true);
set_index_location(row_group_ordinal, location.offset_index_location, false);
}
}

std::unique_ptr<FileMetaData> Finish() {
int64_t total_rows = 0;
for (auto row_group : row_groups_) {
Expand Down Expand Up @@ -1888,6 +1923,10 @@ RowGroupMetaDataBuilder* FileMetaDataBuilder::AppendRowGroup() {
return impl_->AppendRowGroup();
}

void FileMetaDataBuilder::SetPageIndexLocation(const PageIndexLocation& location) {
impl_->SetPageIndexLocation(location);
}

std::unique_ptr<FileMetaData> FileMetaDataBuilder::Finish() { return impl_->Finish(); }

std::unique_ptr<FileCryptoMetaData> FileMetaDataBuilder::GetCryptoMetaData() {
Expand Down
Loading

0 comments on commit 7fa4e81

Please sign in to comment.