Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-41579: [C++][Python][Parquet] Support reading/writing key-value metadata from/to ColumnChunkMetaData #41580

Merged
merged 22 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions cpp/src/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "arrow/util/crc32.h"
#include "arrow/util/endian.h"
#include "arrow/util/float16.h"
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/logging.h"
#include "arrow/util/rle_encoding_internal.h"
#include "arrow/util/type_traits.h"
Expand Down Expand Up @@ -832,6 +833,9 @@ class ColumnWriterImpl {
void FlushBufferedDataPages();

ColumnChunkMetaDataBuilder* metadata_;
// key_value_metadata_ for the column chunk
// It would be nullptr if there is no KeyValueMetadata set.
std::shared_ptr<const KeyValueMetadata> key_value_metadata_;
const ColumnDescriptor* descr_;
// scratch buffer if validity bits need to be recalculated.
std::shared_ptr<ResizableBuffer> bits_buffer_;
Expand Down Expand Up @@ -1100,6 +1104,7 @@ int64_t ColumnWriterImpl::Close() {
if (rows_written_ > 0 && chunk_statistics.is_set()) {
metadata_->SetStatistics(chunk_statistics);
}
metadata_->SetKeyValueMetadata(key_value_metadata_);
pager_->Close(has_dictionary_, fallback_);
}

Expand Down Expand Up @@ -1397,6 +1402,25 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
return pages_change_on_record_boundaries_;
}

void AddKeyValueMetadata(
const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) override {
if (closed_) {
throw ParquetException("Cannot add key-value metadata to closed column");
}
clee704 marked this conversation as resolved.
Show resolved Hide resolved
if (key_value_metadata_ == nullptr) {
key_value_metadata_ = key_value_metadata;
} else if (key_value_metadata != nullptr) {
key_value_metadata_ = key_value_metadata_->Merge(*key_value_metadata);
}
}

void ResetKeyValueMetadata() override {
if (closed_) {
throw ParquetException("Cannot add key-value metadata to closed column");
}
key_value_metadata_ = nullptr;
}

private:
using ValueEncoderType = typename EncodingTraits<DType>::Encoder;
using TypedStats = TypedStatistics<DType>;
Expand Down
12 changes: 12 additions & 0 deletions cpp/src/parquet/column_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <cstring>
#include <memory>

#include "arrow/type_fwd.h"
#include "arrow/util/compression.h"
#include "parquet/exception.h"
#include "parquet/platform.h"
Expand Down Expand Up @@ -181,6 +182,17 @@ class PARQUET_EXPORT ColumnWriter {
/// \brief The file-level writer properties
virtual const WriterProperties* properties() = 0;

/// \brief Add key-value metadata to the ColumnChunk.
/// \param[in] key_value_metadata the metadata to add.
/// \note This will overwrite any existing metadata with the same key.
/// \throw ParquetException if Close() has been called.
virtual void AddKeyValueMetadata(
const std::shared_ptr<const ::arrow::KeyValueMetadata>& key_value_metadata) = 0;

/// \brief Reset the ColumnChunk key-value metadata.
/// \throw ParquetException if Close() has been called.
virtual void ResetKeyValueMetadata() = 0;

/// \brief Write Apache Arrow columnar data directly to ColumnWriter. Returns
/// error status if the array data type is not compatible with the concrete
/// writer type.
Expand Down
64 changes: 64 additions & 0 deletions cpp/src/parquet/column_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
#include <gtest/gtest.h>

#include "arrow/io/buffered.h"
#include "arrow/io/file.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/bitmap_builders.h"
#include "arrow/util/config.h"
#include "arrow/util/key_value_metadata.h"

#include "parquet/column_page.h"
#include "parquet/column_reader.h"
Expand All @@ -51,6 +53,9 @@ using schema::PrimitiveNode;

namespace test {

using ::testing::IsNull;
using ::testing::NotNull;

// The default size used in most tests.
const int SMALL_SIZE = 100;
#ifdef PARQUET_VALGRIND
Expand Down Expand Up @@ -385,6 +390,15 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
return metadata_accessor->encoding_stats();
}

std::shared_ptr<const KeyValueMetadata> metadata_key_value_metadata() {
// Metadata accessor must be created lazily.
// This is because the ColumnChunkMetaData semantics dictate the metadata object is
// complete (no changes to the metadata buffer can be made after instantiation)
auto metadata_accessor =
ColumnChunkMetaData::Make(metadata_->contents(), this->descr_);
return metadata_accessor->key_value_metadata();
}

protected:
int64_t values_read_;
// Keep the reader alive as for ByteArray the lifetime of the ByteArray
Expand Down Expand Up @@ -1705,5 +1719,55 @@ TEST(TestColumnWriter, WriteDataPageV2HeaderNullCount) {
}
}

using TestInt32Writer = TestPrimitiveWriter<Int32Type>;

TEST_F(TestInt32Writer, NoWriteKeyValueMetadata) {
auto writer = this->BuildWriter();
writer->Close();
auto key_value_metadata = metadata_key_value_metadata();
ASSERT_THAT(key_value_metadata, IsNull());
}

