Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-41579: [C++][Python][Parquet] Support reading/writing key-value metadata from/to ColumnChunkMetaData #41580

Merged
merged 22 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions cpp/src/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "arrow/util/crc32.h"
#include "arrow/util/endian.h"
#include "arrow/util/float16.h"
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/logging.h"
#include "arrow/util/rle_encoding.h"
#include "arrow/util/type_traits.h"
Expand Down Expand Up @@ -836,6 +837,7 @@ class ColumnWriterImpl {
void FlushBufferedDataPages();

ColumnChunkMetaDataBuilder* metadata_;
std::shared_ptr<const KeyValueMetadata> key_value_metadata_;
const ColumnDescriptor* descr_;
// scratch buffer if validity bits need to be recalculated.
std::shared_ptr<ResizableBuffer> bits_buffer_;
Expand Down Expand Up @@ -1104,6 +1106,7 @@ int64_t ColumnWriterImpl::Close() {
if (rows_written_ > 0 && chunk_statistics.is_set()) {
metadata_->SetStatistics(chunk_statistics);
}
metadata_->SetKeyValueMetadata(key_value_metadata_);
pager_->Close(has_dictionary_, fallback_);
}

Expand Down Expand Up @@ -1405,6 +1408,25 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
return pages_change_on_record_boundaries_;
}

/// \brief Attach key-value metadata to this column chunk.
///
/// Merges with any metadata added earlier (per the ColumnWriter contract,
/// entries with duplicate keys are overwritten by the newer value).
/// \throws ParquetException if the writer has already been closed.
void AddKeyValueMetadata(
    const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) override {
  if (closed_) {
    throw ParquetException("Cannot add key-value metadata to closed column");
  }
  if (key_value_metadata_ != nullptr && key_value_metadata != nullptr) {
    // Both old and new metadata present: merge the new entries in.
    key_value_metadata_ = key_value_metadata_->Merge(*key_value_metadata);
  } else if (key_value_metadata_ == nullptr) {
    // Nothing stored yet: keep the incoming pointer as-is (may be null).
    key_value_metadata_ = key_value_metadata;
  }
  // If the incoming pointer is null and we already hold metadata, this is
  // deliberately a no-op.
}

/// \brief Discard any key-value metadata previously added via
/// AddKeyValueMetadata().
/// \throws ParquetException if the writer has already been closed.
void ResetKeyValueMetadata() override {
  if (closed_) {
    // Message corrected: this path resets metadata; the previous wording
    // ("Cannot add ...") was copy-pasted from AddKeyValueMetadata.
    throw ParquetException("Cannot reset key-value metadata of closed column");
  }
  key_value_metadata_ = nullptr;
}

private:
using ValueEncoderType = typename EncodingTraits<DType>::Encoder;
using TypedStats = TypedStatistics<DType>;
Expand Down
12 changes: 12 additions & 0 deletions cpp/src/parquet/column_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <cstring>
#include <memory>

