Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-40028: [C++][FS][Azure] Add AzureFileSystem support to FileSystemFromUri() #40325

Merged
merged 9 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 127 additions & 0 deletions cpp/src/arrow/filesystem/azurefs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,133 @@ AzureOptions::AzureOptions() = default;

AzureOptions::~AzureOptions() = default;

Result<AzureOptions> AzureOptions::FromUri(const arrow::internal::Uri& uri,
std::string* out_path) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A PR by @bkietz is moving Uri out of internal so we should be careful with the merges.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the info!
#39067

AzureOptions options;
const auto host = uri.host();
std::string container;
std::string path;
if (arrow::internal::EndsWith(host, options.blob_storage_authority)) {
options.account_name =
host.substr(0, host.size() - options.blob_storage_authority.size());
auto components = internal::SplitAbstractPath(uri.path());
if (!components.empty()) {
container = components[0];
path = internal::JoinAbstractPath(components.begin() + 1, components.end());
}
} else if (arrow::internal::EndsWith(host, options.dfs_storage_authority)) {
options.account_name =
host.substr(0, host.size() - options.dfs_storage_authority.size());
container = uri.username();
path = uri.path();
} else {
options.account_name = uri.username();
std::string host_port = host;
const auto port_text = uri.port_text();
if (!port_text.empty()) {
host_port += ":" + port_text;
}
options.blob_storage_authority = host_port;
options.dfs_storage_authority = host_port;
if (uri.scheme() == "abfs") {
options.blob_storage_scheme = "http";
options.dfs_storage_scheme = "http";
felipecrv marked this conversation as resolved.
Show resolved Hide resolved
}
auto components = internal::SplitAbstractPath(uri.path());
if (!components.empty()) {
container = components[0];
path = internal::JoinAbstractPath(components.begin() + 1, components.end());
}
}
const auto account_key = uri.password();