TEST_F(TestInt32Writer, WriteKeyValueMetadata) {
auto writer = this->BuildWriter();
writer->AddKeyValueMetadata(KeyValueMetadata::Make({"hello"}, {"world"}));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a test where AddKeyValueMetadata is called twice?

writer->Close();
auto key_value_metadata = metadata_key_value_metadata();
ASSERT_THAT(key_value_metadata, NotNull());
ASSERT_EQ(1, key_value_metadata->size());
ASSERT_OK_AND_ASSIGN(auto value, key_value_metadata->Get("hello"));
ASSERT_EQ("world", value);
}

TEST_F(TestInt32Writer, ResetKeyValueMetadata) {
auto writer = this->BuildWriter();
writer->AddKeyValueMetadata(KeyValueMetadata::Make({"hello"}, {"world"}));
writer->ResetKeyValueMetadata();
writer->Close();
auto key_value_metadata = metadata_key_value_metadata();
ASSERT_THAT(key_value_metadata, IsNull());
}

TEST_F(TestInt32Writer, WriteKeyValueMetadataEndToEnd) {
auto sink = CreateOutputStream();
{
auto file_writer = ParquetFileWriter::Open(
sink, std::dynamic_pointer_cast<schema::GroupNode>(schema_.schema_root()));
auto rg_writer = file_writer->AppendRowGroup();
auto col_writer = rg_writer->NextColumn();
col_writer->AddKeyValueMetadata(KeyValueMetadata::Make({"foo"}, {"bar"}));
file_writer->Close();
}
ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
auto file_reader =
ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer));
auto key_value_metadata =
file_reader->metadata()->RowGroup(0)->ColumnChunk(0)->key_value_metadata();
ASSERT_THAT(key_value_metadata, NotNull());
ASSERT_EQ(1U, key_value_metadata->size());
ASSERT_OK_AND_ASSIGN(auto value, key_value_metadata->Get("foo"));
ASSERT_EQ("bar", value);
}

} // namespace test
} // namespace parquet
83 changes: 62 additions & 21 deletions cpp/src/parquet/metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,36 @@ std::shared_ptr<Statistics> MakeColumnStats(const format::ColumnMetaData& meta_d
throw ParquetException("Can't decode page statistics for selected column type");
}

template <typename Metadata>
std::shared_ptr<KeyValueMetadata> CopyKeyValueMetadata(const Metadata& source) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps call this FromThriftKeyValueMetadata?

std::shared_ptr<KeyValueMetadata> metadata = nullptr;
if (source.__isset.key_value_metadata) {
std::vector<std::string> keys;
std::vector<std::string> values;
keys.reserve(source.key_value_metadata.size());
values.reserve(source.key_value_metadata.size());
for (const auto& it : source.key_value_metadata) {
keys.push_back(it.key);
values.push_back(it.value);
}
metadata = std::make_shared<KeyValueMetadata>(std::move(keys), std::move(values));
}
return metadata;
}

template <typename Metadata>
void ToThriftKeyValueMetadata(const KeyValueMetadata& source, Metadata* metadata) {
std::vector<format::KeyValue> key_value_metadata;
key_value_metadata.reserve(static_cast<size_t>(source.size()));
for (int64_t i = 0; i < source.size(); ++i) {
format::KeyValue kv_pair;
kv_pair.__set_key(source.key(i));
kv_pair.__set_value(source.value(i));
key_value_metadata.emplace_back(std::move(kv_pair));
}
metadata->__set_key_value_metadata(std::move(key_value_metadata));
}

// MetaData Accessor

// ColumnCryptoMetaData
Expand Down Expand Up @@ -233,6 +263,7 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
encoding_stats.count});
}
possible_stats_ = nullptr;
InitKeyValueMetadata();
}

bool Equals(const ColumnChunkMetaDataImpl& other) const {
Expand Down Expand Up @@ -343,7 +374,15 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
return std::nullopt;
}

const std::shared_ptr<const KeyValueMetadata>& key_value_metadata() const {
return key_value_metadata_;
}

private:
void InitKeyValueMetadata() {
key_value_metadata_ = CopyKeyValueMetadata(*column_metadata_);
}

mutable std::shared_ptr<Statistics> possible_stats_;
std::vector<Encoding::type> encodings_;
std::vector<PageEncodingStats> encoding_stats_;
Expand All @@ -353,6 +392,7 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
const ColumnDescriptor* descr_;
const ReaderProperties properties_;
const ApplicationVersion* writer_version_;
std::shared_ptr<const KeyValueMetadata> key_value_metadata_;
};

