Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-41579: [C++][Python][Parquet] Support reading/writing key-value metadata from/to ColumnChunkMetaData #41580

Merged
merged 22 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cpp/src/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1405,6 +1405,10 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
return pages_change_on_record_boundaries_;
}

KeyValueMetadata& key_value_metadata() override {
return metadata_->key_value_metadata();
clee704 marked this conversation as resolved.
Show resolved Hide resolved
}

private:
using ValueEncoderType = typename EncodingTraits<DType>::Encoder;
using TypedStats = TypedStatistics<DType>;
Expand Down
7 changes: 7 additions & 0 deletions cpp/src/parquet/column_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
namespace arrow {

class Array;
class KeyValueMetadata;
clee704 marked this conversation as resolved.
Show resolved Hide resolved

namespace bit_util {
class BitWriter;
Expand All @@ -53,6 +54,8 @@ class Encryptor;
class OffsetIndexBuilder;
class WriterProperties;

using KeyValueMetadata = ::arrow::KeyValueMetadata;
clee704 marked this conversation as resolved.
Show resolved Hide resolved

class PARQUET_EXPORT LevelEncoder {
public:
LevelEncoder();
Expand Down Expand Up @@ -181,6 +184,10 @@ class PARQUET_EXPORT ColumnWriter {
/// \brief The file-level writer properties
virtual const WriterProperties* properties() = 0;

/// \brief Return KeyValueMetadata that can be used to store key-value
/// metadata in ColumnChunkMetaData.
virtual KeyValueMetadata& key_value_metadata() = 0;
clee704 marked this conversation as resolved.
Show resolved Hide resolved

/// \brief Write Apache Arrow columnar data directly to ColumnWriter. Returns
/// error status if the array data type is not compatible with the concrete
/// writer type.
Expand Down
54 changes: 54 additions & 0 deletions cpp/src/parquet/column_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "arrow/util/bit_util.h"
#include "arrow/util/bitmap_builders.h"
#include "arrow/util/config.h"
#include "arrow/util/key_value_metadata.h"

#include "parquet/column_page.h"
#include "parquet/column_reader.h"
Expand All @@ -51,6 +52,9 @@ using schema::PrimitiveNode;

namespace test {

using ::testing::IsNull;
using ::testing::NotNull;

// The default size used in most tests.
const int SMALL_SIZE = 100;
#ifdef PARQUET_VALGRIND
Expand Down Expand Up @@ -385,6 +389,15 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
return metadata_accessor->encoding_stats();
}

std::shared_ptr<const KeyValueMetadata> metadata_key_value_metadata() {
// Metadata accessor must be created lazily.
// This is because the ColumnChunkMetaData semantics dictate the metadata object is
// complete (no changes to the metadata buffer can be made after instantiation)
auto metadata_accessor =
ColumnChunkMetaData::Make(metadata_->contents(), this->descr_);
return metadata_accessor->key_value_metadata();
}

protected:
int64_t values_read_;
// Keep the reader alive as for ByteArray the lifetime of the ByteArray
Expand Down Expand Up @@ -1705,5 +1718,46 @@ TEST(TestColumnWriter, WriteDataPageV2HeaderNullCount) {
}
}

using TestInt32Writer = TestPrimitiveWriter<Int32Type>;

TEST_F(TestInt32Writer, NoWriteKeyValueMetadata) {
auto writer = this->BuildWriter();
writer->Close();
auto key_value_metadata = metadata_key_value_metadata();
ASSERT_THAT(key_value_metadata, IsNull());
}

TEST_F(TestInt32Writer, WriteKeyValueMetadata) {
auto writer = this->BuildWriter();
writer->key_value_metadata().Append("hello", "world");
writer->Close();
auto key_value_metadata = metadata_key_value_metadata();
ASSERT_THAT(key_value_metadata, NotNull());
ASSERT_THAT(key_value_metadata->size(), 1);
ASSERT_OK_AND_ASSIGN(auto value, key_value_metadata->Get("hello"));
ASSERT_THAT(value, "world");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use ASSERT_EQ here so that the test is more readable? Currently it's a bit confusing what's being checked exactly.

Copy link
Contributor Author

@clee704 clee704 Jul 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm against using ASSERT_EQ because the direction is reversed, e.g., ASSERT_EQ(expected, actual), ASSERT_THAT(actual, matcher), and it can hurt the readability when you have both ASSERT_EQ and ASSERT_THAT in the same code. If the readability is a concern, we can write ASSERT_THAT(actual, Eq(expected)). What do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm ok with ASSERT_THAT(actual, Eq(expected))

}

TEST_F(TestInt32Writer, WriteKeyValueMetadataEndToEnd) {
auto sink = CreateOutputStream();
{
auto file_writer = ParquetFileWriter::Open(
sink, std::dynamic_pointer_cast<schema::GroupNode>(schema_.schema_root()));
auto rg_writer = file_writer->AppendRowGroup();
auto col_writer = rg_writer->NextColumn();
col_writer->key_value_metadata().Append("foo", "bar");
file_writer->Close();
}
ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
auto file_reader =
ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer));
auto key_value_metadata =
file_reader->metadata()->RowGroup(0)->ColumnChunk(0)->key_value_metadata();
ASSERT_THAT(key_value_metadata, NotNull());
ASSERT_THAT(key_value_metadata->size(), 1);
ASSERT_OK_AND_ASSIGN(auto value, key_value_metadata->Get("foo"));
ASSERT_THAT(value, "bar");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question here.

}

} // namespace test
} // namespace parquet
78 changes: 58 additions & 20 deletions cpp/src/parquet/metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,31 @@ std::shared_ptr<Statistics> MakeColumnStats(const format::ColumnMetaData& meta_d
throw ParquetException("Can't decode page statistics for selected column type");
}