#include "arrow/type_fwd.h"
#include "arrow/util/compression.h"
#include "parquet/exception.h"
#include "parquet/platform.h"
Expand Down Expand Up @@ -181,6 +182,17 @@ class PARQUET_EXPORT ColumnWriter {
/// \brief The file-level writer properties
virtual const WriterProperties* properties() = 0;

/// \brief Add key-value metadata to the ColumnChunk.
/// \param[in] key_value_metadata the metadata to add.
/// \note This will overwrite any existing metadata with the same key.
/// \throw ParquetException if Close() has been called.
virtual void AddKeyValueMetadata(
const std::shared_ptr<const ::arrow::KeyValueMetadata>& key_value_metadata) = 0;

/// \brief Reset the ColumnChunk key-value metadata.
/// \throw ParquetException if Close() has been called.
virtual void ResetKeyValueMetadata() = 0;

/// \brief Write Apache Arrow columnar data directly to ColumnWriter. Returns
/// error status if the array data type is not compatible with the concrete
/// writer type.
Expand Down
64 changes: 64 additions & 0 deletions cpp/src/parquet/column_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
#include <gtest/gtest.h>

#include "arrow/io/buffered.h"
#include "arrow/io/file.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/bitmap_builders.h"
#include "arrow/util/config.h"
#include "arrow/util/key_value_metadata.h"

#include "parquet/column_page.h"
#include "parquet/column_reader.h"
Expand All @@ -51,6 +53,9 @@ using schema::PrimitiveNode;

namespace test {

using ::testing::IsNull;
using ::testing::NotNull;

// The default size used in most tests.
const int SMALL_SIZE = 100;
#ifdef PARQUET_VALGRIND
Expand Down Expand Up @@ -385,6 +390,15 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
return metadata_accessor->encoding_stats();
}

// Read back the key-value metadata recorded in the column chunk metadata
// written so far; returns nullptr when none was written.
std::shared_ptr<const KeyValueMetadata> metadata_key_value_metadata() {
  // Metadata accessor must be created lazily.
  // This is because the ColumnChunkMetaData semantics dictate the metadata object is
  // complete (no changes to the metadata buffer can be made after instantiation)
  auto metadata_accessor =
      ColumnChunkMetaData::Make(metadata_->contents(), this->descr_);
  return metadata_accessor->key_value_metadata();
}

protected:
int64_t values_read_;
// Keep the reader alive as for ByteArray the lifetime of the ByteArray
Expand Down Expand Up @@ -1705,5 +1719,55 @@ TEST(TestColumnWriter, WriteDataPageV2HeaderNullCount) {
}
}

using TestInt32Writer = TestPrimitiveWriter<Int32Type>;

// Closing a writer without ever adding key-value metadata must leave the
// column chunk metadata without a KeyValueMetadata object.
TEST_F(TestInt32Writer, NoWriteKeyValueMetadata) {
  auto writer = this->BuildWriter();
  writer->Close();
  ASSERT_THAT(metadata_key_value_metadata(), IsNull());
}

// Key-value metadata added before Close() must be persisted into the column
// chunk metadata.  Per review feedback this also covers calling
// AddKeyValueMetadata() twice (the two sets are merged) and uses explicit
// Eq() matchers for readability.
TEST_F(TestInt32Writer, WriteKeyValueMetadata) {
  auto writer = this->BuildWriter();
  writer->AddKeyValueMetadata(KeyValueMetadata::Make({"hello"}, {"world"}));
  // A second call must merge with, not replace, the first set of entries.
  writer->AddKeyValueMetadata(KeyValueMetadata::Make({"num"}, {"42"}));
  writer->Close();
  auto key_value_metadata = metadata_key_value_metadata();
  ASSERT_THAT(key_value_metadata, NotNull());
  ASSERT_THAT(key_value_metadata->size(), ::testing::Eq(2));
  ASSERT_OK_AND_ASSIGN(auto value, key_value_metadata->Get("hello"));
  ASSERT_THAT(value, ::testing::Eq("world"));
  ASSERT_OK_AND_ASSIGN(auto value2, key_value_metadata->Get("num"));
  ASSERT_THAT(value2, ::testing::Eq("42"));
}

// Resetting after an add must behave exactly like never adding at all: the
// resulting column chunk metadata carries no KeyValueMetadata.
TEST_F(TestInt32Writer, ResetKeyValueMetadata) {
  auto writer = this->BuildWriter();
  writer->AddKeyValueMetadata(KeyValueMetadata::Make({"hello"}, {"world"}));
  writer->ResetKeyValueMetadata();
  writer->Close();
  ASSERT_THAT(metadata_key_value_metadata(), IsNull());
}

