From 3bd57e35cc949bce6a04d7fbfed30c6db755b023 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Thu, 13 Apr 2023 08:41:56 +0800 Subject: [PATCH] GH-34888: [C++][Parquet] Writer supports adding extra kv meta (#34889) ### Rationale for this change Parquet specs support storing key-value metadata provided by the user. However, the parquet-cpp writer can only set it via ParquetFileWriter::Open(). Sometimes user may want to add extra information to it while writing. So it is good to support adding extra key-value metadata any time before closing the file writer. ### What changes are included in this PR? Add a new interface `void AddKeyValueMetadata(std::shared_ptr key_value_metadata)` to the `ParquetFileWriter` class. User can now add more key-value metadata to the file if not closed. ### Are these changes tested? Added a new `Metadata.TestAddKeyValueMetadata` test to verify key-value metadata added before closing the writer are well preserved. ### Are there any user-facing changes? Yes, user can add custom key-value metadata whenever writer is not closed. * Closes: #34888 Lead-authored-by: Gang Wu Co-authored-by: Will Jones Signed-off-by: Will Jones --- cpp/src/parquet/file_writer.cc | 29 ++++++++++++++---- cpp/src/parquet/file_writer.h | 10 +++++++ cpp/src/parquet/metadata.cc | 24 +++++++++++---- cpp/src/parquet/metadata.h | 11 +++++-- cpp/src/parquet/metadata_test.cc | 51 ++++++++++++++++++++++++++++++-- 5 files changed, 110 insertions(+), 15 deletions(-) diff --git a/cpp/src/parquet/file_writer.cc b/cpp/src/parquet/file_writer.cc index 481d5b6d30dbb..f1098e4a74bc5 100644 --- a/cpp/src/parquet/file_writer.cc +++ b/cpp/src/parquet/file_writer.cc @@ -17,13 +17,14 @@ #include "parquet/file_writer.h" -#include #include #include #include #include #include +#include "arrow/util/key_value_metadata.h" +#include "arrow/util/logging.h" #include "parquet/column_writer.h" #include "parquet/encryption/encryption_internal.h" #include "parquet/encryption/internal_file_encryptor.h" @@ -338,7 +339,7 @@ class FileSerializer : public ParquetFileWriter::Contents { auto file_encryption_properties = properties_->file_encryption_properties(); if (file_encryption_properties == nullptr) { // Non encrypted file. - file_metadata_ = metadata_->Finish(); + file_metadata_ = metadata_->Finish(key_value_metadata_); WriteFileMetaData(*file_metadata_, sink_.get()); } else { // Encrypted file CloseEncryptedFile(file_encryption_properties); @@ -376,6 +377,15 @@ class FileSerializer : public ParquetFileWriter::Contents { RowGroupWriter* AppendBufferedRowGroup() override { return AppendRowGroup(true); } + void AddKeyValueMetadata( + const std::shared_ptr& key_value_metadata) override { + if (key_value_metadata_ == nullptr) { + key_value_metadata_ = std::move(key_value_metadata); + } else if (key_value_metadata != nullptr) { + key_value_metadata_ = key_value_metadata_->Merge(*key_value_metadata); + } + } + ~FileSerializer() override { try { Close(); @@ -394,7 +404,7 @@ class FileSerializer : public ParquetFileWriter::Contents { properties_(std::move(properties)), num_row_groups_(0), num_rows_(0), - metadata_(FileMetaDataBuilder::Make(&schema_, properties_, key_value_metadata_)) { + metadata_(FileMetaDataBuilder::Make(&schema_, properties_)) { PARQUET_ASSIGN_OR_THROW(int64_t position, sink_->Tell()); if (position == 0) { StartFile(); @@ -407,7 +417,7 @@ class FileSerializer : public ParquetFileWriter::Contents { // Encrypted file with encrypted footer if (file_encryption_properties->encrypted_footer()) { // encrypted footer - file_metadata_ = metadata_->Finish(); + file_metadata_ = metadata_->Finish(key_value_metadata_); PARQUET_ASSIGN_OR_THROW(int64_t position, sink_->Tell()); uint64_t metadata_start = static_cast(position); @@ -422,7 +432,7 @@ class FileSerializer : public ParquetFileWriter::Contents { sink_->Write(reinterpret_cast(&footer_and_crypto_len), 4)); PARQUET_THROW_NOT_OK(sink_->Write(kParquetEMagic, 4)); } else { // Encrypted file with plaintext footer - file_metadata_ = metadata_->Finish(); + file_metadata_ = metadata_->Finish(key_value_metadata_); auto footer_signing_encryptor = file_encryptor_->GetFooterSigningEncryptor(); WriteEncryptedFileMetadata(*file_metadata_, sink_.get(), footer_signing_encryptor, false); @@ -613,6 +623,15 @@ RowGroupWriter* ParquetFileWriter::AppendRowGroup(int64_t num_rows) { return AppendRowGroup(); } +void ParquetFileWriter::AddKeyValueMetadata( + const std::shared_ptr& key_value_metadata) { + if (contents_) { + contents_->AddKeyValueMetadata(key_value_metadata); + } else { + throw ParquetException("Cannot add key-value metadata to closed file"); + } +} + const std::shared_ptr& ParquetFileWriter::properties() const { return contents_->properties(); } diff --git a/cpp/src/parquet/file_writer.h b/cpp/src/parquet/file_writer.h index f2888599fb1a2..3bda1e535cfa6 100644 --- a/cpp/src/parquet/file_writer.h +++ b/cpp/src/parquet/file_writer.h @@ -163,6 +163,9 @@ class PARQUET_EXPORT ParquetFileWriter { return key_value_metadata_; } + virtual void AddKeyValueMetadata( + const std::shared_ptr& key_value_metadata) = 0; + // Return const-pointer to make it clear that this object is not to be copied const SchemaDescriptor* schema() const { return &schema_; } @@ -209,6 +212,13 @@ class PARQUET_EXPORT ParquetFileWriter { /// until the next call to AppendRowGroup or AppendBufferedRowGroup or Close. RowGroupWriter* AppendBufferedRowGroup(); + /// \brief Add key-value metadata to the file. + /// \param[in] key_value_metadata the metadata to add. + /// \note This will overwrite any existing metadata with the same key. + /// \throw ParquetException if Close() has been called. + void AddKeyValueMetadata( + const std::shared_ptr& key_value_metadata); + /// Number of columns. /// /// This number is fixed during the lifetime of the writer as it is determined via diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc index b6c240115f532..ddb4fb143d3d3 100644 --- a/cpp/src/parquet/metadata.cc +++ b/cpp/src/parquet/metadata.cc @@ -1740,7 +1740,6 @@ void RowGroupMetaDataBuilder::Finish(int64_t total_bytes_written, } // file metadata -// TODO(PARQUET-595) Support key_value_metadata class FileMetaDataBuilder::FileMetaDataBuilderImpl { public: explicit FileMetaDataBuilderImpl( @@ -1797,7 +1796,8 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl { } } - std::unique_ptr Finish() { + std::unique_ptr Finish( + const std::shared_ptr& key_value_metadata) { int64_t total_rows = 0; for (auto row_group : row_groups_) { total_rows += row_group.num_rows; @@ -1805,7 +1805,12 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl { metadata_->__set_num_rows(total_rows); metadata_->__set_row_groups(row_groups_); - if (key_value_metadata_) { + if (key_value_metadata_ || key_value_metadata) { + if (!key_value_metadata_) { + key_value_metadata_ = key_value_metadata; + } else if (key_value_metadata) { + key_value_metadata_ = key_value_metadata_->Merge(*key_value_metadata); + } metadata_->key_value_metadata.clear(); metadata_->key_value_metadata.reserve(key_value_metadata_->size()); for (int64_t i = 0; i < key_value_metadata_->size(); ++i) { @@ -1829,7 +1834,7 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl { metadata_->__set_version(file_version); metadata_->__set_created_by(properties_->created_by()); - // Users cannot set the `ColumnOrder` since we donot not have user defined sort order + // Users cannot set the `ColumnOrder` since we do not have user defined sort order // in the spec yet. // We always default to `TYPE_DEFINED_ORDER`. We can expose it in // the API once we have user defined sort orders in the Parquet format. @@ -1910,6 +1915,12 @@ std::unique_ptr FileMetaDataBuilder::Make( new FileMetaDataBuilder(schema, std::move(props), std::move(key_value_metadata))); } +std::unique_ptr FileMetaDataBuilder::Make( + const SchemaDescriptor* schema, std::shared_ptr props) { + return std::unique_ptr( + new FileMetaDataBuilder(schema, std::move(props))); +} + FileMetaDataBuilder::FileMetaDataBuilder( const SchemaDescriptor* schema, std::shared_ptr props, std::shared_ptr key_value_metadata) @@ -1926,7 +1937,10 @@ void FileMetaDataBuilder::SetPageIndexLocation(const PageIndexLocation& location impl_->SetPageIndexLocation(location); } -std::unique_ptr FileMetaDataBuilder::Finish() { return impl_->Finish(); } +std::unique_ptr FileMetaDataBuilder::Finish( + const std::shared_ptr& key_value_metadata) { + return impl_->Finish(key_value_metadata); +} std::unique_ptr FileMetaDataBuilder::GetCryptoMetaData() { return impl_->BuildFileCryptoMetaData(); diff --git a/cpp/src/parquet/metadata.h b/cpp/src/parquet/metadata.h index efcb17be04641..620bc842f9ec5 100644 --- a/cpp/src/parquet/metadata.h +++ b/cpp/src/parquet/metadata.h @@ -523,10 +523,14 @@ struct PageIndexLocation { class PARQUET_EXPORT FileMetaDataBuilder { public: - // API convenience to get a MetaData reader + ARROW_DEPRECATED("Deprecated in 12.0.0. Use overload without KeyValueMetadata instead.") static std::unique_ptr Make( const SchemaDescriptor* schema, std::shared_ptr props, - std::shared_ptr key_value_metadata = NULLPTR); + std::shared_ptr key_value_metadata); + + // API convenience to get a MetaData builder + static std::unique_ptr Make( + const SchemaDescriptor* schema, std::shared_ptr props); ~FileMetaDataBuilder(); @@ -537,7 +541,8 @@ class PARQUET_EXPORT FileMetaDataBuilder { void SetPageIndexLocation(const PageIndexLocation& location); // Complete the Thrift structure - std::unique_ptr Finish(); + std::unique_ptr Finish( + const std::shared_ptr& key_value_metadata = NULLPTR); // crypto metadata std::unique_ptr GetCryptoMetaData(); diff --git a/cpp/src/parquet/metadata_test.cc b/cpp/src/parquet/metadata_test.cc index a7c7b7b00e717..4375661f5cde9 100644 --- a/cpp/src/parquet/metadata_test.cc +++ b/cpp/src/parquet/metadata_test.cc @@ -21,6 +21,7 @@ #include "arrow/util/key_value_metadata.h" #include "parquet/file_reader.h" +#include "parquet/file_writer.h" #include "parquet/schema.h" #include "parquet/statistics.h" #include "parquet/test_util.h" @@ -284,16 +285,62 @@ TEST(Metadata, TestKeyValueMetadata) { auto kvmeta = std::make_shared(); kvmeta->Append("test_key", "test_value"); - auto f_builder = FileMetaDataBuilder::Make(&schema, props, kvmeta); + auto f_builder = FileMetaDataBuilder::Make(&schema, props); // Read the metadata - auto f_accessor = f_builder->Finish(); + auto f_accessor = f_builder->Finish(kvmeta); // Key value metadata ASSERT_TRUE(f_accessor->key_value_metadata()); EXPECT_TRUE(f_accessor->key_value_metadata()->Equals(*kvmeta)); } +TEST(Metadata, TestAddKeyValueMetadata) { + schema::NodeVector fields; + fields.push_back(schema::Int32("int_col", Repetition::REQUIRED)); + auto schema = std::static_pointer_cast( + schema::GroupNode::Make("schema", Repetition::REQUIRED, fields)); + + auto kv_meta = std::make_shared(); + kv_meta->Append("test_key_1", "test_value_1"); + kv_meta->Append("test_key_2", "test_value_2_"); + + auto sink = CreateOutputStream(); + auto writer_props = parquet::WriterProperties::Builder().disable_dictionary()->build(); + auto file_writer = + parquet::ParquetFileWriter::Open(sink, schema, writer_props, kv_meta); + + // Key value metadata that will be added to the file. + auto kv_meta_added = std::make_shared(); + kv_meta_added->Append("test_key_2", "test_value_2"); + kv_meta_added->Append("test_key_3", "test_value_3"); + + file_writer->AddKeyValueMetadata(kv_meta_added); + file_writer->Close(); + + // Throw if appending key value metadata to closed file. + auto kv_meta_ignored = std::make_shared(); + kv_meta_ignored->Append("test_key_4", "test_value_4"); + EXPECT_THROW(file_writer->AddKeyValueMetadata(kv_meta_ignored), ParquetException); + + PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish()); + auto source = std::make_shared<::arrow::io::BufferReader>(buffer); + auto file_reader = ParquetFileReader::Open(source); + + ASSERT_NE(nullptr, file_reader->metadata()); + ASSERT_NE(nullptr, file_reader->metadata()->key_value_metadata()); + auto read_kv_meta = file_reader->metadata()->key_value_metadata(); + + // Verify keys that were added before file writer was closed are present. + for (int i = 1; i <= 3; ++i) { + auto index = std::to_string(i); + PARQUET_ASSIGN_OR_THROW(auto value, read_kv_meta->Get("test_key_" + index)); + EXPECT_EQ("test_value_" + index, value); + } + // Verify keys that were added after file writer was closed are not present. + EXPECT_FALSE(read_kv_meta->Contains("test_key_4")); +} + TEST(Metadata, TestHasBloomFilter) { std::string dir_string(parquet::test::get_data_dir()); std::string path = dir_string + "/data_index_bloom_encoding_stats.parquet";