template <typename Metadata>
std::shared_ptr<KeyValueMetadata> CopyKeyValueMetadata(const Metadata& source) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps call this FromThriftKeyValueMetadata?

std::shared_ptr<KeyValueMetadata> metadata = nullptr;
if (source.__isset.key_value_metadata) {
metadata = std::make_shared<KeyValueMetadata>();
for (const auto& it : source.key_value_metadata) {
metadata->Append(it.key, it.value);
}
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
}
return metadata;
}

template <typename Metadata>
void SetKeyValueMetadata(Metadata& metadata, const KeyValueMetadata& source) {
metadata.key_value_metadata.clear();
metadata.key_value_metadata.reserve(static_cast<size_t>(source.size()));
for (int64_t i = 0; i < source.size(); ++i) {
format::KeyValue kv_pair;
kv_pair.__set_key(source.key(i));
kv_pair.__set_value(source.value(i));
metadata.key_value_metadata.push_back(kv_pair);
}
metadata.__isset.key_value_metadata = true;
clee704 marked this conversation as resolved.
Show resolved Hide resolved
}

// MetaData Accessor

// ColumnCryptoMetaData
Expand Down Expand Up @@ -230,6 +255,7 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
encoding_stats.count});
}
possible_stats_ = nullptr;
InitKeyValueMetadata();
}

bool Equals(const ColumnChunkMetaDataImpl& other) const {
Expand Down Expand Up @@ -340,7 +366,15 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
return std::nullopt;
}

const std::shared_ptr<const KeyValueMetadata>& key_value_metadata() const {
return key_value_metadata_;
}

private:
void InitKeyValueMetadata() {
key_value_metadata_ = CopyKeyValueMetadata(*column_metadata_);
}

mutable std::shared_ptr<Statistics> possible_stats_;
std::vector<Encoding::type> encodings_;
std::vector<PageEncodingStats> encoding_stats_;
Expand All @@ -350,6 +384,7 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
const ColumnDescriptor* descr_;
const ReaderProperties properties_;
const ApplicationVersion* writer_version_;
std::shared_ptr<const KeyValueMetadata> key_value_metadata_;
};