// Round-trip test: metadata attached through the public ParquetFileWriter
// API must be readable back via ParquetFileReader's column chunk metadata.
// Uses explicit Eq() matchers per review feedback.
TEST_F(TestInt32Writer, WriteKeyValueMetadataEndToEnd) {
  auto sink = CreateOutputStream();
  {
    auto file_writer = ParquetFileWriter::Open(
        sink, std::dynamic_pointer_cast<schema::GroupNode>(schema_.schema_root()));
    auto rg_writer = file_writer->AppendRowGroup();
    auto col_writer = rg_writer->NextColumn();
    col_writer->AddKeyValueMetadata(KeyValueMetadata::Make({"foo"}, {"bar"}));
    file_writer->Close();
  }
  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
  auto file_reader =
      ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer));
  auto key_value_metadata =
      file_reader->metadata()->RowGroup(0)->ColumnChunk(0)->key_value_metadata();
  ASSERT_THAT(key_value_metadata, NotNull());
  ASSERT_THAT(key_value_metadata->size(), ::testing::Eq(1));
  ASSERT_OK_AND_ASSIGN(auto value, key_value_metadata->Get("foo"));
  ASSERT_THAT(value, ::testing::Eq("bar"));
}

} // namespace test
} // namespace parquet
76 changes: 56 additions & 20 deletions cpp/src/parquet/metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,31 @@ std::shared_ptr<Statistics> MakeColumnStats(const format::ColumnMetaData& meta_d
throw ParquetException("Can't decode page statistics for selected column type");
}

template <typename Metadata>
std::shared_ptr<KeyValueMetadata> CopyKeyValueMetadata(const Metadata& source) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps call this FromThriftKeyValueMetadata?

std::shared_ptr<KeyValueMetadata> metadata = nullptr;
if (source.__isset.key_value_metadata) {
metadata = std::make_shared<KeyValueMetadata>();
for (const auto& it : source.key_value_metadata) {
metadata->Append(it.key, it.value);
}
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
}
return metadata;
}

template <typename Metadata>
void ToThriftKeyValueMetadata(Metadata& metadata, const KeyValueMetadata& source) {
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
std::vector<format::KeyValue> key_value_metadata;
key_value_metadata.reserve(static_cast<size_t>(source.size()));
for (int64_t i = 0; i < source.size(); ++i) {
format::KeyValue kv_pair;
kv_pair.__set_key(source.key(i));
kv_pair.__set_value(source.value(i));
key_value_metadata.emplace_back(kv_pair);
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
}
metadata.__set_key_value_metadata(key_value_metadata);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
metadata.__set_key_value_metadata(key_value_metadata);
metadata.__set_key_value_metadata(std::move(key_value_metadata));

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mapleFU suggested removing the move, so did I. What's the consensus?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aha sorry for misleading, previously I suggest not moving element in member when building ( metadata_ ). We can applying move for temporary thrift objects

}

// MetaData Accessor

// ColumnCryptoMetaData
Expand Down Expand Up @@ -230,6 +255,7 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
encoding_stats.count});
}
possible_stats_ = nullptr;
InitKeyValueMetadata();
}

bool Equals(const ColumnChunkMetaDataImpl& other) const {
Expand Down Expand Up @@ -340,7 +366,15 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
return std::nullopt;
}

// Key-value metadata decoded from this column chunk's Thrift metadata, or
// nullptr when the Thrift field was unset.
const std::shared_ptr<const KeyValueMetadata>& key_value_metadata() const {
  return key_value_metadata_;
}

private:
// Decode the Thrift key_value_metadata field once at construction; leaves
// key_value_metadata_ null when the field is unset in the file.
void InitKeyValueMetadata() {
  key_value_metadata_ = CopyKeyValueMetadata(*column_metadata_);
}

mutable std::shared_ptr<Statistics> possible_stats_;
std::vector<Encoding::type> encodings_;
std::vector<PageEncodingStats> encoding_stats_;
Expand All @@ -350,6 +384,7 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
const ColumnDescriptor* descr_;
const ReaderProperties properties_;
const ApplicationVersion* writer_version_;
std::shared_ptr<const KeyValueMetadata> key_value_metadata_;
};