std::unique_ptr<ColumnChunkMetaData> ColumnChunkMetaData::Make(
Expand Down Expand Up @@ -471,6 +511,11 @@ bool ColumnChunkMetaData::Equals(const ColumnChunkMetaData& other) const {
return impl_->Equals(*other.impl_);
}

const std::shared_ptr<const KeyValueMetadata>& ColumnChunkMetaData::key_value_metadata()
const {
return impl_->key_value_metadata();
}

// row-group metadata
class RowGroupMetaData::RowGroupMetaDataImpl {
public:
Expand Down Expand Up @@ -913,7 +958,7 @@ class FileMetaData::FileMetaDataImpl {
std::vector<parquet::ColumnOrder> column_orders;
if (metadata_->__isset.column_orders) {
column_orders.reserve(metadata_->column_orders.size());
for (auto column_order : metadata_->column_orders) {
for (auto& column_order : metadata_->column_orders) {
if (column_order.__isset.TYPE_ORDER) {
column_orders.push_back(ColumnOrder::type_defined_);
} else {
Expand All @@ -927,16 +972,7 @@ class FileMetaData::FileMetaDataImpl {
schema_.updateColumnOrders(column_orders);
}

void InitKeyValueMetadata() {
std::shared_ptr<KeyValueMetadata> metadata = nullptr;
if (metadata_->__isset.key_value_metadata) {
metadata = std::make_shared<KeyValueMetadata>();
for (const auto& it : metadata_->key_value_metadata) {
metadata->Append(it.key, it.value);
}
}
key_value_metadata_ = std::move(metadata);
}
void InitKeyValueMetadata() { key_value_metadata_ = CopyKeyValueMetadata(*metadata_); }
};

std::shared_ptr<FileMetaData> FileMetaData::Make(
Expand Down Expand Up @@ -1590,6 +1626,10 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
column_chunk_->meta_data.__set_encodings(std::move(thrift_encodings));
column_chunk_->meta_data.__set_encoding_stats(std::move(thrift_encoding_stats));

if (key_value_metadata_) {
ToThriftKeyValueMetadata(*key_value_metadata_, &column_chunk_->meta_data);
}

const auto& encrypt_md =
properties_->column_encryption_properties(column_->path()->ToDotString());
// column is encrypted
Expand Down Expand Up @@ -1656,6 +1696,10 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
return column_chunk_->meta_data.total_compressed_size;
}

void SetKeyValueMetadata(std::shared_ptr<const KeyValueMetadata> key_value_metadata) {
key_value_metadata_ = std::move(key_value_metadata);
}

private:
void Init(format::ColumnChunk* column_chunk) {
column_chunk_ = column_chunk;
Expand All @@ -1670,6 +1714,7 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
std::unique_ptr<format::ColumnChunk> owned_column_chunk_;
const std::shared_ptr<WriterProperties> properties_;
const ColumnDescriptor* column_;
std::shared_ptr<const KeyValueMetadata> key_value_metadata_;
};

std::unique_ptr<ColumnChunkMetaDataBuilder> ColumnChunkMetaDataBuilder::Make(
Expand Down Expand Up @@ -1727,6 +1772,11 @@ void ColumnChunkMetaDataBuilder::SetStatistics(const EncodedStatistics& result)
impl_->SetStatistics(result);
}

void ColumnChunkMetaDataBuilder::SetKeyValueMetadata(
std::shared_ptr<const KeyValueMetadata> key_value_metadata) {
impl_->SetKeyValueMetadata(std::move(key_value_metadata));
}

int64_t ColumnChunkMetaDataBuilder::total_compressed_size() const {
return impl_->total_compressed_size();
}
Expand Down Expand Up @@ -1925,16 +1975,7 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
} else if (key_value_metadata) {
key_value_metadata_ = key_value_metadata_->Merge(*key_value_metadata);
}
metadata_->key_value_metadata.clear();
metadata_->key_value_metadata.reserve(
static_cast<size_t>(key_value_metadata_->size()));
for (int64_t i = 0; i < key_value_metadata_->size(); ++i) {
format::KeyValue kv_pair;
kv_pair.__set_key(key_value_metadata_->key(i));
kv_pair.__set_value(key_value_metadata_->value(i));
metadata_->key_value_metadata.push_back(std::move(kv_pair));
}
metadata_->__isset.key_value_metadata = true;
ToThriftKeyValueMetadata(*key_value_metadata_, metadata_.get());
}

int32_t file_version = 0;
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/parquet/metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ class PARQUET_EXPORT ColumnChunkMetaData {
std::unique_ptr<ColumnCryptoMetaData> crypto_metadata() const;
std::optional<IndexLocation> GetColumnIndexLocation() const;
std::optional<IndexLocation> GetOffsetIndexLocation() const;
const std::shared_ptr<const KeyValueMetadata>& key_value_metadata() const;

private:
explicit ColumnChunkMetaData(
Expand Down Expand Up @@ -466,8 +467,12 @@ class PARQUET_EXPORT ColumnChunkMetaDataBuilder {
// column chunk
// Used when a dataset is spread across multiple files
void set_file_path(const std::string& path);

// column metadata
void SetStatistics(const EncodedStatistics& stats);

void SetKeyValueMetadata(std::shared_ptr<const KeyValueMetadata> key_value_metadata);

// get the column descriptor
const ColumnDescriptor* descr() const;

Expand Down
Loading
Loading