Skip to content

Commit

Permalink
GH-38330: [C++][Azure] Use properties for input stream metadata
Browse files Browse the repository at this point in the history
Instead of user defined metadata.
  • Loading branch information
kou committed Oct 31, 2023
1 parent efd945d commit c0c316a
Show file tree
Hide file tree
Showing 2 changed files with 201 additions and 16 deletions.
150 changes: 143 additions & 7 deletions cpp/src/arrow/filesystem/azurefs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "arrow/filesystem/util_internal.h"
#include "arrow/result.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/formatting.h"
#include "arrow/util/future.h"
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/logging.h"
Expand Down Expand Up @@ -150,13 +151,148 @@ Status ErrorToStatus(const std::string& prefix,
return Status::IOError(prefix, " Azure Error: ", exception.what());
}

template <typename ObjectResult>
std::shared_ptr<const KeyValueMetadata> GetObjectMetadata(const ObjectResult& result) {
auto md = std::make_shared<KeyValueMetadata>();
for (auto prop : result) {
md->Append(prop.first, prop.second);
/// \brief Render a C value of the given Arrow type as a string.
///
/// The text is produced by arrow::internal::StringFormatter<ArrowType>;
/// every chunk the formatter hands to the callback is appended to the
/// returned string.
template <typename ArrowType>
std::string FormatValue(typename TypeTraits<ArrowType>::CType value) {
  std::string formatted;
  auto append_piece = [&formatted](std::string_view piece) {
    formatted.append(piece.data(), piece.size());
    return Status::OK();
  };
  arrow::internal::StringFormatter<ArrowType> format;
  // The callback itself always returns OK; the status coming back from the
  // formatter is deliberately discarded.
  ARROW_UNUSED(format(value, append_piece));
  return formatted;
}

/// \brief Convert Azure blob properties into Arrow key/value metadata.
///
/// Mandatory properties are always appended (possibly with an empty value);
/// optional (nullable) properties are appended only when they hold a value.
/// Boolean and integer properties are rendered with FormatValue(), binary
/// hashes with HexEncode(), and the remaining non-string properties with
/// their own ToString().
///
/// \param[in] properties the blob properties to convert
/// \return the newly built metadata, in the append order used below
std::shared_ptr<const KeyValueMetadata> PropertiesToMetadata(
    const Azure::Storage::Blobs::Models::BlobProperties& properties) {
  auto metadata = std::make_shared<KeyValueMetadata>();
  // Not supported yet:
  // * properties.ObjectReplicationSourceProperties
  // * properties.Metadata
  //
  // They may contain a key that is also appended below. If the metadata
  // ends up with a duplicated key, users will usually only see the first
  // value because KeyValueMetadata::Get() returns the first match. Note
  // that all values are still reachable through KeyValueMetadata::keys()
  // and KeyValueMetadata::values().
  if (properties.ImmutabilityPolicy.HasValue()) {
    metadata->Append("Immutability-Policy-Expires-On",
                     properties.ImmutabilityPolicy.Value().ExpiresOn.ToString());
    metadata->Append("Immutability-Policy-Mode",
                     properties.ImmutabilityPolicy.Value().PolicyMode.ToString());
  }
  metadata->Append("Content-Type", properties.HttpHeaders.ContentType);
  metadata->Append("Content-Encoding", properties.HttpHeaders.ContentEncoding);
  // Must read ContentLanguage (not ContentEncoding) for this key.
  metadata->Append("Content-Language", properties.HttpHeaders.ContentLanguage);
  const auto& content_hash = properties.HttpHeaders.ContentHash.Value;
  metadata->Append("Content-Hash", HexEncode(content_hash.data(), content_hash.size()));
  metadata->Append("Content-Disposition", properties.HttpHeaders.ContentDisposition);
  metadata->Append("Cache-Control", properties.HttpHeaders.CacheControl);
  metadata->Append("Last-Modified", properties.LastModified.ToString());
  metadata->Append("Created-On", properties.CreatedOn.ToString());
  if (properties.ObjectReplicationDestinationPolicyId.HasValue()) {
    metadata->Append("Object-Replication-Destination-Policy-Id",
                     properties.ObjectReplicationDestinationPolicyId.Value());
  }
  metadata->Append("Blob-Type", properties.BlobType.ToString());
  if (properties.CopyCompletedOn.HasValue()) {
    metadata->Append("Copy-Completed-On", properties.CopyCompletedOn.Value().ToString());
  }
  if (properties.CopyStatusDescription.HasValue()) {
    metadata->Append("Copy-Status-Description", properties.CopyStatusDescription.Value());
  }
  if (properties.CopyId.HasValue()) {
    metadata->Append("Copy-Id", properties.CopyId.Value());
  }
  if (properties.CopyProgress.HasValue()) {
    metadata->Append("Copy-Progress", properties.CopyProgress.Value());
  }
  if (properties.CopySource.HasValue()) {
    metadata->Append("Copy-Source", properties.CopySource.Value());
  }
  if (properties.CopyStatus.HasValue()) {
    metadata->Append("Copy-Status", properties.CopyStatus.Value().ToString());
  }
  if (properties.IsIncrementalCopy.HasValue()) {
    metadata->Append("Is-Incremental-Copy",
                     FormatValue<BooleanType>(properties.IsIncrementalCopy.Value()));
  }
  if (properties.IncrementalCopyDestinationSnapshot.HasValue()) {
    metadata->Append("Incremental-Copy-Destination-Snapshot",
                     properties.IncrementalCopyDestinationSnapshot.Value());
  }
  if (properties.LeaseDuration.HasValue()) {
    metadata->Append("Lease-Duration", properties.LeaseDuration.Value().ToString());
  }
  if (properties.LeaseState.HasValue()) {
    metadata->Append("Lease-State", properties.LeaseState.Value().ToString());
  }
  if (properties.LeaseStatus.HasValue()) {
    metadata->Append("Lease-Status", properties.LeaseStatus.Value().ToString());
  }
  metadata->Append("Content-Length", FormatValue<Int64Type>(properties.BlobSize));
  if (properties.ETag.HasValue()) {
    metadata->Append("ETag", properties.ETag.ToString());
  }
  if (properties.SequenceNumber.HasValue()) {
    metadata->Append("Sequence-Number",
                     FormatValue<Int64Type>(properties.SequenceNumber.Value()));
  }
  if (properties.CommittedBlockCount.HasValue()) {
    metadata->Append("Committed-Block-Count",
                     FormatValue<Int32Type>(properties.CommittedBlockCount.Value()));
  }
  // NOTE: this key is intentionally left without hyphens; existing consumers
  // (including the test in this change) match on this exact spelling.
  metadata->Append("IsServerEncrypted",
                   FormatValue<BooleanType>(properties.IsServerEncrypted));
  if (properties.EncryptionKeySha256.HasValue()) {
    const auto& sha256 = properties.EncryptionKeySha256.Value();
    metadata->Append("Encryption-Key-Sha-256", HexEncode(sha256.data(), sha256.size()));
  }
  if (properties.EncryptionScope.HasValue()) {
    metadata->Append("Encryption-Scope", properties.EncryptionScope.Value());
  }
  if (properties.AccessTier.HasValue()) {
    metadata->Append("Access-Tier", properties.AccessTier.Value().ToString());
  }
  if (properties.IsAccessTierInferred.HasValue()) {
    metadata->Append("Is-Access-Tier-Inferred",
                     FormatValue<BooleanType>(properties.IsAccessTierInferred.Value()));
  }
  if (properties.ArchiveStatus.HasValue()) {
    metadata->Append("Archive-Status", properties.ArchiveStatus.Value().ToString());
  }
  if (properties.AccessTierChangedOn.HasValue()) {
    metadata->Append("Access-Tier-Changed-On",
                     properties.AccessTierChangedOn.Value().ToString());
  }
  if (properties.VersionId.HasValue()) {
    metadata->Append("Version-Id", properties.VersionId.Value());
  }
  if (properties.IsCurrentVersion.HasValue()) {
    metadata->Append("Is-Current-Version",
                     FormatValue<BooleanType>(properties.IsCurrentVersion.Value()));
  }
  if (properties.TagCount.HasValue()) {
    metadata->Append("Tag-Count", FormatValue<Int32Type>(properties.TagCount.Value()));
  }
  if (properties.ExpiresOn.HasValue()) {
    metadata->Append("Expires-On", properties.ExpiresOn.Value().ToString());
  }
  if (properties.IsSealed.HasValue()) {
    metadata->Append("Is-Sealed", FormatValue<BooleanType>(properties.IsSealed.Value()));
  }
  if (properties.RehydratePriority.HasValue()) {
    metadata->Append("Rehydrate-Priority",
                     properties.RehydratePriority.Value().ToString());
  }
  if (properties.LastAccessedOn.HasValue()) {
    metadata->Append("Last-Accessed-On", properties.LastAccessedOn.Value().ToString());
  }
  metadata->Append("Has-Legal-Hold", FormatValue<BooleanType>(properties.HasLegalHold));
  return metadata;
}

class ObjectInputFile final : public io::RandomAccessFile {
Expand All @@ -176,7 +312,7 @@ class ObjectInputFile final : public io::RandomAccessFile {
try {
auto properties = blob_client_->GetProperties();
content_length_ = properties.Value.BlobSize;
metadata_ = GetObjectMetadata(properties.Value.Metadata);
metadata_ = PropertiesToMetadata(properties.Value);
return Status::OK();
} catch (const Azure::Storage::StorageException& exception) {
if (exception.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound) {
Expand Down
67 changes: 58 additions & 9 deletions cpp/src/arrow/filesystem/azurefs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
#include "arrow/testing/util.h"
#include "arrow/util/io_util.h"
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/string.h"
#include "arrow/util/value_parsing.h"

namespace arrow {
using internal::TemporaryDir;
Expand Down Expand Up @@ -296,21 +298,68 @@ TEST_F(TestAzureFileSystem, OpenInputStreamTrailingSlash) {
ASSERT_RAISES(IOError, fs_->OpenInputStream(PreexistingObjectPath() + '/'));
}

TEST_F(TestAzureFileSystem, OpenInputStreamReadMetadata) {
const std::string object_name = "OpenInputStreamMetadataTest/simple.txt";

service_client_->GetBlobContainerClient(PreexistingContainerName())
.GetBlobClient(PreexistingObjectName())
.SetMetadata(Azure::Storage::Metadata{{"key0", "value0"}});
namespace {
// Build a copy of `metadata` in which values that differ from run to run are
// replaced by fixed placeholders, so that the result can be compared against
// a literal expected string. A value is only replaced when it looks valid
// (parses as hex / as an ISO 8601 timestamp / is wrapped in double quotes);
// otherwise it is copied unchanged so that a malformed value shows up in the
// comparison. Keys and their order are preserved.
std::shared_ptr<const KeyValueMetadata> NormalizerKeyValueMetadata(
    std::shared_ptr<const KeyValueMetadata> metadata) {
  auto normalized = std::make_shared<KeyValueMetadata>();
  for (int64_t i = 0; i < metadata->size(); ++i) {
    const auto& key = metadata->key(i);
    auto value = metadata->value(i);
    if (key == "Content-Hash") {
      // The buffer must be sized, not merely reserved: ParseHexValues()
      // writes through the raw pointer, and writing past size() is undefined
      // behaviour even when the capacity is large enough.
      std::vector<uint8_t> decoded(value.size() / 2);
      if (ParseHexValues(value, decoded.data()).ok()) {
        // Valid value
        value = std::string(value.size(), 'F');
      }
    } else if (key == "Last-Modified" || key == "Created-On" ||
               key == "Access-Tier-Changed-On") {
      auto parser = TimestampParser::MakeISO8601();
      int64_t parsed = 0;
      if ((*parser)(value.data(), value.size(), TimeUnit::NANO, &parsed)) {
        // Valid value
        value = "2023-10-31T08:15:20Z";
      }
    } else if (key == "ETag") {
      if (internal::StartsWith(value, "\"") && internal::EndsWith(value, "\"")) {
        // Valid value
        value = "\"ETagValue\"";
      }
    }
    normalized->Append(key, value);
  }
  return normalized;
}
}  // namespace

// ReadMetadata() on an input stream must expose the blob's properties (not
// the user defined metadata). Values that change between runs are replaced
// by NormalizerKeyValueMetadata() before the comparison.
TEST_F(TestAzureFileSystem, OpenInputStreamReadMetadata) {
  std::shared_ptr<io::InputStream> stream;
  ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream(PreexistingObjectPath()));

  std::shared_ptr<const KeyValueMetadata> actual;
  ASSERT_OK_AND_ASSIGN(actual, stream->ReadMetadata());
  // The former `key0` assertion was dropped: it checked user defined
  // metadata, which the expected listing below does not contain.
  ASSERT_EQ(
      "\n"
      "-- metadata --\n"
      "Content-Type: application/octet-stream\n"
      "Content-Encoding: \n"
      "Content-Language: \n"
      "Content-Hash: FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF\n"
      "Content-Disposition: \n"
      "Cache-Control: \n"
      "Last-Modified: 2023-10-31T08:15:20Z\n"
      "Created-On: 2023-10-31T08:15:20Z\n"
      "Blob-Type: BlockBlob\n"
      "Lease-State: available\n"
      "Lease-Status: unlocked\n"
      "Content-Length: 447\n"
      "ETag: \"ETagValue\"\n"
      "IsServerEncrypted: true\n"
      "Access-Tier: Hot\n"
      "Is-Access-Tier-Inferred: true\n"
      "Access-Tier-Changed-On: 2023-10-31T08:15:20Z\n"
      "Has-Legal-Hold: false",
      NormalizerKeyValueMetadata(actual)->ToString());
}

TEST_F(TestAzureFileSystem, OpenInputStreamClosed) {
Expand Down

0 comments on commit c0c316a

Please sign in to comment.