std::unique_ptr<ColumnChunkMetaData> ColumnChunkMetaData::Make(
Expand Down Expand Up @@ -468,6 +503,11 @@ bool ColumnChunkMetaData::Equals(const ColumnChunkMetaData& other) const {
return impl_->Equals(*other.impl_);
}

// Public accessor: forwards to the pimpl's cached KeyValueMetadata (may be
// nullptr when the column chunk carried none).
const std::shared_ptr<const KeyValueMetadata>& ColumnChunkMetaData::key_value_metadata()
    const {
  return impl_->key_value_metadata();
}

// row-group metadata
class RowGroupMetaData::RowGroupMetaDataImpl {
public:
Expand Down Expand Up @@ -863,16 +903,7 @@ class FileMetaData::FileMetaDataImpl {
schema_.updateColumnOrders(column_orders);
}

void InitKeyValueMetadata() {
std::shared_ptr<KeyValueMetadata> metadata = nullptr;
if (metadata_->__isset.key_value_metadata) {
metadata = std::make_shared<KeyValueMetadata>();
for (const auto& it : metadata_->key_value_metadata) {
metadata->Append(it.key, it.value);
}
}
key_value_metadata_ = std::move(metadata);
}
// File-level variant: decodes FileMetaData's key_value_metadata through the
// same shared helper used for column chunks.
void InitKeyValueMetadata() { key_value_metadata_ = CopyKeyValueMetadata(*metadata_); }
};

std::shared_ptr<FileMetaData> FileMetaData::Make(
Expand Down Expand Up @@ -1518,6 +1549,10 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
column_chunk_->meta_data.__set_encodings(std::move(thrift_encodings));
column_chunk_->meta_data.__set_encoding_stats(std::move(thrift_encoding_stats));

if (key_value_metadata_) {
ToThriftKeyValueMetadata(column_chunk_->meta_data, *key_value_metadata_);
}

const auto& encrypt_md =
properties_->column_encryption_properties(column_->path()->ToDotString());
// column is encrypted
Expand Down Expand Up @@ -1584,6 +1619,10 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
return column_chunk_->meta_data.total_compressed_size;
}

// Stash the key-value metadata to be serialized into the Thrift column
// chunk later (at Finish time); sink parameter is taken by value and moved.
void SetKeyValueMetadata(std::shared_ptr<const KeyValueMetadata> key_value_metadata) {
  key_value_metadata_ = std::move(key_value_metadata);
}

private:
void Init(format::ColumnChunk* column_chunk) {
column_chunk_ = column_chunk;
Expand All @@ -1598,6 +1637,7 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
std::unique_ptr<format::ColumnChunk> owned_column_chunk_;
const std::shared_ptr<WriterProperties> properties_;
const ColumnDescriptor* column_;
std::shared_ptr<const KeyValueMetadata> key_value_metadata_;
};

std::unique_ptr<ColumnChunkMetaDataBuilder> ColumnChunkMetaDataBuilder::Make(
Expand Down Expand Up @@ -1655,6 +1695,11 @@ void ColumnChunkMetaDataBuilder::SetStatistics(const EncodedStatistics& result)
impl_->SetStatistics(result);
}

// Forward key-value metadata to the builder impl.
void ColumnChunkMetaDataBuilder::SetKeyValueMetadata(
    std::shared_ptr<const KeyValueMetadata> key_value_metadata) {
  // Move the by-value sink parameter through; the impl also takes it by
  // value and moves it into place, so passing the lvalue here would cost an
  // extra shared_ptr copy (an atomic refcount round-trip).
  impl_->SetKeyValueMetadata(std::move(key_value_metadata));
}

int64_t ColumnChunkMetaDataBuilder::total_compressed_size() const {
return impl_->total_compressed_size();
}
Expand Down Expand Up @@ -1853,16 +1898,7 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
} else if (key_value_metadata) {
key_value_metadata_ = key_value_metadata_->Merge(*key_value_metadata);
}
metadata_->key_value_metadata.clear();
metadata_->key_value_metadata.reserve(
static_cast<size_t>(key_value_metadata_->size()));
for (int64_t i = 0; i < key_value_metadata_->size(); ++i) {
format::KeyValue kv_pair;
kv_pair.__set_key(key_value_metadata_->key(i));
kv_pair.__set_value(key_value_metadata_->value(i));
metadata_->key_value_metadata.push_back(std::move(kv_pair));
}
metadata_->__isset.key_value_metadata = true;
ToThriftKeyValueMetadata(*metadata_, *key_value_metadata_);
}