if (container.empty()) {
return Status::Invalid("Missing container name in Azure Blob File System URI");
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need a container name if the filesystem wraps the entire storage account?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, we don't need this check. I'll remove this.
(I used GcsOptions::FromUri() as a base implementation and forgot to remove this check.)

if (out_path != nullptr) {
*out_path = std::string(internal::ConcatAbstractPath(container, path));
}

std::unordered_map<std::string, std::string> options_map;
ARROW_ASSIGN_OR_RAISE(const auto options_items, uri.query_items());
for (const auto& kv : options_items) {
options_map.emplace(kv.first, kv.second);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to build the map if you're going to iterate over the kv pairs and switch. This is just randomizing the iteration order.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. I borrowed this implementation from GcsOptions::FromUri() but I should have noticed this.
(We should remove this conversion in GcsOptions::FromUri() too later.)


CredentialKind credential_kind = options.account_name.empty()
? CredentialKind::kAnonymous
: CredentialKind::kDefault;
std::string tenant_id;
std::string client_id;
std::string client_secret;
for (const auto& kv : options_map) {
if (kv.first == "blob_storage_authority") {
options.blob_storage_authority = kv.second;
} else if (kv.first == "dfs_storage_authority") {
options.dfs_storage_authority = kv.second;
} else if (kv.first == "blob_storage_scheme") {
options.blob_storage_scheme = kv.second;
} else if (kv.first == "dfs_storage_scheme") {
options.dfs_storage_scheme = kv.second;
} else if (kv.first == "credential_kind") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

credential_kind_ should be inferred from what you find on the URI without the user having to set both the credential kind and the credentials.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it mean that we should use ConfigureClientSecretCredential() if tenant_id, client_id and client_secret are specified but credential_kind=client_secret isn't specified?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

credential_kind should never be specified and we should validate the URI to keep the invariant that it doesn't configure two different auth methods. And when nothing is provided, we use the default auth chain provided by the SDK.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, how can we distinguish ConfigureAnonymousCredential(), ConfigureWorkloadIdentityCredential() and ConfigureDefaultCredential()? None of them requires additional information.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The parameter-less auth methods can have dedicated query params for each. These being the valid configurations regarding auth:

  • nothing (use default auth chain)
  • ?anonymous
  • ?use_workload_identity
  • ?account_key=<ACCOUNT_KEY>
  • ?tenant_id=<TENANT_ID>&client_id=<CLIENT_ID>&client_secret=<CLIENT_SECRET>
  • ?client_id=<CLIENT_ID> (client_id alone means managed identity credential)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

?anonymous and ?use_workload_identity are conflicting parameters. (We can't specify both of them at once.) I think it's better to use a single parameter name for the type (XXX={anonymous,workload_identity}). With that, users can't specify both of them at once. (I know that the URI spec accepts XXX=anonymous&XXX=workload_identity.)

How about accepting only (default, ) anonymous and use_workload_identity as valid credential_kind parameter?

  • nothing (use default auth chain) -> nothing or ?credential_kind=default
  • ?anonymous -> ?credential_kind=anonymous
  • ?use_workload_identity -> ?credential_kind=workload_identity
  • ?account_key=<ACCOUNT_KEY> -> not changed (?credential_kind=storage_shared_key is invalid)
  • ?tenant_id=<TENANT_ID>&client_id=<CLIENT_ID>&client_secret=<CLIENT_SECRET> -> not changed (?credential_kind=client_secret is invalid)
  • ?client_id=<CLIENT_ID> (client_id alone means managed identity credential) -> not changed (?credential_kind=managed_identity is invalid)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, we don't need ?account_key=<ACCOUNT_KEY> because we can get it from the URI's password part.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about accepting only (default, ) anonymous and use_workload_identity as valid credential_kind parameter?

Sure. That looks good.

if (kv.second == "default") {
credential_kind = CredentialKind::kDefault;
} else if (kv.second == "anonymous") {
credential_kind = CredentialKind::kAnonymous;
} else if (kv.second == "storage_shared_key") {
credential_kind = CredentialKind::kStorageSharedKey;
} else if (kv.second == "client_secret") {
credential_kind = CredentialKind::kClientSecret;
} else if (kv.second == "managed_identity") {
credential_kind = CredentialKind::kManagedIdentity;
} else if (kv.second == "workload_identity") {
credential_kind = CredentialKind::kWorkloadIdentity;
} else {
return Status::Invalid("Unexpected credential_kind: '", kv.second, "'");
}
} else if (kv.first == "tenant_id") {
tenant_id = kv.second;
} else if (kv.first == "client_id") {
client_id = kv.second;
} else if (kv.first == "client_secret") {
client_secret = kv.second;
} else {
return Status::Invalid(
"Unexpected query parameter in Azure Blob File System URI: '", kv.first, "'");
}
}

switch (credential_kind) {
case CredentialKind::kDefault:
break;
case CredentialKind::kAnonymous:
RETURN_NOT_OK(options.ConfigureAnonymousCredential());
break;
case CredentialKind::kStorageSharedKey:
RETURN_NOT_OK(options.ConfigureAccountKeyCredential(account_key));
break;
case CredentialKind::kClientSecret:
RETURN_NOT_OK(
options.ConfigureClientSecretCredential(tenant_id, client_id, client_secret));
break;
case CredentialKind::kManagedIdentity:
RETURN_NOT_OK(options.ConfigureManagedIdentityCredential(client_id));
break;
case CredentialKind::kWorkloadIdentity:
RETURN_NOT_OK(options.ConfigureWorkloadIdentityCredential());
break;
}

return options;
}

/// \brief Build AzureOptions from a URI given as text.
///
/// Parses `uri_string` and, when parsing succeeds, hands the parsed URI and
/// `out_path` to the Uri-based FromUri() overload. A parse failure is returned
/// to the caller as-is.
Result<AzureOptions> AzureOptions::FromUri(const std::string& uri_string,
                                           std::string* out_path) {
  arrow::internal::Uri parsed_uri;
  // Bail out before delegating if the text is not a parsable URI.
  RETURN_NOT_OK(parsed_uri.Parse(uri_string));
  return FromUri(parsed_uri, out_path);
}

bool AzureOptions::Equals(const AzureOptions& other) const {
// TODO(GH-38598): update here when more auth methods are added.
const bool equals = blob_storage_authority == other.blob_storage_authority &&
Expand Down
10 changes: 10 additions & 0 deletions cpp/src/arrow/filesystem/azurefs.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class DataLakeServiceClient;
namespace arrow::fs {

class TestAzureFileSystem;
class TestAzureOptions;

/// Options for the AzureFileSystem implementation.
///
Expand All @@ -59,6 +60,8 @@ class TestAzureFileSystem;
///
/// Functions are provided for explicit configuration of credentials if that is preferred.
struct ARROW_EXPORT AzureOptions {
friend class TestAzureOptions;

/// \brief The name of the Azure Storage Account being accessed.
///
/// All service URLs will be constructed using this storage account name.
Expand Down Expand Up @@ -123,6 +126,13 @@ struct ARROW_EXPORT AzureOptions {
AzureOptions();
~AzureOptions();

/// Initialize from URIs such as
/// "abfs://account.blob.core.windows.net/container/dir/blob" and
/// "abfs://file_system@account.dfs.core.windows.net/dir/file".
felipecrv marked this conversation as resolved.
Show resolved Hide resolved
static Result<AzureOptions> FromUri(const arrow::internal::Uri& uri,
std::string* out_path);
static Result<AzureOptions> FromUri(const std::string& uri, std::string* out_path);

Status ConfigureDefaultCredential();
Status ConfigureAnonymousCredential();
Status ConfigureAccountKeyCredential(const std::string& account_key);
Expand Down
223 changes: 223 additions & 0 deletions cpp/src/arrow/filesystem/azurefs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,229 @@ TEST(AzureFileSystem, OptionsCompare) {
EXPECT_TRUE(options.Equals(options));
}

class TestAzureOptions : public ::testing::Test {
public:
void TestFromUriBlobStorage() {
AzureOptions default_options;
std::string path;
ASSERT_OK_AND_ASSIGN(
auto options,
AzureOptions::FromUri("abfs://account.blob.core.windows.net/container/dir/blob",
&path));
ASSERT_EQ(options.account_name, "account");
ASSERT_EQ(options.blob_storage_authority, default_options.blob_storage_authority);
ASSERT_EQ(options.dfs_storage_authority, default_options.dfs_storage_authority);
ASSERT_EQ(options.blob_storage_scheme, default_options.blob_storage_scheme);
ASSERT_EQ(options.dfs_storage_scheme, default_options.dfs_storage_scheme);
ASSERT_EQ(options.credential_kind_, AzureOptions::CredentialKind::kDefault);
ASSERT_EQ(path, "container/dir/blob");
}

void TestFromUriBlobStorageEmptyContainer() {
ASSERT_RAISES(
Invalid, AzureOptions::FromUri("abfs://account.blob.core.windows.net/", nullptr));
}

void TestFromUriDfsStorage() {
AzureOptions default_options;
std::string path;
ASSERT_OK_AND_ASSIGN(
auto options,
AzureOptions::FromUri("abfs://[email protected]/dir/file",
&path));
ASSERT_EQ(options.account_name, "account");
ASSERT_EQ(options.blob_storage_authority, default_options.blob_storage_authority);
ASSERT_EQ(options.dfs_storage_authority, default_options.dfs_storage_authority);
ASSERT_EQ(options.blob_storage_scheme, default_options.blob_storage_scheme);
ASSERT_EQ(options.dfs_storage_scheme, default_options.dfs_storage_scheme);
ASSERT_EQ(options.credential_kind_, AzureOptions::CredentialKind::kDefault);
ASSERT_EQ(path, "file_system/dir/file");
}

void TestFromUriDfsStorageEmptyContainer() {
ASSERT_RAISES(Invalid, AzureOptions::FromUri(
"abfs://account.dfs.core.windows.net/dir/file", nullptr));
}

void TestFromUriAbfs() {
std::string path;
ASSERT_OK_AND_ASSIGN(
auto options,
AzureOptions::FromUri(
"abfs://account:[email protected]:10000/container/dir/blob", &path));
ASSERT_EQ(options.account_name, "account");
ASSERT_EQ(options.blob_storage_authority, "127.0.0.1:10000");
ASSERT_EQ(options.dfs_storage_authority, "127.0.0.1:10000");
ASSERT_EQ(options.blob_storage_scheme, "http");
ASSERT_EQ(options.dfs_storage_scheme, "http");
ASSERT_EQ(options.credential_kind_, AzureOptions::CredentialKind::kDefault);
ASSERT_EQ(path, "container/dir/blob");
}

void TestFromUriAbfsEmptyContainer() {
ASSERT_RAISES(Invalid, AzureOptions::FromUri(
"abfs://account:[email protected]:10000/", nullptr));
}

void TestFromUriAbfss() {
std::string path;
ASSERT_OK_AND_ASSIGN(
auto options,
AzureOptions::FromUri(
"abfss://account:[email protected]:10000/container/dir/blob", &path));
ASSERT_EQ(options.account_name, "account");
ASSERT_EQ(options.blob_storage_authority, "127.0.0.1:10000");
ASSERT_EQ(options.dfs_storage_authority, "127.0.0.1:10000");
ASSERT_EQ(options.blob_storage_scheme, "https");
ASSERT_EQ(options.dfs_storage_scheme, "https");
ASSERT_EQ(options.credential_kind_, AzureOptions::CredentialKind::kDefault);
ASSERT_EQ(path, "container/dir/blob");
}

void TestFromUriCredentialDefault() {
ASSERT_OK_AND_ASSIGN(
auto options,
AzureOptions::FromUri("abfs://account.blob.core.windows.net/container/dir/blob?"
"credential_kind=default",
nullptr));
ASSERT_EQ(options.credential_kind_, AzureOptions::CredentialKind::kDefault);
}

void TestFromUriCredentialAnonymous() {
ASSERT_OK_AND_ASSIGN(
auto options,
AzureOptions::FromUri("abfs://account.blob.core.windows.net/container/dir/blob?"
"credential_kind=anonymous",
nullptr));
ASSERT_EQ(options.credential_kind_, AzureOptions::CredentialKind::kAnonymous);
}

void TestFromUriCredentialStorageSharedKey() {
ASSERT_OK_AND_ASSIGN(
auto options,
AzureOptions::FromUri("abfs://account.blob.core.windows.net/container/dir/blob?"
"credential_kind=storage_shared_key",
nullptr));
ASSERT_EQ(options.credential_kind_, AzureOptions::CredentialKind::kStorageSharedKey);
}

void TestFromUriCredentialClientSecret() {
ASSERT_OK_AND_ASSIGN(
auto options,
AzureOptions::FromUri("abfs://account.blob.core.windows.net/container/dir/blob?"
"credential_kind=client_secret&"
"tenant_id=tenant-id&"
"client_id=client-id&"
"client_secret=client-secret",
nullptr));
ASSERT_EQ(options.credential_kind_, AzureOptions::CredentialKind::kClientSecret);
}

void TestFromUriCredentialManagedIdentity() {
ASSERT_OK_AND_ASSIGN(
auto options,
AzureOptions::FromUri("abfs://account.blob.core.windows.net/container/dir/blob?"
"credential_kind=managed_identity&"
"client_id=client-id",
nullptr));
ASSERT_EQ(options.credential_kind_, AzureOptions::CredentialKind::kManagedIdentity);
}

void TestFromUriCredentialWorkloadIdentity() {
ASSERT_OK_AND_ASSIGN(
auto options,
AzureOptions::FromUri("abfs://account.blob.core.windows.net/container/dir/blob?"
"credential_kind=workload_identity",
nullptr));
ASSERT_EQ(options.credential_kind_, AzureOptions::CredentialKind::kWorkloadIdentity);
}

void TestFromUriCredentialInvalid() {
ASSERT_RAISES(Invalid, AzureOptions::FromUri(
"abfs://[email protected]/dir/file?"
"credential_kind=invalid",
nullptr));
}
void TestFromUriBlobStorageAuthority() {
ASSERT_OK_AND_ASSIGN(
auto options,
AzureOptions::FromUri("abfs://account.blob.core.windows.net/container/dir/blob?"
"blob_storage_authority=.blob.local",
nullptr));
ASSERT_EQ(options.blob_storage_authority, ".blob.local");
}

void TestFromUriDfsStorageAuthority() {
ASSERT_OK_AND_ASSIGN(
auto options,
AzureOptions::FromUri("abfs://[email protected]/dir/file?"
"dfs_storage_authority=.dfs.local",
nullptr));
ASSERT_EQ(options.dfs_storage_authority, ".dfs.local");
}

void TestFromUriBlobStorageScheme() {
ASSERT_OK_AND_ASSIGN(
auto options,
AzureOptions::FromUri("abfs://account.blob.core.windows.net/container/dir/blob?"
"blob_storage_scheme=http",
nullptr));
ASSERT_EQ(options.blob_storage_scheme, "http");
}

void TestFromUriDfsStorageScheme() {
ASSERT_OK_AND_ASSIGN(
auto options,
AzureOptions::FromUri("abfs://[email protected]/dir/file?"
"dfs_storage_scheme=http",
nullptr));
ASSERT_EQ(options.dfs_storage_scheme, "http");
}

void TestFromUriInvalidQueryParameter() {
ASSERT_RAISES(Invalid, AzureOptions::FromUri(
"abfs://[email protected]/dir/file?"
"unknown=invalid",
nullptr));
}
};

// Register every TestAzureOptions check as its own test case. Each body only
// forwards to the fixture member function named "Test" + <test case name>.
TEST_F(TestAzureOptions, FromUriBlobStorage) {
  TestFromUriBlobStorage();
}
TEST_F(TestAzureOptions, FromUriBlobStorageEmptyContainer) {
  TestFromUriBlobStorageEmptyContainer();
}
TEST_F(TestAzureOptions, FromUriDfsStorage) {
  TestFromUriDfsStorage();
}
TEST_F(TestAzureOptions, FromUriDfsStorageEmptyContainer) {
  TestFromUriDfsStorageEmptyContainer();
}
TEST_F(TestAzureOptions, FromUriAbfs) {
  TestFromUriAbfs();
}
TEST_F(TestAzureOptions, FromUriAbfsEmptyContainer) {
  TestFromUriAbfsEmptyContainer();
}
TEST_F(TestAzureOptions, FromUriAbfss) {
  TestFromUriAbfss();
}
TEST_F(TestAzureOptions, FromUriCredentialDefault) {
  TestFromUriCredentialDefault();
}
TEST_F(TestAzureOptions, FromUriCredentialAnonymous) {
  TestFromUriCredentialAnonymous();
}
TEST_F(TestAzureOptions, FromUriCredentialStorageSharedKey) {
  TestFromUriCredentialStorageSharedKey();
}
TEST_F(TestAzureOptions, FromUriCredentialClientSecret) {
  TestFromUriCredentialClientSecret();
}
TEST_F(TestAzureOptions, FromUriCredentialManagedIdentity) {
  TestFromUriCredentialManagedIdentity();
}
TEST_F(TestAzureOptions, FromUriCredentialWorkloadIdentity) {
  TestFromUriCredentialWorkloadIdentity();
}
TEST_F(TestAzureOptions, FromUriCredentialInvalid) {
  TestFromUriCredentialInvalid();
}
TEST_F(TestAzureOptions, FromUriBlobStorageAuthority) {
  TestFromUriBlobStorageAuthority();
}
TEST_F(TestAzureOptions, FromUriDfsStorageAuthority) {
  TestFromUriDfsStorageAuthority();
}
TEST_F(TestAzureOptions, FromUriBlobStorageScheme) {
  TestFromUriBlobStorageScheme();
}
TEST_F(TestAzureOptions, FromUriDfsStorageScheme) {
  TestFromUriDfsStorageScheme();
}
TEST_F(TestAzureOptions, FromUriInvalidQueryParameter) {
  TestFromUriInvalidQueryParameter();
}

struct PreexistingData {
public:
using RNG = random::pcg32_fast;
Expand Down
Loading
Loading