Skip to content

Commit

Permalink
GH-34888: [C++][Parquet] Writer supports adding extra kv meta (#34889)
Browse files Browse the repository at this point in the history
### Rationale for this change

Parquet specs support storing key-value metadata provided by the user. However, the parquet-cpp writer can only set it via ParquetFileWriter::Open(). Sometimes user may want to add extra information to it while writing. So it is good to support adding extra key-value metadata any time before closing the file writer.

### What changes are included in this PR?

Add a new interface `void AddKeyValueMetadata(std::shared_ptr<const KeyValueMetadata> key_value_metadata)` to the `ParquetFileWriter` class. User can now add more key-value metadata to the file if not closed.

### Are these changes tested?

Added a new `Metadata.TestAddKeyValueMetadata` test to verify key-value metadata added before closing the writer are well preserved.

### Are there any user-facing changes?

Yes, user can add custom key-value metadata whenever writer is not closed.
* Closes: #34888

Lead-authored-by: Gang Wu <[email protected]>
Co-authored-by: Will Jones <[email protected]>
Signed-off-by: Will Jones <[email protected]>
  • Loading branch information
2 people authored and raulcd committed Apr 17, 2023
1 parent 0434ab6 commit 6dd95ee
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 15 deletions.
29 changes: 24 additions & 5 deletions cpp/src/parquet/file_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@

#include "parquet/file_writer.h"

#include <cstddef>
#include <memory>
#include <ostream>
#include <string>
#include <utility>
#include <vector>

#include "arrow/util/key_value_metadata.h"
#include "arrow/util/logging.h"
#include "parquet/column_writer.h"
#include "parquet/encryption/encryption_internal.h"
#include "parquet/encryption/internal_file_encryptor.h"
Expand Down Expand Up @@ -338,7 +339,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
auto file_encryption_properties = properties_->file_encryption_properties();

if (file_encryption_properties == nullptr) { // Non encrypted file.
file_metadata_ = metadata_->Finish();
file_metadata_ = metadata_->Finish(key_value_metadata_);
WriteFileMetaData(*file_metadata_, sink_.get());
} else { // Encrypted file
CloseEncryptedFile(file_encryption_properties);
Expand Down Expand Up @@ -376,6 +377,15 @@ class FileSerializer : public ParquetFileWriter::Contents {

RowGroupWriter* AppendBufferedRowGroup() override { return AppendRowGroup(true); }

void AddKeyValueMetadata(
const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) override {
if (key_value_metadata_ == nullptr) {
key_value_metadata_ = std::move(key_value_metadata);
} else if (key_value_metadata != nullptr) {
key_value_metadata_ = key_value_metadata_->Merge(*key_value_metadata);
}
}

~FileSerializer() override {
try {
Close();
Expand All @@ -394,7 +404,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
properties_(std::move(properties)),
num_row_groups_(0),
num_rows_(0),
metadata_(FileMetaDataBuilder::Make(&schema_, properties_, key_value_metadata_)) {
metadata_(FileMetaDataBuilder::Make(&schema_, properties_)) {
PARQUET_ASSIGN_OR_THROW(int64_t position, sink_->Tell());
if (position == 0) {
StartFile();
Expand All @@ -407,7 +417,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
// Encrypted file with encrypted footer
if (file_encryption_properties->encrypted_footer()) {
// encrypted footer
file_metadata_ = metadata_->Finish();
file_metadata_ = metadata_->Finish(key_value_metadata_);

PARQUET_ASSIGN_OR_THROW(int64_t position, sink_->Tell());
uint64_t metadata_start = static_cast<uint64_t>(position);
Expand All @@ -422,7 +432,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
sink_->Write(reinterpret_cast<uint8_t*>(&footer_and_crypto_len), 4));
PARQUET_THROW_NOT_OK(sink_->Write(kParquetEMagic, 4));
} else { // Encrypted file with plaintext footer
file_metadata_ = metadata_->Finish();
file_metadata_ = metadata_->Finish(key_value_metadata_);
auto footer_signing_encryptor = file_encryptor_->GetFooterSigningEncryptor();
WriteEncryptedFileMetadata(*file_metadata_, sink_.get(), footer_signing_encryptor,
false);
Expand Down Expand Up @@ -613,6 +623,15 @@ RowGroupWriter* ParquetFileWriter::AppendRowGroup(int64_t num_rows) {
return AppendRowGroup();
}

void ParquetFileWriter::AddKeyValueMetadata(
const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
if (contents_) {
contents_->AddKeyValueMetadata(key_value_metadata);
} else {
throw ParquetException("Cannot add key-value metadata to closed file");
}
}

const std::shared_ptr<WriterProperties>& ParquetFileWriter::properties() const {
return contents_->properties();
}
Expand Down
10 changes: 10 additions & 0 deletions cpp/src/parquet/file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ class PARQUET_EXPORT ParquetFileWriter {
return key_value_metadata_;
}

virtual void AddKeyValueMetadata(
const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) = 0;

// Return const-pointer to make it clear that this object is not to be copied
const SchemaDescriptor* schema() const { return &schema_; }

Expand Down Expand Up @@ -209,6 +212,13 @@ class PARQUET_EXPORT ParquetFileWriter {
/// until the next call to AppendRowGroup or AppendBufferedRowGroup or Close.
RowGroupWriter* AppendBufferedRowGroup();

/// \brief Add key-value metadata to the file.
/// \param[in] key_value_metadata the metadata to add.
/// \note This will overwrite any existing metadata with the same key.
/// \throw ParquetException if Close() has been called.
void AddKeyValueMetadata(
const std::shared_ptr<const KeyValueMetadata>& key_value_metadata);

/// Number of columns.
///
/// This number is fixed during the lifetime of the writer as it is determined via
Expand Down
24 changes: 19 additions & 5 deletions cpp/src/parquet/metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1740,7 +1740,6 @@ void RowGroupMetaDataBuilder::Finish(int64_t total_bytes_written,
}

// file metadata
// TODO(PARQUET-595) Support key_value_metadata
class FileMetaDataBuilder::FileMetaDataBuilderImpl {
public:
explicit FileMetaDataBuilderImpl(
Expand Down Expand Up @@ -1797,15 +1796,21 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
}
}

std::unique_ptr<FileMetaData> Finish() {
std::unique_ptr<FileMetaData> Finish(
const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
int64_t total_rows = 0;
for (auto row_group : row_groups_) {
total_rows += row_group.num_rows;
}
metadata_->__set_num_rows(total_rows);
metadata_->__set_row_groups(row_groups_);

if (key_value_metadata_) {
if (key_value_metadata_ || key_value_metadata) {
if (!key_value_metadata_) {
key_value_metadata_ = key_value_metadata;
} else if (key_value_metadata) {
key_value_metadata_ = key_value_metadata_->Merge(*key_value_metadata);
}
metadata_->key_value_metadata.clear();
metadata_->key_value_metadata.reserve(key_value_metadata_->size());
for (int64_t i = 0; i < key_value_metadata_->size(); ++i) {
Expand All @@ -1829,7 +1834,7 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
metadata_->__set_version(file_version);
metadata_->__set_created_by(properties_->created_by());

// Users cannot set the `ColumnOrder` since we donot not have user defined sort order
// Users cannot set the `ColumnOrder` since we do not have user defined sort order
// in the spec yet.
// We always default to `TYPE_DEFINED_ORDER`. We can expose it in
// the API once we have user defined sort orders in the Parquet format.
Expand Down Expand Up @@ -1910,6 +1915,12 @@ std::unique_ptr<FileMetaDataBuilder> FileMetaDataBuilder::Make(
new FileMetaDataBuilder(schema, std::move(props), std::move(key_value_metadata)));
}

std::unique_ptr<FileMetaDataBuilder> FileMetaDataBuilder::Make(
const SchemaDescriptor* schema, std::shared_ptr<WriterProperties> props) {
return std::unique_ptr<FileMetaDataBuilder>(
new FileMetaDataBuilder(schema, std::move(props)));
}

FileMetaDataBuilder::FileMetaDataBuilder(
const SchemaDescriptor* schema, std::shared_ptr<WriterProperties> props,
std::shared_ptr<const KeyValueMetadata> key_value_metadata)
Expand All @@ -1926,7 +1937,10 @@ void FileMetaDataBuilder::SetPageIndexLocation(const PageIndexLocation& location
impl_->SetPageIndexLocation(location);
}

std::unique_ptr<FileMetaData> FileMetaDataBuilder::Finish() { return impl_->Finish(); }
std::unique_ptr<FileMetaData> FileMetaDataBuilder::Finish(
const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
return impl_->Finish(key_value_metadata);
}

std::unique_ptr<FileCryptoMetaData> FileMetaDataBuilder::GetCryptoMetaData() {
return impl_->BuildFileCryptoMetaData();
Expand Down
11 changes: 8 additions & 3 deletions cpp/src/parquet/metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -523,10 +523,14 @@ struct PageIndexLocation {

class PARQUET_EXPORT FileMetaDataBuilder {
public:
// API convenience to get a MetaData reader
ARROW_DEPRECATED("Deprecated in 12.0.0. Use overload without KeyValueMetadata instead.")
static std::unique_ptr<FileMetaDataBuilder> Make(
const SchemaDescriptor* schema, std::shared_ptr<WriterProperties> props,
std::shared_ptr<const KeyValueMetadata> key_value_metadata = NULLPTR);
std::shared_ptr<const KeyValueMetadata> key_value_metadata);

// API convenience to get a MetaData builder
static std::unique_ptr<FileMetaDataBuilder> Make(
const SchemaDescriptor* schema, std::shared_ptr<WriterProperties> props);

~FileMetaDataBuilder();

Expand All @@ -537,7 +541,8 @@ class PARQUET_EXPORT FileMetaDataBuilder {
void SetPageIndexLocation(const PageIndexLocation& location);

// Complete the Thrift structure
std::unique_ptr<FileMetaData> Finish();
std::unique_ptr<FileMetaData> Finish(
const std::shared_ptr<const KeyValueMetadata>& key_value_metadata = NULLPTR);

// crypto metadata
std::unique_ptr<FileCryptoMetaData> GetCryptoMetaData();
Expand Down
51 changes: 49 additions & 2 deletions cpp/src/parquet/metadata_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include "arrow/util/key_value_metadata.h"
#include "parquet/file_reader.h"
#include "parquet/file_writer.h"
#include "parquet/schema.h"
#include "parquet/statistics.h"
#include "parquet/test_util.h"
Expand Down Expand Up @@ -284,16 +285,62 @@ TEST(Metadata, TestKeyValueMetadata) {
auto kvmeta = std::make_shared<KeyValueMetadata>();
kvmeta->Append("test_key", "test_value");

auto f_builder = FileMetaDataBuilder::Make(&schema, props, kvmeta);
auto f_builder = FileMetaDataBuilder::Make(&schema, props);

// Read the metadata
auto f_accessor = f_builder->Finish();
auto f_accessor = f_builder->Finish(kvmeta);

// Key value metadata
ASSERT_TRUE(f_accessor->key_value_metadata());
EXPECT_TRUE(f_accessor->key_value_metadata()->Equals(*kvmeta));
}

TEST(Metadata, TestAddKeyValueMetadata) {
schema::NodeVector fields;
fields.push_back(schema::Int32("int_col", Repetition::REQUIRED));
auto schema = std::static_pointer_cast<schema::GroupNode>(
schema::GroupNode::Make("schema", Repetition::REQUIRED, fields));

auto kv_meta = std::make_shared<KeyValueMetadata>();
kv_meta->Append("test_key_1", "test_value_1");
kv_meta->Append("test_key_2", "test_value_2_");

auto sink = CreateOutputStream();
auto writer_props = parquet::WriterProperties::Builder().disable_dictionary()->build();
auto file_writer =
parquet::ParquetFileWriter::Open(sink, schema, writer_props, kv_meta);

// Key value metadata that will be added to the file.
auto kv_meta_added = std::make_shared<KeyValueMetadata>();
kv_meta_added->Append("test_key_2", "test_value_2");
kv_meta_added->Append("test_key_3", "test_value_3");

file_writer->AddKeyValueMetadata(kv_meta_added);
file_writer->Close();

// Throw if appending key value metadata to closed file.
auto kv_meta_ignored = std::make_shared<KeyValueMetadata>();
kv_meta_ignored->Append("test_key_4", "test_value_4");
EXPECT_THROW(file_writer->AddKeyValueMetadata(kv_meta_ignored), ParquetException);

PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
auto source = std::make_shared<::arrow::io::BufferReader>(buffer);
auto file_reader = ParquetFileReader::Open(source);

ASSERT_NE(nullptr, file_reader->metadata());
ASSERT_NE(nullptr, file_reader->metadata()->key_value_metadata());
auto read_kv_meta = file_reader->metadata()->key_value_metadata();

// Verify keys that were added before file writer was closed are present.
for (int i = 1; i <= 3; ++i) {
auto index = std::to_string(i);
PARQUET_ASSIGN_OR_THROW(auto value, read_kv_meta->Get("test_key_" + index));
EXPECT_EQ("test_value_" + index, value);
}
// Verify keys that were added after file writer was closed are not present.
EXPECT_FALSE(read_kv_meta->Contains("test_key_4"));
}

TEST(Metadata, TestHasBloomFilter) {
std::string dir_string(parquet::test::get_data_dir());
std::string path = dir_string + "/data_index_bloom_encoding_stats.parquet";
Expand Down

0 comments on commit 6dd95ee

Please sign in to comment.