int32_t file_version = 0;
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/parquet/metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ class PARQUET_EXPORT ColumnChunkMetaData {
std::unique_ptr<ColumnCryptoMetaData> crypto_metadata() const;
std::optional<IndexLocation> GetColumnIndexLocation() const;
std::optional<IndexLocation> GetOffsetIndexLocation() const;
const std::shared_ptr<const KeyValueMetadata>& key_value_metadata() const;

private:
explicit ColumnChunkMetaData(
Expand Down Expand Up @@ -452,8 +453,12 @@ class PARQUET_EXPORT ColumnChunkMetaDataBuilder {
// column chunk
// Used when a dataset is spread across multiple files
void set_file_path(const std::string& path);

// column metadata
void SetStatistics(const EncodedStatistics& stats);

void SetKeyValueMetadata(std::shared_ptr<const KeyValueMetadata> key_value_metadata);

// get the column descriptor
const ColumnDescriptor* descr() const;

Expand Down
32 changes: 25 additions & 7 deletions cpp/src/parquet/printer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,25 @@ void PrintPageEncodingStats(std::ostream& stream,
// the fixed initial size is just for an example
#define COL_WIDTH 30

// Write character `c` to `stream` exactly `n` times (no-op for n <= 0);
// used below to emit indentation.
void Put(std::ostream& stream, char c, int n) {
  while (n-- > 0) {
    stream.put(c);
  }
}

void PrintKeyValueMetadata(std::ostream& stream,
const KeyValueMetadata& key_value_metadata,
int indent_level = 0, int indent_width = 1) {
clee704 marked this conversation as resolved.
Show resolved Hide resolved
const int64_t size_of_key_value_metadata = key_value_metadata.size();
Put(stream, ' ', indent_level * indent_width);
stream << "Key Value Metadata: " << size_of_key_value_metadata << " entries\n";
for (int64_t i = 0; i < size_of_key_value_metadata; i++) {
Put(stream, ' ', (indent_level + 1) * indent_width);
stream << "Key nr " << i << " " << key_value_metadata.key(i) << ": "
<< key_value_metadata.value(i) << "\n";
}
}

void ParquetFilePrinter::DebugPrint(std::ostream& stream, std::list<int> selected_columns,
bool print_values, bool format_dump,
bool print_key_value_metadata, const char* filename) {
Expand All @@ -76,12 +95,7 @@ void ParquetFilePrinter::DebugPrint(std::ostream& stream, std::list<int> selecte

if (print_key_value_metadata && file_metadata->key_value_metadata()) {
auto key_value_metadata = file_metadata->key_value_metadata();
int64_t size_of_key_value_metadata = key_value_metadata->size();
stream << "Key Value File Metadata: " << size_of_key_value_metadata << " entries\n";
for (int64_t i = 0; i < size_of_key_value_metadata; i++) {
stream << " Key nr " << i << " " << key_value_metadata->key(i) << ": "
<< key_value_metadata->value(i) << "\n";
}
PrintKeyValueMetadata(stream, *key_value_metadata);
}

stream << "Number of RowGroups: " << file_metadata->num_row_groups() << "\n";
Expand Down Expand Up @@ -136,7 +150,11 @@ void ParquetFilePrinter::DebugPrint(std::ostream& stream, std::list<int> selecte
std::shared_ptr<Statistics> stats = column_chunk->statistics();

const ColumnDescriptor* descr = file_metadata->schema()->Column(i);
stream << "Column " << i << std::endl << " Values: " << column_chunk->num_values();
stream << "Column " << i << std::endl;
if (print_key_value_metadata && column_chunk->key_value_metadata()) {
PrintKeyValueMetadata(stream, *column_chunk->key_value_metadata(), 1, 2);
}
stream << " Values: " << column_chunk->num_values();
if (column_chunk->is_stats_set()) {
std::string min = stats->EncodeMin(), max = stats->EncodeMax();
stream << ", Null Values: " << stats->null_count()
Expand Down
Loading
Loading