Skip to content

Commit

Permalink
GH-40028: [C++][FS][Azure] Add AzureFileSystem support to FileSystemF…
Browse files Browse the repository at this point in the history
…romUri() (#40325)

### Rationale for this change

`FileSystemFromUri()` is a common API to create a file system object. `FileSystemFromUri()` should be able to create an `AzureFileSystem` object.

### What changes are included in this PR?

Add `AzureOptions::FromUri()` and use it from `FileSystemFromUri()`.

See the `AzureOptions::FromUri()`'s docstring about the supported formats.

### Are these changes tested?

Yes.

### Are there any user-facing changes?

Yes.
* GitHub Issue: #40028

Authored-by: Sutou Kouhei <[email protected]>
Signed-off-by: Sutou Kouhei <[email protected]>
  • Loading branch information
kou authored Mar 11, 2024
1 parent 5127ef0 commit 605f8a7
Show file tree
Hide file tree
Showing 4 changed files with 433 additions and 3 deletions.
166 changes: 166 additions & 0 deletions cpp/src/arrow/filesystem/azurefs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,172 @@ AzureOptions::AzureOptions() = default;

AzureOptions::~AzureOptions() = default;

void AzureOptions::ExtractFromUriSchemeAndHierPart(const arrow::internal::Uri& uri,
std::string* out_path) {
const auto host = uri.host();
std::string path;
if (arrow::internal::EndsWith(host, blob_storage_authority)) {
account_name = host.substr(0, host.size() - blob_storage_authority.size());
path = internal::RemoveLeadingSlash(uri.path());
} else if (arrow::internal::EndsWith(host, dfs_storage_authority)) {
account_name = host.substr(0, host.size() - dfs_storage_authority.size());
path = internal::ConcatAbstractPath(uri.username(), uri.path());
} else {
account_name = uri.username();
const auto port_text = uri.port_text();
if (host.find(".") == std::string::npos && port_text.empty()) {
// abfs://container/dir/file
path = internal::ConcatAbstractPath(host, uri.path());
} else {
// abfs://host.domain/container/dir/file
// abfs://host.domain:port/container/dir/file
// abfs://host:port/container/dir/file
std::string host_port = host;
if (!port_text.empty()) {
host_port += ":" + port_text;
}
blob_storage_authority = host_port;
dfs_storage_authority = host_port;
path = internal::RemoveLeadingSlash(uri.path());
}
}
if (out_path != nullptr) {
*out_path = path;
}
}

Status AzureOptions::ExtractFromUriQuery(const arrow::internal::Uri& uri) {
const auto account_key = uri.password();
std::optional<CredentialKind> credential_kind;
std::optional<std::string> credential_kind_value;
std::string tenant_id;
std::string client_id;
std::string client_secret;
ARROW_ASSIGN_OR_RAISE(const auto options_items, uri.query_items());
for (const auto& kv : options_items) {
if (kv.first == "blob_storage_authority") {
blob_storage_authority = kv.second;
} else if (kv.first == "dfs_storage_authority") {
dfs_storage_authority = kv.second;
} else if (kv.first == "credential_kind") {
if (kv.second == "default") {
credential_kind = CredentialKind::kDefault;
} else if (kv.second == "anonymous") {
credential_kind = CredentialKind::kAnonymous;
} else if (kv.second == "workload_identity") {
credential_kind = CredentialKind::kWorkloadIdentity;
} else {
// Other credential kinds should be inferred from the given
// parameters automatically.
return Status::Invalid("Unexpected credential_kind: '", kv.second, "'");
}
credential_kind_value = kv.second;
} else if (kv.first == "tenant_id") {
tenant_id = kv.second;
} else if (kv.first == "client_id") {
client_id = kv.second;
} else if (kv.first == "client_secret") {
client_secret = kv.second;
} else if (kv.first == "enable_tls") {
ARROW_ASSIGN_OR_RAISE(auto enable_tls, ::arrow::internal::ParseBoolean(kv.second));
if (enable_tls) {
blob_storage_scheme = "https";
dfs_storage_scheme = "https";
} else {
blob_storage_scheme = "http";
dfs_storage_scheme = "http";
}
} else {
return Status::Invalid(
"Unexpected query parameter in Azure Blob File System URI: '", kv.first, "'");
}
}

if (credential_kind) {
if (!account_key.empty()) {
return Status::Invalid("Password must not be specified with credential_kind=",
*credential_kind_value);
}
if (!tenant_id.empty()) {
return Status::Invalid("tenant_id must not be specified with credential_kind=",
*credential_kind_value);
}
if (!client_id.empty()) {
return Status::Invalid("client_id must not be specified with credential_kind=",
*credential_kind_value);
}
if (!client_secret.empty()) {
return Status::Invalid("client_secret must not be specified with credential_kind=",
*credential_kind_value);
}

switch (*credential_kind) {
case CredentialKind::kAnonymous:
RETURN_NOT_OK(ConfigureAnonymousCredential());
break;
case CredentialKind::kWorkloadIdentity:
RETURN_NOT_OK(ConfigureWorkloadIdentityCredential());
break;
default:
// Default credential
break;
}
} else {
if (!account_key.empty()) {
// With password
if (!tenant_id.empty()) {
return Status::Invalid("tenant_id must not be specified with password");
}
if (!client_id.empty()) {
return Status::Invalid("client_id must not be specified with password");
}
if (!client_secret.empty()) {
return Status::Invalid("client_secret must not be specified with password");
}
RETURN_NOT_OK(ConfigureAccountKeyCredential(account_key));
} else {
// Without password
if (tenant_id.empty() && client_id.empty() && client_secret.empty()) {
// No related parameters
if (account_name.empty()) {
RETURN_NOT_OK(ConfigureAnonymousCredential());
} else {
// Default credential
}
} else {
// One or more tenant_id, client_id or client_secret are specified
if (client_id.empty()) {
return Status::Invalid("client_id must be specified");
}
if (tenant_id.empty() && client_secret.empty()) {
RETURN_NOT_OK(ConfigureManagedIdentityCredential(client_id));
} else if (!tenant_id.empty() && !client_secret.empty()) {
RETURN_NOT_OK(
ConfigureClientSecretCredential(tenant_id, client_id, client_secret));
} else {
return Status::Invalid("Both of tenant_id and client_secret must be specified");
}
}
}
}
return Status::OK();
}

Result<AzureOptions> AzureOptions::FromUri(const arrow::internal::Uri& uri,
std::string* out_path) {
AzureOptions options;
options.ExtractFromUriSchemeAndHierPart(uri, out_path);
RETURN_NOT_OK(options.ExtractFromUriQuery(uri));
return options;
}

Result<AzureOptions> AzureOptions::FromUri(const std::string& uri_string,
std::string* out_path) {
arrow::internal::Uri uri;
RETURN_NOT_OK(uri.Parse(uri_string));
return FromUri(uri, out_path);
}

bool AzureOptions::Equals(const AzureOptions& other) const {
// TODO(GH-38598): update here when more auth methods are added.
const bool equals = blob_storage_authority == other.blob_storage_authority &&
Expand Down
56 changes: 56 additions & 0 deletions cpp/src/arrow/filesystem/azurefs.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class DataLakeServiceClient;
namespace arrow::fs {

class TestAzureFileSystem;
class TestAzureOptions;

/// Options for the AzureFileSystem implementation.
///
Expand All @@ -59,6 +60,8 @@ class TestAzureFileSystem;
///
/// Functions are provided for explicit configuration of credentials if that is preferred.
struct ARROW_EXPORT AzureOptions {
friend class TestAzureOptions;

/// \brief The name of the Azure Storage Account being accessed.
///
/// All service URLs will be constructed using this storage account name.
Expand Down Expand Up @@ -123,6 +126,59 @@ struct ARROW_EXPORT AzureOptions {
AzureOptions();
~AzureOptions();

private:
void ExtractFromUriSchemeAndHierPart(const arrow::internal::Uri& uri,
std::string* out_path);
Status ExtractFromUriQuery(const arrow::internal::Uri& uri);

public:
/// \brief Construct a new AzureOptions from an URI.
///
/// Supported formats:
///
/// 1. abfs[s]://[:\<password\>@]\<account\>.blob.core.windows.net
/// [/\<container\>[/\<path\>]]
/// 2. abfs[s]://\<container\>[:\<password\>]@\<account\>.dfs.core.windows.net
/// [/path]
/// 3. abfs[s]://[\<account[:\<password\>]@]\<host[.domain]\>[\<:port\>]
/// [/\<container\>[/path]]
/// 4. abfs[s]://[\<account[:\<password\>]@]\<container\>[/path]
///
/// 1. and 2. are compatible with the Azure Data Lake Storage Gen2 URIs:
/// https://learn.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-introduction-abfs-uri
///
/// 3. is for Azure Blob Storage compatible service including Azurite.
///
/// 4. is a shorter version of 1. and 2.
///
/// Note that there is no difference between abfs and abfss. HTTPS is
/// used with abfs by default. You can force to use HTTP by specifying
/// "enable_tls=false" query.
///
/// Supported query parameters:
///
/// * blob_storage_authority: Set AzureOptions::blob_storage_authority
/// * dfs_storage_authority: Set AzureOptions::dfs_storage_authority
/// * enable_tls: If it's "false" or "0", HTTP not HTTPS is used.
/// * credential_kind: One of "default", "anonymous",
/// "workload_identity". If "default" is specified, it's just
/// ignored. If "anonymous" is specified,
/// AzureOptions::ConfigureAnonymousCredential() is called. If
/// "workload_identity" is specified,
/// AzureOptions::ConfigureWorkloadIdentityCredential() is called.
/// * tenant_id: You must specify "client_id" and "client_secret"
/// too. AzureOptions::ConfigureClientSecretCredential() is called.
/// * client_id: If you don't specify "tenant_id" and
/// "client_secret",
/// AzureOptions::ConfigureManagedIdentityCredential() is
/// called. If you specify "tenant_id" and "client_secret" too,
/// AzureOptions::ConfigureClientSecretCredential() is called.
/// * client_secret: You must specify "tenant_id" and "client_id"
/// too. AzureOptions::ConfigureClientSecretCredential() is called.
static Result<AzureOptions> FromUri(const arrow::internal::Uri& uri,
std::string* out_path);
static Result<AzureOptions> FromUri(const std::string& uri, std::string* out_path);

Status ConfigureDefaultCredential();
Status ConfigureAnonymousCredential();
Status ConfigureAccountKeyCredential(const std::string& account_key);
Expand Down
Loading

0 comments on commit 605f8a7

Please sign in to comment.