std::unique_ptr<ColumnChunkMetaData> ColumnChunkMetaData::Make(
Expand Down Expand Up @@ -468,6 +503,11 @@ bool ColumnChunkMetaData::Equals(const ColumnChunkMetaData& other) const {
return impl_->Equals(*other.impl_);
}

const std::shared_ptr<const KeyValueMetadata>& ColumnChunkMetaData::key_value_metadata()
const {
return impl_->key_value_metadata();
}

// row-group metadata
class RowGroupMetaData::RowGroupMetaDataImpl {
public:
Expand Down Expand Up @@ -863,16 +903,7 @@ class FileMetaData::FileMetaDataImpl {
schema_.updateColumnOrders(column_orders);
}

void InitKeyValueMetadata() {
std::shared_ptr<KeyValueMetadata> metadata = nullptr;
if (metadata_->__isset.key_value_metadata) {
metadata = std::make_shared<KeyValueMetadata>();
for (const auto& it : metadata_->key_value_metadata) {
metadata->Append(it.key, it.value);
}
}
key_value_metadata_ = std::move(metadata);
}
void InitKeyValueMetadata() { key_value_metadata_ = CopyKeyValueMetadata(*metadata_); }
};

std::shared_ptr<FileMetaData> FileMetaData::Make(
Expand Down Expand Up @@ -1518,6 +1549,10 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
column_chunk_->meta_data.__set_encodings(std::move(thrift_encodings));
column_chunk_->meta_data.__set_encoding_stats(std::move(thrift_encoding_stats));

if (key_value_metadata_) {
SetKeyValueMetadata(column_chunk_->meta_data, *key_value_metadata_);
}

const auto& encrypt_md =
properties_->column_encryption_properties(column_->path()->ToDotString());
// column is encrypted
Expand Down Expand Up @@ -1584,6 +1619,13 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
return column_chunk_->meta_data.total_compressed_size;
}

KeyValueMetadata& key_value_metadata() {
if (!key_value_metadata_) {
key_value_metadata_ = std::make_unique<KeyValueMetadata>();
}
return *key_value_metadata_;
clee704 marked this conversation as resolved.
Show resolved Hide resolved
}

private:
void Init(format::ColumnChunk* column_chunk) {
column_chunk_ = column_chunk;
Expand All @@ -1598,6 +1640,7 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
std::unique_ptr<format::ColumnChunk> owned_column_chunk_;
const std::shared_ptr<WriterProperties> properties_;
const ColumnDescriptor* column_;
std::unique_ptr<KeyValueMetadata> key_value_metadata_;
};

std::unique_ptr<ColumnChunkMetaDataBuilder> ColumnChunkMetaDataBuilder::Make(
Expand Down Expand Up @@ -1659,6 +1702,10 @@ int64_t ColumnChunkMetaDataBuilder::total_compressed_size() const {
return impl_->total_compressed_size();
}

KeyValueMetadata& ColumnChunkMetaDataBuilder::key_value_metadata() {
return impl_->key_value_metadata();
}

class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl {
public:
explicit RowGroupMetaDataBuilderImpl(std::shared_ptr<WriterProperties> props,
Expand Down Expand Up @@ -1853,16 +1900,7 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
} else if (key_value_metadata) {
key_value_metadata_ = key_value_metadata_->Merge(*key_value_metadata);
}
metadata_->key_value_metadata.clear();
metadata_->key_value_metadata.reserve(
static_cast<size_t>(key_value_metadata_->size()));
for (int64_t i = 0; i < key_value_metadata_->size(); ++i) {
format::KeyValue kv_pair;
kv_pair.__set_key(key_value_metadata_->key(i));
kv_pair.__set_value(key_value_metadata_->value(i));
metadata_->key_value_metadata.push_back(std::move(kv_pair));
}
metadata_->__isset.key_value_metadata = true;
SetKeyValueMetadata(*metadata_, *key_value_metadata_);
}

int32_t file_version = 0;
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/parquet/metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ class PARQUET_EXPORT ColumnChunkMetaData {
std::unique_ptr<ColumnCryptoMetaData> crypto_metadata() const;
std::optional<IndexLocation> GetColumnIndexLocation() const;
std::optional<IndexLocation> GetOffsetIndexLocation() const;
const std::shared_ptr<const KeyValueMetadata>& key_value_metadata() const;

private:
explicit ColumnChunkMetaData(
Expand Down Expand Up @@ -452,8 +453,12 @@ class PARQUET_EXPORT ColumnChunkMetaDataBuilder {
// column chunk
// Used when a dataset is spread across multiple files
void set_file_path(const std::string& path);

// column metadata
void SetStatistics(const EncodedStatistics& stats);

KeyValueMetadata& key_value_metadata();
clee704 marked this conversation as resolved.
Show resolved Hide resolved

// get the column descriptor
const ColumnDescriptor* descr() const;

Expand Down
32 changes: 25 additions & 7 deletions cpp/src/parquet/printer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,25 @@ void PrintPageEncodingStats(std::ostream& stream,
// the fixed initial size is just for an example
#define COL_WIDTH 30

void Put(std::ostream& stream, char c, int n) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you give this a more distinctive name, such as PutChars?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

for (int i = 0; i < n; ++i) {
stream.put(c);
}
}

void PrintKeyValueMetadata(std::ostream& stream,
const KeyValueMetadata& key_value_metadata,
int indent_level = 0, int indent_width = 1) {
clee704 marked this conversation as resolved.
Show resolved Hide resolved
const int64_t size_of_key_value_metadata = key_value_metadata.size();
Put(stream, ' ', indent_level * indent_width);
stream << "Key Value Metadata: " << size_of_key_value_metadata << " entries\n";
for (int64_t i = 0; i < size_of_key_value_metadata; i++) {
Put(stream, ' ', (indent_level + 1) * indent_width);
stream << "Key nr " << i << " " << key_value_metadata.key(i) << ": "
<< key_value_metadata.value(i) << "\n";
}
}

void ParquetFilePrinter::DebugPrint(std::ostream& stream, std::list<int> selected_columns,
bool print_values, bool format_dump,
bool print_key_value_metadata, const char* filename) {
Expand All @@ -76,12 +95,7 @@ void ParquetFilePrinter::DebugPrint(std::ostream& stream, std::list<int> selecte

if (print_key_value_metadata && file_metadata->key_value_metadata()) {
auto key_value_metadata = file_metadata->key_value_metadata();
int64_t size_of_key_value_metadata = key_value_metadata->size();
stream << "Key Value File Metadata: " << size_of_key_value_metadata << " entries\n";
for (int64_t i = 0; i < size_of_key_value_metadata; i++) {
stream << " Key nr " << i << " " << key_value_metadata->key(i) << ": "
<< key_value_metadata->value(i) << "\n";
}
PrintKeyValueMetadata(stream, *key_value_metadata);
}

stream << "Number of RowGroups: " << file_metadata->num_row_groups() << "\n";
Expand Down Expand Up @@ -136,7 +150,11 @@ void ParquetFilePrinter::DebugPrint(std::ostream& stream, std::list<int> selecte
std::shared_ptr<Statistics> stats = column_chunk->statistics();

const ColumnDescriptor* descr = file_metadata->schema()->Column(i);
stream << "Column " << i << std::endl << " Values: " << column_chunk->num_values();
stream << "Column " << i << std::endl;
if (print_key_value_metadata && column_chunk->key_value_metadata()) {
PrintKeyValueMetadata(stream, *column_chunk->key_value_metadata(), 1, 2);
}
stream << " Values: " << column_chunk->num_values();
if (column_chunk->is_stats_set()) {
std::string min = stats->EncodeMin(), max = stats->EncodeMax();
stream << ", Null Values: " << stats->null_count()
Expand Down
1 change: 1 addition & 0 deletions python/pyarrow/_parquet.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
unique_ptr[CColumnCryptoMetaData] crypto_metadata() const
optional[ParquetIndexLocation] GetColumnIndexLocation() const
optional[ParquetIndexLocation] GetOffsetIndexLocation() const
shared_ptr[const CKeyValueMetadata] key_value_metadata() const

struct CSortingColumn" parquet::SortingColumn":
int column_idx
Expand Down
13 changes: 13 additions & 0 deletions python/pyarrow/_parquet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,19 @@ cdef class ColumnChunkMetaData(_Weakrefable):
"""Whether the column chunk has a column index"""
return self.metadata.GetColumnIndexLocation().has_value()

@property
def metadata(self):
"""Additional metadata as key value pairs (dict[bytes, bytes])."""
cdef:
unordered_map[c_string, c_string] metadata
const CKeyValueMetadata* underlying_metadata
underlying_metadata = self.metadata.key_value_metadata().get()
if underlying_metadata != NULL:
underlying_metadata.ToUnorderedMap(&metadata)
return metadata
else:
return None
clee704 marked this conversation as resolved.
Show resolved Hide resolved


cdef class SortingColumn:
"""
Expand Down
Binary file not shown.
7 changes: 7 additions & 0 deletions python/pyarrow/tests/parquet/test_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -772,3 +772,10 @@ def test_write_metadata_fs_file_combinations(tempdir, s3_example_s3fs):
assert meta1.read_bytes() == meta2.read_bytes() \
== meta3.read_bytes() == meta4.read_bytes() \
== s3_fs.open(meta5).read()


def test_column_chunk_key_value_metadata(datadir):
metadata = pq.read_metadata(datadir / 'column-chunk-key-value-metadata.parquet')
key_value_metadata = metadata.row_group(0).column(0).metadata
print(key_value_metadata)
clee704 marked this conversation as resolved.
Show resolved Hide resolved
assert key_value_metadata[b'foo'] == b'bar'
clee704 marked this conversation as resolved.
Show resolved Hide resolved
Loading