From fa36cde1105c8c82ef6407cb991b83d1ca67258a Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <kou@clear-code.com>
Date: Sat, 23 Mar 2024 07:24:33 +0900
Subject: [PATCH] GH-40026: [C++][FS][Azure] Add support for reading user
 defined metadata (#40671)

### Rationale for this change

Users may want to set user defined metadata and/or properties such as `Content-Type`.

### What changes are included in this PR?

* Write user defined metadata.
* But the following metadata are written as properties:
  * `Content-Type`
  * `Content-Encoding`
  * `Content-Language`
  * `Content-Disposition`
  * `Cache-Control`
* The following metadata is ignored because corresponding properties are auto generated:
  * `Content-Hash`

### Are these changes tested?

Yes.

### Are there any user-facing changes?

Yes.
* GitHub Issue: #40026

Authored-by: Sutou Kouhei <kou@clear-code.com>
Signed-off-by: Sutou Kouhei <kou@clear-code.com>
---
 cpp/src/arrow/filesystem/azurefs.cc      | 43 ++++++++++++------
 cpp/src/arrow/filesystem/azurefs_test.cc | 58 ++++++++++++++----------
 2 files changed, 64 insertions(+), 37 deletions(-)

diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc
index 8b7358fbcf2c8..260478b068ed1 100644
--- a/cpp/src/arrow/filesystem/azurefs.cc
+++ b/cpp/src/arrow/filesystem/azurefs.cc
@@ -547,7 +547,6 @@ std::shared_ptr<const KeyValueMetadata> PropertiesToMetadata(
   auto metadata = std::make_shared<KeyValueMetadata>();
   // Not supported yet:
   // * properties.ObjectReplicationSourceProperties
-  // * properties.Metadata
   //
   // They may have the same key defined in the following
   // metadata->Append() list. If we have duplicated key in metadata,
@@ -669,16 +668,33 @@ std::shared_ptr<const KeyValueMetadata> PropertiesToMetadata(
     metadata->Append("Last-Accessed-On", properties.LastAccessedOn.Value().ToString());
   }
   metadata->Append("Has-Legal-Hold", FormatValue<BooleanType>(properties.HasLegalHold));
+  for (const auto& [key, value] : properties.Metadata) {
+    metadata->Append(key, value);
+  }
   return metadata;
 }
 
-Storage::Metadata ArrowMetadataToAzureMetadata(
-    const std::shared_ptr<const KeyValueMetadata>& arrow_metadata) {
-  Storage::Metadata azure_metadata;
-  for (auto key_value : arrow_metadata->sorted_pairs()) {
-    azure_metadata[key_value.first] = key_value.second;
+void ArrowMetadataToCommitBlockListOptions(
+    const std::shared_ptr<const KeyValueMetadata>& arrow_metadata,
+    Blobs::CommitBlockListOptions& options) {
+  using ::arrow::internal::AsciiEqualsCaseInsensitive;
+  for (auto& [key, value] : arrow_metadata->sorted_pairs()) {
+    if (AsciiEqualsCaseInsensitive(key, "Content-Type")) {
+      options.HttpHeaders.ContentType = value;
+    } else if (AsciiEqualsCaseInsensitive(key, "Content-Encoding")) {
+      options.HttpHeaders.ContentEncoding = value;
+    } else if (AsciiEqualsCaseInsensitive(key, "Content-Language")) {
+      options.HttpHeaders.ContentLanguage = value;
+    } else if (AsciiEqualsCaseInsensitive(key, "Content-Hash")) {
+      // Ignore: auto-generated value
+    } else if (AsciiEqualsCaseInsensitive(key, "Content-Disposition")) {
+      options.HttpHeaders.ContentDisposition = value;
+    } else if (AsciiEqualsCaseInsensitive(key, "Cache-Control")) {
+      options.HttpHeaders.CacheControl = value;
+    } else {
+      options.Metadata[key] = value;
+    }
   }
-  return azure_metadata;
 }
 
 class ObjectInputFile final : public io::RandomAccessFile {
@@ -864,9 +880,7 @@ Result<Blobs::Models::GetBlockListResult> GetBlockList(
 
 Status CommitBlockList(std::shared_ptr<Storage::Blobs::BlockBlobClient> block_blob_client,
                        const std::vector<std::string>& block_ids,
-                       const Storage::Metadata& metadata) {
-  Blobs::CommitBlockListOptions options;
-  options.Metadata = metadata;
+                       const Blobs::CommitBlockListOptions& options) {
   try {
     // CommitBlockList puts all block_ids in the latest element. That means in the case of
     // overlapping block_ids the newly staged block ids will always replace the
@@ -891,9 +905,10 @@ class ObjectAppendStream final : public io::OutputStream {
         io_context_(io_context),
         location_(location) {
     if (metadata && metadata->size() != 0) {
-      metadata_ = ArrowMetadataToAzureMetadata(metadata);
+      ArrowMetadataToCommitBlockListOptions(metadata, commit_block_list_options_);
     } else if (options.default_metadata && options.default_metadata->size() != 0) {
-      metadata_ = ArrowMetadataToAzureMetadata(options.default_metadata);
+      ArrowMetadataToCommitBlockListOptions(options.default_metadata,
+                                            commit_block_list_options_);
     }
   }
 
@@ -996,7 +1011,7 @@ class ObjectAppendStream final : public io::OutputStream {
       // flush. This also avoids some unhandled errors when flushing in the destructor.
       return Status::OK();
     }
-    return CommitBlockList(block_blob_client_, block_ids_, metadata_);
+    return CommitBlockList(block_blob_client_, block_ids_, commit_block_list_options_);
   }
 
  private:
@@ -1053,7 +1068,7 @@ class ObjectAppendStream final : public io::OutputStream {
   bool initialised_ = false;
   int64_t pos_ = 0;
   std::vector<std::string> block_ids_;
-  Storage::Metadata metadata_;
+  Blobs::CommitBlockListOptions commit_block_list_options_;
 };
 
 bool IsDfsEmulator(const AzureOptions& options) {
diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc
index bd741fde8cc23..7ea5eb446bc12 100644
--- a/cpp/src/arrow/filesystem/azurefs_test.cc
+++ b/cpp/src/arrow/filesystem/azurefs_test.cc
@@ -2458,38 +2458,50 @@ TEST_F(TestAzuriteFileSystem, WriteMetadata) {
   ASSERT_OK(output->Close());
 
   // Verify the metadata has been set.
-  // TODO(GH-40025): Use `AzureFileSystem` to fetch metadata for this assertion.
-  auto blob_metadata = blob_service_client_->GetBlobContainerClient(data.container_name)
-                           .GetBlockBlobClient(blob_path)
-                           .GetProperties()
-                           .Value.Metadata;
-  EXPECT_EQ(Core::CaseInsensitiveMap{std::make_pair("foo", "bar")}, blob_metadata);
+  ASSERT_OK_AND_ASSIGN(auto input, fs()->OpenInputStream(full_path));
+  ASSERT_OK_AND_ASSIGN(auto metadata, input->ReadMetadata());
+  ASSERT_OK_AND_ASSIGN(auto foo_value, metadata->Get("foo"));
+  ASSERT_EQ("bar", foo_value);
 
   // Check that explicit metadata overrides the defaults.
-  ASSERT_OK_AND_ASSIGN(
-      output, fs_with_defaults->OpenOutputStream(
-                  full_path, /*metadata=*/arrow::key_value_metadata({{"bar", "foo"}})));
+  ASSERT_OK_AND_ASSIGN(output,
+                       fs_with_defaults->OpenOutputStream(
+                           full_path, arrow::key_value_metadata({{"bar", "foo"}})));
   ASSERT_OK(output->Write(expected));
   ASSERT_OK(output->Close());
-  // TODO(GH-40025): Use `AzureFileSystem` to fetch metadata for this assertion.
-  blob_metadata = blob_service_client_->GetBlobContainerClient(data.container_name)
-                      .GetBlockBlobClient(blob_path)
-                      .GetProperties()
-                      .Value.Metadata;
+  ASSERT_OK_AND_ASSIGN(input, fs()->OpenInputStream(full_path));
+  ASSERT_OK_AND_ASSIGN(metadata, input->ReadMetadata());
   // Defaults are overwritten and not merged.
-  EXPECT_EQ(Core::CaseInsensitiveMap{std::make_pair("bar", "foo")}, blob_metadata);
+  ASSERT_NOT_OK(metadata->Get("foo"));
+  ASSERT_OK_AND_ASSIGN(auto bar_value, metadata->Get("bar"));
+  ASSERT_EQ("foo", bar_value);
 
   // Metadata can be written without writing any data.
-  ASSERT_OK_AND_ASSIGN(
-      output, fs_with_defaults->OpenAppendStream(
-                  full_path, /*metadata=*/arrow::key_value_metadata({{"bar", "baz"}})));
+  ASSERT_OK_AND_ASSIGN(output,
+                       fs_with_defaults->OpenAppendStream(
+                           full_path, arrow::key_value_metadata({{"bar", "baz"}})));
   ASSERT_OK(output->Close());
-  blob_metadata = blob_service_client_->GetBlobContainerClient(data.container_name)
-                      .GetBlockBlobClient(blob_path)
-                      .GetProperties()
-                      .Value.Metadata;
+  ASSERT_OK_AND_ASSIGN(input, fs()->OpenInputStream(full_path));
+  ASSERT_OK_AND_ASSIGN(metadata, input->ReadMetadata());
   // Defaults are overwritten and not merged.
-  EXPECT_EQ(Core::CaseInsensitiveMap{std::make_pair("bar", "baz")}, blob_metadata);
+  ASSERT_NOT_OK(metadata->Get("foo"));
+  ASSERT_OK_AND_ASSIGN(bar_value, metadata->Get("bar"));
+  ASSERT_EQ("baz", bar_value);
+}
+
+TEST_F(TestAzuriteFileSystem, WriteMetadataHttpHeaders) {
+  auto data = SetUpPreexistingData();
+  ASSERT_OK_AND_ASSIGN(auto output,
+                       fs()->OpenOutputStream(
+                           data.ObjectPath(),
+                           arrow::key_value_metadata({{"Content-Type", "text/plain"}})));
+  ASSERT_OK(output->Write(PreexistingData::kLoremIpsum));
+  ASSERT_OK(output->Close());
+
+  ASSERT_OK_AND_ASSIGN(auto input, fs()->OpenInputStream(data.ObjectPath()));
+  ASSERT_OK_AND_ASSIGN(auto metadata, input->ReadMetadata());
+  ASSERT_OK_AND_ASSIGN(auto content_type, metadata->Get("Content-Type"));
+  ASSERT_EQ("text/plain", content_type);
 }
 
 TEST_F(TestAzuriteFileSystem, OpenOutputStreamSmall) {