diff --git a/cpp/src/parquet/file_writer.cc b/cpp/src/parquet/file_writer.cc index ec2e0e8a8a304..6736a59cf7917 100644 --- a/cpp/src/parquet/file_writer.cc +++ b/cpp/src/parquet/file_writer.cc @@ -17,20 +17,19 @@ #include "parquet/file_writer.h" -#include #include #include #include #include #include +#include "arrow/util/key_value_metadata.h" #include "parquet/column_writer.h" #include "parquet/encryption/encryption_internal.h" #include "parquet/encryption/internal_file_encryptor.h" #include "parquet/exception.h" #include "parquet/platform.h" #include "parquet/schema.h" -#include "parquet/types.h" using arrow::MemoryPool; @@ -321,7 +320,7 @@ class FileSerializer : public ParquetFileWriter::Contents { auto file_encryption_properties = properties_->file_encryption_properties(); if (file_encryption_properties == nullptr) { // Non encrypted file. - file_metadata_ = metadata_->Finish(); + file_metadata_ = metadata_->Finish(key_value_metadata_); WriteFileMetaData(*file_metadata_, sink_.get()); } else { // Encrypted file CloseEncryptedFile(file_encryption_properties); @@ -356,6 +355,15 @@ class FileSerializer : public ParquetFileWriter::Contents { RowGroupWriter* AppendBufferedRowGroup() override { return AppendRowGroup(true); } + void AddKeyValueMetadata( + std::shared_ptr key_value_metadata) override { + if (key_value_metadata_ == nullptr) { + key_value_metadata_ = std::move(key_value_metadata); + } else if (key_value_metadata != nullptr) { + key_value_metadata_ = key_value_metadata_->Merge(*key_value_metadata); + } + } + ~FileSerializer() override { try { Close(); @@ -374,7 +382,7 @@ class FileSerializer : public ParquetFileWriter::Contents { properties_(std::move(properties)), num_row_groups_(0), num_rows_(0), - metadata_(FileMetaDataBuilder::Make(&schema_, properties_, key_value_metadata_)) { + metadata_(FileMetaDataBuilder::Make(&schema_, properties_)) { PARQUET_ASSIGN_OR_THROW(int64_t position, sink_->Tell()); if (position == 0) { StartFile(); @@ -387,7 +395,7 @@ class FileSerializer : public ParquetFileWriter::Contents { // Encrypted file with encrypted footer if (file_encryption_properties->encrypted_footer()) { // encrypted footer - file_metadata_ = metadata_->Finish(); + file_metadata_ = metadata_->Finish(key_value_metadata_); PARQUET_ASSIGN_OR_THROW(int64_t position, sink_->Tell()); uint64_t metadata_start = static_cast(position); @@ -402,7 +410,7 @@ class FileSerializer : public ParquetFileWriter::Contents { sink_->Write(reinterpret_cast(&footer_and_crypto_len), 4)); PARQUET_THROW_NOT_OK(sink_->Write(kParquetEMagic, 4)); } else { // Encrypted file with plaintext footer - file_metadata_ = metadata_->Finish(); + file_metadata_ = metadata_->Finish(key_value_metadata_); auto footer_signing_encryptor = file_encryptor_->GetFooterSigningEncryptor(); WriteEncryptedFileMetadata(*file_metadata_, sink_.get(), footer_signing_encryptor, false); @@ -574,6 +582,13 @@ RowGroupWriter* ParquetFileWriter::AppendRowGroup(int64_t num_rows) { return AppendRowGroup(); } +void ParquetFileWriter::AddKeyValueMetadata( + std::shared_ptr key_value_metadata) { + if (contents_) { + contents_->AddKeyValueMetadata(std::move(key_value_metadata)); + } +} + const std::shared_ptr& ParquetFileWriter::properties() const { return contents_->properties(); } diff --git a/cpp/src/parquet/file_writer.h b/cpp/src/parquet/file_writer.h index f2888599fb1a2..629faf1d0feb1 100644 --- a/cpp/src/parquet/file_writer.h +++ b/cpp/src/parquet/file_writer.h @@ -163,6 +163,9 @@ class PARQUET_EXPORT ParquetFileWriter { return key_value_metadata_; } + virtual void AddKeyValueMetadata( + std::shared_ptr key_value_metadata) = 0; + // Return const-pointer to make it clear that this object is not to be copied const SchemaDescriptor* schema() const { return &schema_; } @@ -209,6 +212,12 @@ class PARQUET_EXPORT ParquetFileWriter { /// until the next call to AppendRowGroup or AppendBufferedRowGroup or Close. RowGroupWriter* AppendBufferedRowGroup(); + /// \brief Add key-value metadata to the file. + /// \param[in] key_value_metadata the metadata to add. + /// \note This will overwrite any existing metadata with the same key. + /// It will not take effect if Close() has been called. + void AddKeyValueMetadata(std::shared_ptr key_value_metadata); + /// Number of columns. /// /// This number is fixed during the lifetime of the writer as it is determined via diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc index e47257bf89cee..bc3b567eda703 100644 --- a/cpp/src/parquet/metadata.cc +++ b/cpp/src/parquet/metadata.cc @@ -1743,13 +1743,11 @@ void RowGroupMetaDataBuilder::Finish(int64_t total_bytes_written, // TODO(PARQUET-595) Support key_value_metadata class FileMetaDataBuilder::FileMetaDataBuilderImpl { public: - explicit FileMetaDataBuilderImpl( - const SchemaDescriptor* schema, std::shared_ptr props, - std::shared_ptr key_value_metadata) + explicit FileMetaDataBuilderImpl(const SchemaDescriptor* schema, + std::shared_ptr props) : metadata_(new format::FileMetaData()), properties_(std::move(props)), - schema_(schema), - key_value_metadata_(std::move(key_value_metadata)) { + schema_(schema) { if (properties_->file_encryption_properties() != nullptr && properties_->file_encryption_properties()->encrypted_footer()) { crypto_metadata_.reset(new format::FileCryptoMetaData()); @@ -1763,7 +1761,8 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl { return current_row_group_builder_.get(); } - std::unique_ptr Finish() { + std::unique_ptr Finish( + const std::shared_ptr& key_value_metadata) { int64_t total_rows = 0; for (auto row_group : row_groups_) { total_rows += row_group.num_rows; @@ -1771,13 +1770,13 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl { metadata_->__set_num_rows(total_rows); metadata_->__set_row_groups(row_groups_); - if (key_value_metadata_) { + if (key_value_metadata) { metadata_->key_value_metadata.clear(); - metadata_->key_value_metadata.reserve(key_value_metadata_->size()); - for (int64_t i = 0; i < key_value_metadata_->size(); ++i) { + metadata_->key_value_metadata.reserve(key_value_metadata->size()); + for (int64_t i = 0; i < key_value_metadata->size(); ++i) { format::KeyValue kv_pair; - kv_pair.__set_key(key_value_metadata_->key(i)); - kv_pair.__set_value(key_value_metadata_->value(i)); + kv_pair.__set_key(key_value_metadata->key(i)); + kv_pair.__set_value(key_value_metadata->value(i)); metadata_->key_value_metadata.push_back(kv_pair); } metadata_->__isset.key_value_metadata = true; @@ -1795,7 +1794,7 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl { metadata_->__set_version(file_version); metadata_->__set_created_by(properties_->created_by()); - // Users cannot set the `ColumnOrder` since we donot not have user defined sort order + // Users cannot set the `ColumnOrder` since we do not have user defined sort order // in the spec yet. // We always default to `TYPE_DEFINED_ORDER`. We can expose it in // the API once we have user defined sort orders in the Parquet format. @@ -1866,21 +1865,18 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl { std::unique_ptr current_row_group_builder_; const SchemaDescriptor* schema_; - std::shared_ptr key_value_metadata_; }; std::unique_ptr FileMetaDataBuilder::Make( - const SchemaDescriptor* schema, std::shared_ptr props, - std::shared_ptr key_value_metadata) { + const SchemaDescriptor* schema, std::shared_ptr props) { return std::unique_ptr( - new FileMetaDataBuilder(schema, std::move(props), std::move(key_value_metadata))); + new FileMetaDataBuilder(schema, std::move(props))); } -FileMetaDataBuilder::FileMetaDataBuilder( - const SchemaDescriptor* schema, std::shared_ptr props, - std::shared_ptr key_value_metadata) - : impl_{std::unique_ptr(new FileMetaDataBuilderImpl( - schema, std::move(props), std::move(key_value_metadata)))} {} +FileMetaDataBuilder::FileMetaDataBuilder(const SchemaDescriptor* schema, + std::shared_ptr props) + : impl_{std::unique_ptr( + new FileMetaDataBuilderImpl(schema, std::move(props)))} {} FileMetaDataBuilder::~FileMetaDataBuilder() = default; @@ -1888,7 +1884,10 @@ RowGroupMetaDataBuilder* FileMetaDataBuilder::AppendRowGroup() { return impl_->AppendRowGroup(); } -std::unique_ptr FileMetaDataBuilder::Finish() { return impl_->Finish(); } +std::unique_ptr FileMetaDataBuilder::Finish( + const std::shared_ptr& key_value_metadata) { + return impl_->Finish(key_value_metadata); +} std::unique_ptr FileMetaDataBuilder::GetCryptoMetaData() { return impl_->BuildFileCryptoMetaData(); diff --git a/cpp/src/parquet/metadata.h b/cpp/src/parquet/metadata.h index 277f59a1b6072..7565626801973 100644 --- a/cpp/src/parquet/metadata.h +++ b/cpp/src/parquet/metadata.h @@ -510,8 +510,7 @@ class PARQUET_EXPORT FileMetaDataBuilder { public: // API convenience to get a MetaData reader static std::unique_ptr Make( - const SchemaDescriptor* schema, std::shared_ptr props, - std::shared_ptr key_value_metadata = NULLPTR); + const SchemaDescriptor* schema, std::shared_ptr props); ~FileMetaDataBuilder(); @@ -519,15 +518,15 @@ class PARQUET_EXPORT FileMetaDataBuilder { RowGroupMetaDataBuilder* AppendRowGroup(); // Complete the Thrift structure - std::unique_ptr Finish(); + std::unique_ptr Finish( + const std::shared_ptr& key_value_metadata = NULLPTR); // crypto metadata std::unique_ptr GetCryptoMetaData(); private: - explicit FileMetaDataBuilder( - const SchemaDescriptor* schema, std::shared_ptr props, - std::shared_ptr key_value_metadata = NULLPTR); + explicit FileMetaDataBuilder(const SchemaDescriptor* schema, + std::shared_ptr props); // PIMPL Idiom class FileMetaDataBuilderImpl; std::unique_ptr impl_; diff --git a/cpp/src/parquet/metadata_test.cc b/cpp/src/parquet/metadata_test.cc index a7c7b7b00e717..f792bc03d1adb 100644 --- a/cpp/src/parquet/metadata_test.cc +++ b/cpp/src/parquet/metadata_test.cc @@ -21,6 +21,7 @@ #include "arrow/util/key_value_metadata.h" #include "parquet/file_reader.h" +#include "parquet/file_writer.h" #include "parquet/schema.h" #include "parquet/statistics.h" #include "parquet/test_util.h" @@ -284,16 +285,61 @@ TEST(Metadata, TestKeyValueMetadata) { auto kvmeta = std::make_shared(); kvmeta->Append("test_key", "test_value"); - auto f_builder = FileMetaDataBuilder::Make(&schema, props, kvmeta); + auto f_builder = FileMetaDataBuilder::Make(&schema, props); // Read the metadata - auto f_accessor = f_builder->Finish(); + auto f_accessor = f_builder->Finish(kvmeta); // Key value metadata ASSERT_TRUE(f_accessor->key_value_metadata()); EXPECT_TRUE(f_accessor->key_value_metadata()->Equals(*kvmeta)); } +TEST(Metadata, TestAddKeyValueMetadata) { + schema::NodeVector fields; + fields.push_back(schema::Int32("int_col", Repetition::REQUIRED)); + auto schema = std::static_pointer_cast( + schema::GroupNode::Make("schema", Repetition::REQUIRED, fields)); + + auto kv_meta = std::make_shared(); + kv_meta->Append("test_key_1", "test_value_1"); + kv_meta->Append("test_key_2", "test_value_2_"); + + auto sink = CreateOutputStream(); + auto writer_props = parquet::WriterProperties::Builder().disable_dictionary()->build(); + auto file_writer = + parquet::ParquetFileWriter::Open(sink, schema, writer_props, kv_meta); + + // Key value metadata that will be added to the file. + auto kv_meta_added = std::make_shared(); + kv_meta_added->Append("test_key_2", "test_value_2"); + kv_meta_added->Append("test_key_3", "test_value_3"); + + file_writer->AddKeyValueMetadata(kv_meta_added); + file_writer->Close(); + + // Key value metadata that will be ignored since file writer is closed. + auto kv_meta_ignored = std::make_shared(); + kv_meta_ignored->Append("test_key_4", "test_value_4"); + + PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish()); + auto source = std::make_shared<::arrow::io::BufferReader>(buffer); + auto file_reader = ParquetFileReader::Open(source); + + ASSERT_NE(nullptr, file_reader->metadata()); + ASSERT_NE(nullptr, file_reader->metadata()->key_value_metadata()); + auto read_kv_meta = file_reader->metadata()->key_value_metadata(); + + // Verify keys that were added before file writer was closed are present. + for (int i = 1; i <= 3; ++i) { + auto index = std::to_string(i); + PARQUET_ASSIGN_OR_THROW(auto value, read_kv_meta->Get("test_key_" + index)); + EXPECT_EQ("test_value_" + index, value); + } + // Verify keys that were added after file writer was closed are not present. + EXPECT_FALSE(read_kv_meta->Contains("test_key_4")); +} + TEST(Metadata, TestHasBloomFilter) { std::string dir_string(parquet::test::get_data_dir()); std::string path = dir_string + "/data_index_bloom_encoding_stats.parquet";