From 605f8a792c388afb2230b1f19e0f3e4df90d5abe Mon Sep 17 00:00:00 2001 From: Sutou Kouhei Date: Tue, 12 Mar 2024 06:34:58 +0900 Subject: [PATCH] GH-40028: [C++][FS][Azure] Add AzureFileSystem support to FileSystemFromUri() (#40325) ### Rationale for this change `FileSystemFromUri()` is a common API to create a file system object. `FileSystemFromUri()` should be able to create an `AzureFileSystem` object. ### What changes are included in this PR? Add `AzureOptions::FromUri()` and use it from `FileSystemFromUri()`. See the `AzureOptions::FromUri()`'s docstring about the supported formats. ### Are these changes tested? Yes. ### Are there any user-facing changes? Yes. * GitHub Issue: #40028 Authored-by: Sutou Kouhei Signed-off-by: Sutou Kouhei --- cpp/src/arrow/filesystem/azurefs.cc | 166 +++++++++++++++++++ cpp/src/arrow/filesystem/azurefs.h | 56 +++++++ cpp/src/arrow/filesystem/azurefs_test.cc | 196 +++++++++++++++++++++++ cpp/src/arrow/filesystem/filesystem.cc | 18 ++- 4 files changed, 433 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index ff078f78aeac0..dd9fb817b7aca 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -65,6 +65,172 @@ AzureOptions::AzureOptions() = default; AzureOptions::~AzureOptions() = default; +void AzureOptions::ExtractFromUriSchemeAndHierPart(const arrow::internal::Uri& uri, + std::string* out_path) { + const auto host = uri.host(); + std::string path; + if (arrow::internal::EndsWith(host, blob_storage_authority)) { + account_name = host.substr(0, host.size() - blob_storage_authority.size()); + path = internal::RemoveLeadingSlash(uri.path()); + } else if (arrow::internal::EndsWith(host, dfs_storage_authority)) { + account_name = host.substr(0, host.size() - dfs_storage_authority.size()); + path = internal::ConcatAbstractPath(uri.username(), uri.path()); + } else { + account_name = uri.username(); + const auto port_text = uri.port_text(); + if (host.find(".") == std::string::npos && port_text.empty()) { + // abfs://container/dir/file + path = internal::ConcatAbstractPath(host, uri.path()); + } else { + // abfs://host.domain/container/dir/file + // abfs://host.domain:port/container/dir/file + // abfs://host:port/container/dir/file + std::string host_port = host; + if (!port_text.empty()) { + host_port += ":" + port_text; + } + blob_storage_authority = host_port; + dfs_storage_authority = host_port; + path = internal::RemoveLeadingSlash(uri.path()); + } + } + if (out_path != nullptr) { + *out_path = path; + } +} + +Status AzureOptions::ExtractFromUriQuery(const arrow::internal::Uri& uri) { + const auto account_key = uri.password(); + std::optional credential_kind; + std::optional credential_kind_value; + std::string tenant_id; + std::string client_id; + std::string client_secret; + ARROW_ASSIGN_OR_RAISE(const auto options_items, uri.query_items()); + for (const auto& kv : options_items) { + if (kv.first == "blob_storage_authority") { + blob_storage_authority = kv.second; + } else if (kv.first == "dfs_storage_authority") { + dfs_storage_authority = kv.second; + } else if (kv.first == "credential_kind") { + if (kv.second == "default") { + credential_kind = CredentialKind::kDefault; + } else if (kv.second == "anonymous") { + credential_kind = CredentialKind::kAnonymous; + } else if (kv.second == "workload_identity") { + credential_kind = CredentialKind::kWorkloadIdentity; + } else { + // Other credential kinds should be inferred from the given + // parameters automatically. + return Status::Invalid("Unexpected credential_kind: '", kv.second, "'"); + } + credential_kind_value = kv.second; + } else if (kv.first == "tenant_id") { + tenant_id = kv.second; + } else if (kv.first == "client_id") { + client_id = kv.second; + } else if (kv.first == "client_secret") { + client_secret = kv.second; + } else if (kv.first == "enable_tls") { + ARROW_ASSIGN_OR_RAISE(auto enable_tls, ::arrow::internal::ParseBoolean(kv.second)); + if (enable_tls) { + blob_storage_scheme = "https"; + dfs_storage_scheme = "https"; + } else { + blob_storage_scheme = "http"; + dfs_storage_scheme = "http"; + } + } else { + return Status::Invalid( + "Unexpected query parameter in Azure Blob File System URI: '", kv.first, "'"); + } + } + + if (credential_kind) { + if (!account_key.empty()) { + return Status::Invalid("Password must not be specified with credential_kind=", + *credential_kind_value); + } + if (!tenant_id.empty()) { + return Status::Invalid("tenant_id must not be specified with credential_kind=", + *credential_kind_value); + } + if (!client_id.empty()) { + return Status::Invalid("client_id must not be specified with credential_kind=", + *credential_kind_value); + } + if (!client_secret.empty()) { + return Status::Invalid("client_secret must not be specified with credential_kind=", + *credential_kind_value); + } + + switch (*credential_kind) { + case CredentialKind::kAnonymous: + RETURN_NOT_OK(ConfigureAnonymousCredential()); + break; + case CredentialKind::kWorkloadIdentity: + RETURN_NOT_OK(ConfigureWorkloadIdentityCredential()); + break; + default: + // Default credential + break; + } + } else { + if (!account_key.empty()) { + // With password + if (!tenant_id.empty()) { + return Status::Invalid("tenant_id must not be specified with password"); + } + if (!client_id.empty()) { + return Status::Invalid("client_id must not be specified with password"); + } + if (!client_secret.empty()) { + return Status::Invalid("client_secret must not be specified with password"); + } + RETURN_NOT_OK(ConfigureAccountKeyCredential(account_key)); + } else { + // Without password + if (tenant_id.empty() && client_id.empty() && client_secret.empty()) { + // No related parameters + if (account_name.empty()) { + RETURN_NOT_OK(ConfigureAnonymousCredential()); + } else { + // Default credential + } + } else { + // One or more tenant_id, client_id or client_secret are specified + if (client_id.empty()) { + return Status::Invalid("client_id must be specified"); + } + if (tenant_id.empty() && client_secret.empty()) { + RETURN_NOT_OK(ConfigureManagedIdentityCredential(client_id)); + } else if (!tenant_id.empty() && !client_secret.empty()) { + RETURN_NOT_OK( + ConfigureClientSecretCredential(tenant_id, client_id, client_secret)); + } else { + return Status::Invalid("Both of tenant_id and client_secret must be specified"); + } + } + } + } + return Status::OK(); +} + +Result AzureOptions::FromUri(const arrow::internal::Uri& uri, + std::string* out_path) { + AzureOptions options; + options.ExtractFromUriSchemeAndHierPart(uri, out_path); + RETURN_NOT_OK(options.ExtractFromUriQuery(uri)); + return options; +} + +Result AzureOptions::FromUri(const std::string& uri_string, + std::string* out_path) { + arrow::internal::Uri uri; + RETURN_NOT_OK(uri.Parse(uri_string)); + return FromUri(uri, out_path); +} + bool AzureOptions::Equals(const AzureOptions& other) const { // TODO(GH-38598): update here when more auth methods are added. const bool equals = blob_storage_authority == other.blob_storage_authority && diff --git a/cpp/src/arrow/filesystem/azurefs.h b/cpp/src/arrow/filesystem/azurefs.h index 2a131e40c05bf..6218bf574e8dd 100644 --- a/cpp/src/arrow/filesystem/azurefs.h +++ b/cpp/src/arrow/filesystem/azurefs.h @@ -45,6 +45,7 @@ class DataLakeServiceClient; namespace arrow::fs { class TestAzureFileSystem; +class TestAzureOptions; /// Options for the AzureFileSystem implementation. /// @@ -59,6 +60,8 @@ class TestAzureFileSystem; /// /// Functions are provided for explicit configuration of credentials if that is preferred. struct ARROW_EXPORT AzureOptions { + friend class TestAzureOptions; + /// \brief The name of the Azure Storage Account being accessed. /// /// All service URLs will be constructed using this storage account name. @@ -123,6 +126,59 @@ struct ARROW_EXPORT AzureOptions { AzureOptions(); ~AzureOptions(); + private: + void ExtractFromUriSchemeAndHierPart(const arrow::internal::Uri& uri, + std::string* out_path); + Status ExtractFromUriQuery(const arrow::internal::Uri& uri); + + public: + /// \brief Construct a new AzureOptions from an URI. + /// + /// Supported formats: + /// + /// 1. abfs[s]://[:\@]\.blob.core.windows.net + /// [/\[/\]] + /// 2. abfs[s]://\[:\]@\.dfs.core.windows.net + /// [/path] + /// 3. abfs[s]://[\]@]\[\<:port\>] + /// [/\[/path]] + /// 4. abfs[s]://[\]@]\[/path] + /// + /// 1. and 2. are compatible with the Azure Data Lake Storage Gen2 URIs: + /// https://learn.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-introduction-abfs-uri + /// + /// 3. is for Azure Blob Storage compatible service including Azurite. + /// + /// 4. is a shorter version of 1. and 2. + /// + /// Note that there is no difference between abfs and abfss. HTTPS is + /// used with abfs by default. You can force to use HTTP by specifying + /// "enable_tls=false" query. + /// + /// Supported query parameters: + /// + /// * blob_storage_authority: Set AzureOptions::blob_storage_authority + /// * dfs_storage_authority: Set AzureOptions::dfs_storage_authority + /// * enable_tls: If it's "false" or "0", HTTP not HTTPS is used. + /// * credential_kind: One of "default", "anonymous", + /// "workload_identity". If "default" is specified, it's just + /// ignored. If "anonymous" is specified, + /// AzureOptions::ConfigureAnonymousCredential() is called. If + /// "workload_identity" is specified, + /// AzureOptions::ConfigureWorkloadIdentityCredential() is called. + /// * tenant_id: You must specify "client_id" and "client_secret" + /// too. AzureOptions::ConfigureClientSecretCredential() is called. + /// * client_id: If you don't specify "tenant_id" and + /// "client_secret", + /// AzureOptions::ConfigureManagedIdentityCredential() is + /// called. If you specify "tenant_id" and "client_secret" too, + /// AzureOptions::ConfigureClientSecretCredential() is called. + /// * client_secret: You must specify "tenant_id" and "client_id" + /// too. AzureOptions::ConfigureClientSecretCredential() is called. + static Result FromUri(const arrow::internal::Uri& uri, + std::string* out_path); + static Result FromUri(const std::string& uri, std::string* out_path); + Status ConfigureDefaultCredential(); Status ConfigureAnonymousCredential(); Status ConfigureAccountKeyCredential(const std::string& account_key); diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc index f21876f03cc95..0ce84043a537c 100644 --- a/cpp/src/arrow/filesystem/azurefs_test.cc +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -336,6 +336,202 @@ TEST(AzureFileSystem, OptionsCompare) { EXPECT_TRUE(options.Equals(options)); } +class TestAzureOptions : public ::testing::Test { + public: + void TestFromUriBlobStorage() { + AzureOptions default_options; + std::string path; + ASSERT_OK_AND_ASSIGN( + auto options, + AzureOptions::FromUri("abfs://account.blob.core.windows.net/container/dir/blob", + &path)); + ASSERT_EQ(options.account_name, "account"); + ASSERT_EQ(options.blob_storage_authority, default_options.blob_storage_authority); + ASSERT_EQ(options.dfs_storage_authority, default_options.dfs_storage_authority); + ASSERT_EQ(options.blob_storage_scheme, default_options.blob_storage_scheme); + ASSERT_EQ(options.dfs_storage_scheme, default_options.dfs_storage_scheme); + ASSERT_EQ(options.credential_kind_, AzureOptions::CredentialKind::kDefault); + ASSERT_EQ(path, "container/dir/blob"); + } + + void TestFromUriDfsStorage() { + AzureOptions default_options; + std::string path; + ASSERT_OK_AND_ASSIGN( + auto options, + AzureOptions::FromUri("abfs://file_system@account.dfs.core.windows.net/dir/file", + &path)); + ASSERT_EQ(options.account_name, "account"); + ASSERT_EQ(options.blob_storage_authority, default_options.blob_storage_authority); + ASSERT_EQ(options.dfs_storage_authority, default_options.dfs_storage_authority); + ASSERT_EQ(options.blob_storage_scheme, default_options.blob_storage_scheme); + ASSERT_EQ(options.dfs_storage_scheme, default_options.dfs_storage_scheme); + ASSERT_EQ(options.credential_kind_, AzureOptions::CredentialKind::kDefault); + ASSERT_EQ(path, "file_system/dir/file"); + } + + void TestFromUriAbfs() { + std::string path; + ASSERT_OK_AND_ASSIGN( + auto options, + AzureOptions::FromUri( + "abfs://account:password@127.0.0.1:10000/container/dir/blob", &path)); + ASSERT_EQ(options.account_name, "account"); + ASSERT_EQ(options.blob_storage_authority, "127.0.0.1:10000"); + ASSERT_EQ(options.dfs_storage_authority, "127.0.0.1:10000"); + ASSERT_EQ(options.blob_storage_scheme, "https"); + ASSERT_EQ(options.dfs_storage_scheme, "https"); + ASSERT_EQ(options.credential_kind_, AzureOptions::CredentialKind::kStorageSharedKey); + ASSERT_EQ(path, "container/dir/blob"); + } + + void TestFromUriAbfss() { + std::string path; + ASSERT_OK_AND_ASSIGN( + auto options, + AzureOptions::FromUri( + "abfss://account:password@127.0.0.1:10000/container/dir/blob", &path)); + ASSERT_EQ(options.account_name, "account"); + ASSERT_EQ(options.blob_storage_authority, "127.0.0.1:10000"); + ASSERT_EQ(options.dfs_storage_authority, "127.0.0.1:10000"); + ASSERT_EQ(options.blob_storage_scheme, "https"); + ASSERT_EQ(options.dfs_storage_scheme, "https"); + ASSERT_EQ(options.credential_kind_, AzureOptions::CredentialKind::kStorageSharedKey); + ASSERT_EQ(path, "container/dir/blob"); + } + + void TestFromUriEnableTls() { + std::string path; + ASSERT_OK_AND_ASSIGN(auto options, + AzureOptions::FromUri( + "abfs://account:password@127.0.0.1:10000/container/dir/blob?" + "enable_tls=false", + &path)); + ASSERT_EQ(options.account_name, "account"); + ASSERT_EQ(options.blob_storage_authority, "127.0.0.1:10000"); + ASSERT_EQ(options.dfs_storage_authority, "127.0.0.1:10000"); + ASSERT_EQ(options.blob_storage_scheme, "http"); + ASSERT_EQ(options.dfs_storage_scheme, "http"); + ASSERT_EQ(options.credential_kind_, AzureOptions::CredentialKind::kStorageSharedKey); + ASSERT_EQ(path, "container/dir/blob"); + } + + void TestFromUriCredentialDefault() { + ASSERT_OK_AND_ASSIGN( + auto options, + AzureOptions::FromUri("abfs://account.blob.core.windows.net/container/dir/blob?" + "credential_kind=default", + nullptr)); + ASSERT_EQ(options.credential_kind_, AzureOptions::CredentialKind::kDefault); + } + + void TestFromUriCredentialAnonymous() { + ASSERT_OK_AND_ASSIGN( + auto options, + AzureOptions::FromUri("abfs://account.blob.core.windows.net/container/dir/blob?" + "credential_kind=anonymous", + nullptr)); + ASSERT_EQ(options.credential_kind_, AzureOptions::CredentialKind::kAnonymous); + } + + void TestFromUriCredentialStorageSharedKey() { + ASSERT_OK_AND_ASSIGN( + auto options, + AzureOptions::FromUri( + "abfs://:password@account.blob.core.windows.net/container/dir/blob", + nullptr)); + ASSERT_EQ(options.credential_kind_, AzureOptions::CredentialKind::kStorageSharedKey); + } + + void TestFromUriCredentialClientSecret() { + ASSERT_OK_AND_ASSIGN( + auto options, + AzureOptions::FromUri("abfs://account.blob.core.windows.net/container/dir/blob?" + "tenant_id=tenant-id&" + "client_id=client-id&" + "client_secret=client-secret", + nullptr)); + ASSERT_EQ(options.credential_kind_, AzureOptions::CredentialKind::kClientSecret); + } + + void TestFromUriCredentialManagedIdentity() { + ASSERT_OK_AND_ASSIGN( + auto options, + AzureOptions::FromUri("abfs://account.blob.core.windows.net/container/dir/blob?" + "client_id=client-id", + nullptr)); + ASSERT_EQ(options.credential_kind_, AzureOptions::CredentialKind::kManagedIdentity); + } + + void TestFromUriCredentialWorkloadIdentity() { + ASSERT_OK_AND_ASSIGN( + auto options, + AzureOptions::FromUri("abfs://account.blob.core.windows.net/container/dir/blob?" + "credential_kind=workload_identity", + nullptr)); + ASSERT_EQ(options.credential_kind_, AzureOptions::CredentialKind::kWorkloadIdentity); + } + + void TestFromUriCredentialInvalid() { + ASSERT_RAISES(Invalid, AzureOptions::FromUri( + "abfs://file_system@account.dfs.core.windows.net/dir/file?" + "credential_kind=invalid", + nullptr)); + } + void TestFromUriBlobStorageAuthority() { + ASSERT_OK_AND_ASSIGN( + auto options, + AzureOptions::FromUri("abfs://account.blob.core.windows.net/container/dir/blob?" + "blob_storage_authority=.blob.local", + nullptr)); + ASSERT_EQ(options.blob_storage_authority, ".blob.local"); + } + + void TestFromUriDfsStorageAuthority() { + ASSERT_OK_AND_ASSIGN( + auto options, + AzureOptions::FromUri("abfs://file_system@account.dfs.core.windows.net/dir/file?" + "dfs_storage_authority=.dfs.local", + nullptr)); + ASSERT_EQ(options.dfs_storage_authority, ".dfs.local"); + } + + void TestFromUriInvalidQueryParameter() { + ASSERT_RAISES(Invalid, AzureOptions::FromUri( + "abfs://file_system@account.dfs.core.windows.net/dir/file?" + "unknown=invalid", + nullptr)); + } +}; + +TEST_F(TestAzureOptions, FromUriBlobStorage) { TestFromUriBlobStorage(); } +TEST_F(TestAzureOptions, FromUriDfsStorage) { TestFromUriDfsStorage(); } +TEST_F(TestAzureOptions, FromUriAbfs) { TestFromUriAbfs(); } +TEST_F(TestAzureOptions, FromUriAbfss) { TestFromUriAbfss(); } +TEST_F(TestAzureOptions, FromUriEnableTls) { TestFromUriEnableTls(); } +TEST_F(TestAzureOptions, FromUriCredentialDefault) { TestFromUriCredentialDefault(); } +TEST_F(TestAzureOptions, FromUriCredentialAnonymous) { TestFromUriCredentialAnonymous(); } +TEST_F(TestAzureOptions, FromUriCredentialStorageSharedKey) { + TestFromUriCredentialStorageSharedKey(); +} +TEST_F(TestAzureOptions, FromUriCredentialClientSecret) { + TestFromUriCredentialClientSecret(); +} +TEST_F(TestAzureOptions, FromUriCredentialManagedIdentity) { + TestFromUriCredentialManagedIdentity(); +} +TEST_F(TestAzureOptions, FromUriCredentialWorkloadIdentity) { + TestFromUriCredentialWorkloadIdentity(); +} +TEST_F(TestAzureOptions, FromUriCredentialInvalid) { TestFromUriCredentialInvalid(); } +TEST_F(TestAzureOptions, FromUriBlobStorageAuthority) { + TestFromUriBlobStorageAuthority(); +} +TEST_F(TestAzureOptions, FromUriDfsStorageAuthority) { TestFromUriDfsStorageAuthority(); } +TEST_F(TestAzureOptions, FromUriInvalidQueryParameter) { + TestFromUriInvalidQueryParameter(); +} + struct PreexistingData { public: using RNG = random::pcg32_fast; diff --git a/cpp/src/arrow/filesystem/filesystem.cc b/cpp/src/arrow/filesystem/filesystem.cc index 810e9c179b156..1fb74d412988d 100644 --- a/cpp/src/arrow/filesystem/filesystem.cc +++ b/cpp/src/arrow/filesystem/filesystem.cc @@ -21,12 +21,15 @@ #include "arrow/util/config.h" #include "arrow/filesystem/filesystem.h" -#ifdef ARROW_HDFS -#include "arrow/filesystem/hdfs.h" +#ifdef ARROW_AZURE +#include "arrow/filesystem/azurefs.h" #endif #ifdef ARROW_GCS #include "arrow/filesystem/gcsfs.h" #endif +#ifdef ARROW_HDFS +#include "arrow/filesystem/hdfs.h" +#endif #ifdef ARROW_S3 #include "arrow/filesystem/s3fs.h" #endif @@ -690,6 +693,16 @@ Result> FileSystemFromUriReal(const Uri& uri, } return std::make_shared(options, io_context); } + if (scheme == "abfs" || scheme == "abfss") { +#ifdef ARROW_AZURE + ARROW_ASSIGN_OR_RAISE(auto options, AzureOptions::FromUri(uri, out_path)); + return AzureFileSystem::Make(options, io_context); +#else + return Status::NotImplemented( + "Got Azure Blob File System URI but Arrow compiled without Azure Blob File " + "System support"); +#endif + } if (scheme == "gs" || scheme == "gcs") { #ifdef ARROW_GCS ARROW_ASSIGN_OR_RAISE(auto options, GcsOptions::FromUri(uri, out_path)); @@ -698,7 +711,6 @@ Result> FileSystemFromUriReal(const Uri& uri, return Status::NotImplemented("Got GCS URI but Arrow compiled without GCS support"); #endif } - if (scheme == "hdfs" || scheme == "viewfs") { #ifdef ARROW_HDFS ARROW_ASSIGN_OR_RAISE(auto options, HdfsOptions::FromUri(uri));