Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-37511: [C++] Implement file reads for Azure filesystem #38269

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
dbbaf92
Paste in `AzurePath` and `ObjectInputFile` from #12914
Tom-Newton Sep 29, 2023
1db80b3
Paste in TestAzureFileSystem and an example test (TestAzureFileSystem…
Tom-Newton Sep 29, 2023
81d1523
Paste in input file test cases from gscfs_test.cc
Tom-Newton Sep 29, 2023
9e31d1a
Minimal changes for successful build
Tom-Newton Sep 29, 2023
5e4803b
Paste in ConfigureAccountKeyCredentials from #12914
Tom-Newton Sep 29, 2023
cc552bf
TestAzureFileSystem builds successfully
Tom-Newton Sep 29, 2023
8874fbe
Paste in OpenInputFile from #12914
Tom-Newton Sep 29, 2023
7402412
First test builds successfully
Tom-Newton Oct 1, 2023
8d574dc
First ReadAt test passes
Tom-Newton Oct 1, 2023
07947f3
Majority of tests working
Tom-Newton Oct 3, 2023
bb8421f
Implement open file from info and enable the relevant tests
Tom-Newton Oct 3, 2023
e8cde8f
Add OpenInputStream implementation
Tom-Newton Oct 14, 2023
0a693c8
Paste in input stream tests from gcsfd_test.cc
Tom-Newton Oct 14, 2023
8ff5684
Fix input stream tests
Tom-Newton Oct 14, 2023
bd85d5a
Fix metadata test
Tom-Newton Oct 15, 2023
6cb904f
Use basic blob client for tests
Tom-Newton Oct 15, 2023
3ff7051
Rename file_client -> blob_client
Tom-Newton Oct 15, 2023
e70d01d
Fix implementation to pass OpenInputStreamUri test
Tom-Newton Oct 15, 2023
00b8139
Adjust some error handling
Tom-Newton Oct 15, 2023
b14b83e
Tidy tests
Tom-Newton Oct 15, 2023
918c68d
Make AzurePath consistent with changes from #11997
Tom-Newton Oct 15, 2023
f696aee
Tidy path validation
Tom-Newton Oct 15, 2023
98e019c
Remote un-needed includes for datalake client
Tom-Newton Oct 15, 2023
0c58509
Remove one of the placeholder tests
Tom-Newton Oct 15, 2023
8505e93
Tidy
Tom-Newton Oct 15, 2023
2b048ad
Better error messges
Tom-Newton Oct 15, 2023
7fcf9db
Remove unnecessary move
Tom-Newton Oct 15, 2023
cffc9be
Improve compliance with style guide
Tom-Newton Oct 15, 2023
08e5206
Make open file consistent with #13577
Tom-Newton Oct 16, 2023
c3405f5
Tidy
Tom-Newton Oct 16, 2023
cad62df
PR comments
Tom-Newton Oct 18, 2023
10c25e2
Reference issue for better metadata response
Tom-Newton Oct 18, 2023
b9f1eaf
Avoid unnecessary copying of AzurePath
Tom-Newton Oct 18, 2023
835e6ab
Better status message for invalid path
Tom-Newton Oct 18, 2023
8e9a985
Remove unnecessary and dangerous path string parsing
Tom-Newton Oct 18, 2023
7f329cc
Update cpp/src/arrow/filesystem/azurefs_test.cc
Tom-Newton Oct 18, 2023
13929d8
Avoid designated initializers
Tom-Newton Oct 18, 2023
f652a4f
Make another test more concise
Tom-Newton Oct 18, 2023
26d425e
Fix a clang build warning
Tom-Newton Oct 18, 2023
13c0d1c
Reference follow up github issues in TODO comments
Tom-Newton Oct 18, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
335 changes: 318 additions & 17 deletions cpp/src/arrow/filesystem/azurefs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@

#include "arrow/filesystem/azurefs.h"

#include <azure/identity/default_azure_credential.hpp>
#include <azure/storage/blobs.hpp>

#include "arrow/buffer.h"
#include "arrow/filesystem/path_util.h"
#include "arrow/filesystem/util_internal.h"
#include "arrow/result.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/future.h"
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/logging.h"
#include "arrow/util/string.h"

namespace arrow {
namespace fs {
Expand All @@ -37,34 +43,329 @@ bool AzureOptions::Equals(const AzureOptions& other) const {
credentials_kind == other.credentials_kind);
}

Status AzureOptions::ConfigureAccountKeyCredentials(const std::string& account_name,
const std::string& account_key) {
if (this->backend == AzureBackend::Azurite) {
account_blob_url = "http://127.0.0.1:10000/" + account_name + "/";
account_dfs_url = "http://127.0.0.1:10000/" + account_name + "/";
} else {
account_dfs_url = "https://" + account_name + ".dfs.core.windows.net/";
account_blob_url = "https://" + account_name + ".blob.core.windows.net/";
}
storage_credentials_provider =
std::make_shared<Azure::Storage::StorageSharedKeyCredential>(account_name,
account_key);
credentials_kind = AzureCredentialsKind::StorageCredentials;
return Status::OK();
}
namespace {

// An AzureFileSystem represents a single Azure storage account. AzurePath describes a
// container and path within that storage account.
struct AzurePath {
std::string full_path;
std::string container;
std::string path_to_file;
std::vector<std::string> path_to_file_parts;
Tom-Newton marked this conversation as resolved.
Show resolved Hide resolved

static Result<AzurePath> FromString(const std::string& s) {
// Example expected string format: testcontainer/testdir/testfile.txt
// container = testcontainer
// path_to_file = testdir/testfile.txt
// path_to_file_parts = [testdir, testfile.txt]
if (internal::IsLikelyUri(s)) {
return Status::Invalid(
"Expected an Azure object path of the form 'container/path...', got a URI: '",
s, "'");
}
const auto src = internal::RemoveTrailingSlash(s);
auto first_sep = src.find_first_of(internal::kSep);
if (first_sep == 0) {
return Status::Invalid("Path cannot start with a separator ('", s, "')");
}
if (first_sep == std::string::npos) {
return AzurePath{std::string(src), std::string(src), "", {}};
}
AzurePath path;
path.full_path = std::string(src);
path.container = std::string(src.substr(0, first_sep));
path.path_to_file = std::string(src.substr(first_sep + 1));
path.path_to_file_parts = internal::SplitAbstractPath(path.path_to_file);
RETURN_NOT_OK(Validate(path));
return path;
}

static Status Validate(const AzurePath& path) {
auto status = internal::ValidateAbstractPathParts(path.path_to_file_parts);
if (!status.ok()) {
return Status::Invalid(status.message(), " in path ", path.full_path);
} else {
return status;
}
}

AzurePath parent() const {
DCHECK(has_parent());
auto parent = AzurePath{"", container, "", path_to_file_parts};
parent.path_to_file_parts.pop_back();
parent.path_to_file = internal::JoinAbstractPath(parent.path_to_file_parts);
if (parent.path_to_file.empty()) {
parent.full_path = parent.container;
} else {
parent.full_path = parent.container + internal::kSep + parent.path_to_file;
}
return parent;
}

bool has_parent() const { return !path_to_file.empty(); }

bool empty() const { return container.empty() && path_to_file.empty(); }

bool operator==(const AzurePath& other) const {
return container == other.container && path_to_file == other.path_to_file;
}
};

Status PathNotFound(const AzurePath& path) {
return ::arrow::fs::internal::PathNotFound(path.full_path);
}

Status NotAFile(const AzurePath& path) {
return ::arrow::fs::internal::NotAFile(path.full_path);
}

Status ValidateFilePath(const AzurePath& path) {
if (path.container.empty()) {
return PathNotFound(path);
}

if (path.path_to_file.empty()) {
return NotAFile(path);
}
return Status::OK();
}
Tom-Newton marked this conversation as resolved.
Show resolved Hide resolved

Status ErrorToStatus(const std::string& prefix,
const Azure::Storage::StorageException& exception) {
return Status::IOError(prefix, " Azure Error: ", exception.what());
}

template <typename ObjectResult>
std::shared_ptr<const KeyValueMetadata> GetObjectMetadata(const ObjectResult& result) {
auto md = std::make_shared<KeyValueMetadata>();
for (auto prop : result) {
md->Append(prop.first, prop.second);
}
return md;
}

class ObjectInputFile final : public io::RandomAccessFile {
public:
ObjectInputFile(std::shared_ptr<Azure::Storage::Blobs::BlobClient> blob_client,
const io::IOContext& io_context, AzurePath path, int64_t size = kNoSize)
: blob_client_(std::move(blob_client)),
io_context_(io_context),
path_(std::move(path)),
content_length_(size) {}

Status Init() {
if (content_length_ != kNoSize) {
DCHECK_GE(content_length_, 0);
return Status::OK();
}
try {
auto properties = blob_client_->GetProperties();
content_length_ = properties.Value.BlobSize;
metadata_ = GetObjectMetadata(properties.Value.Metadata);
return Status::OK();
} catch (const Azure::Storage::StorageException& exception) {
if (exception.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound) {
// Could be either container or blob not found.
return PathNotFound(path_);
}
return ErrorToStatus(
"When fetching properties for '" + blob_client_->GetUrl() + "': ", exception);
}
}

Status CheckClosed(const char* action) const {
if (closed_) {
return Status::Invalid("Cannot ", action, " on closed file.");
}
return Status::OK();
}

Status CheckPosition(int64_t position, const char* action) const {
DCHECK_GE(content_length_, 0);
if (position < 0) {
return Status::Invalid("Cannot ", action, " from negative position");
}
if (position > content_length_) {
return Status::IOError("Cannot ", action, " past end of file");
Tom-Newton marked this conversation as resolved.
Show resolved Hide resolved
}
return Status::OK();
}

// RandomAccessFile APIs

Result<std::shared_ptr<const KeyValueMetadata>> ReadMetadata() override {
return metadata_;
}

Future<std::shared_ptr<const KeyValueMetadata>> ReadMetadataAsync(
const io::IOContext& io_context) override {
return metadata_;
}

Status Close() override {
blob_client_ = nullptr;
closed_ = true;
return Status::OK();
}

bool closed() const override { return closed_; }

Result<int64_t> Tell() const override {
RETURN_NOT_OK(CheckClosed("tell"));
return pos_;
}

Result<int64_t> GetSize() override {
RETURN_NOT_OK(CheckClosed("size"));
return content_length_;
}

Status Seek(int64_t position) override {
RETURN_NOT_OK(CheckClosed("seek"));
RETURN_NOT_OK(CheckPosition(position, "seek"));

pos_ = position;
return Status::OK();
}

Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override {
RETURN_NOT_OK(CheckClosed("read"));
RETURN_NOT_OK(CheckPosition(position, "read"));

nbytes = std::min(nbytes, content_length_ - position);
if (nbytes == 0) {
return 0;
}

// Read the desired range of bytes
Azure::Core::Http::HttpRange range{position, nbytes};
Azure::Storage::Blobs::DownloadBlobToOptions download_options;
download_options.Range = range;
try {
return blob_client_
->DownloadTo(reinterpret_cast<uint8_t*>(out), nbytes, download_options)
.Value.ContentRange.Length.Value();
} catch (const Azure::Storage::StorageException& exception) {
return ErrorToStatus("When reading from '" + blob_client_->GetUrl() +
"' at position " + std::to_string(position) + " for " +
std::to_string(nbytes) + " bytes: ",
exception);
}
}

Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) override {
RETURN_NOT_OK(CheckClosed("read"));
RETURN_NOT_OK(CheckPosition(position, "read"));

// No need to allocate more than the remaining number of bytes
nbytes = std::min(nbytes, content_length_ - position);

ARROW_ASSIGN_OR_RAISE(auto buffer,
AllocateResizableBuffer(nbytes, io_context_.pool()));
if (nbytes > 0) {
ARROW_ASSIGN_OR_RAISE(int64_t bytes_read,
ReadAt(position, nbytes, buffer->mutable_data()));
DCHECK_LE(bytes_read, nbytes);
RETURN_NOT_OK(buffer->Resize(bytes_read));
}
return std::move(buffer);
}

Result<int64_t> Read(int64_t nbytes, void* out) override {
ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(pos_, nbytes, out));
pos_ += bytes_read;
return bytes_read;
}

Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override {
ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(pos_, nbytes));
pos_ += buffer->size();
return std::move(buffer);
}

private:
std::shared_ptr<Azure::Storage::Blobs::BlobClient> blob_client_;
const io::IOContext io_context_;
AzurePath path_;

bool closed_ = false;
int64_t pos_ = 0;
int64_t content_length_ = kNoSize;
std::shared_ptr<const KeyValueMetadata> metadata_;
};

} // namespace

// -----------------------------------------------------------------------
// AzureFilesystem Implementation

class AzureFileSystem::Impl {
public:
io::IOContext io_context_;
bool is_hierarchical_namespace_enabled_;
std::shared_ptr<Azure::Storage::Blobs::BlobServiceClient> service_client_;
AzureOptions options_;

explicit Impl(AzureOptions options, io::IOContext io_context)
: io_context_(io_context), options_(std::move(options)) {}

Status Init() {
// TODO: GH-18014 Delete this once we have a proper implementation. This just
// initializes a pointless Azure blob service client with a fake endpoint to ensure
// the build will fail if the Azure SDK build is broken.
auto default_credential = std::make_shared<Azure::Identity::DefaultAzureCredential>();
auto service_client = Azure::Storage::Blobs::BlobServiceClient(
"http://fake-blob-storage-endpoint", default_credential);
if (options_.backend == AzureBackend::Azurite) {
// gen1Client_->GetAccountInfo().Value.IsHierarchicalNamespaceEnabled
// throws error in azurite
is_hierarchical_namespace_enabled_ = false;
}
service_client_ = std::make_shared<Azure::Storage::Blobs::BlobServiceClient>(
options_.account_blob_url, options_.storage_credentials_provider);
return Status::OK();
}

const AzureOptions& options() const { return options_; }

Result<std::shared_ptr<ObjectInputFile>> OpenInputFile(const std::string& s,
AzureFileSystem* fs) {
ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(s));
ARROW_ASSIGN_OR_RAISE(auto path, AzurePath::FromString(s));
RETURN_NOT_OK(ValidateFilePath(path));
auto blob_client = std::make_shared<Azure::Storage::Blobs::BlobClient>(
service_client_->GetBlobContainerClient(path.container)
.GetBlobClient(path.path_to_file));

auto ptr =
std::make_shared<ObjectInputFile>(blob_client, fs->io_context(), std::move(path));
RETURN_NOT_OK(ptr->Init());
return ptr;
}

Result<std::shared_ptr<ObjectInputFile>> OpenInputFile(const FileInfo& info,
AzureFileSystem* fs) {
ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(info.path()));
if (info.type() == FileType::NotFound) {
return ::arrow::fs::internal::PathNotFound(info.path());
}
if (info.type() != FileType::File && info.type() != FileType::Unknown) {
return ::arrow::fs::internal::NotAFile(info.path());
}
ARROW_ASSIGN_OR_RAISE(auto path, AzurePath::FromString(info.path()));
RETURN_NOT_OK(ValidateFilePath(path));
auto blob_client = std::make_shared<Azure::Storage::Blobs::BlobClient>(
service_client_->GetBlobContainerClient(path.container)
.GetBlobClient(path.path_to_file));

auto ptr = std::make_shared<ObjectInputFile>(blob_client, fs->io_context(),
std::move(path), info.size());
RETURN_NOT_OK(ptr->Init());
return ptr;
}
};

const AzureOptions& AzureFileSystem::options() const { return impl_->options(); }
Expand Down Expand Up @@ -118,22 +419,22 @@ Status AzureFileSystem::CopyFile(const std::string& src, const std::string& dest

Result<std::shared_ptr<io::InputStream>> AzureFileSystem::OpenInputStream(
const std::string& path) {
return Status::NotImplemented("The Azure FileSystem is not fully implemented");
return impl_->OpenInputFile(path, this);
}

Result<std::shared_ptr<io::InputStream>> AzureFileSystem::OpenInputStream(
const FileInfo& info) {
return Status::NotImplemented("The Azure FileSystem is not fully implemented");
return impl_->OpenInputFile(info, this);
}

Result<std::shared_ptr<io::RandomAccessFile>> AzureFileSystem::OpenInputFile(
const std::string& path) {
return Status::NotImplemented("The Azure FileSystem is not fully implemented");
return impl_->OpenInputFile(path, this);
}

Result<std::shared_ptr<io::RandomAccessFile>> AzureFileSystem::OpenInputFile(
const FileInfo& info) {
return Status::NotImplemented("The Azure FileSystem is not fully implemented");
return impl_->OpenInputFile(info, this);
}

Result<std::shared_ptr<io::OutputStream>> AzureFileSystem::OpenOutputStream(
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/filesystem/azurefs.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ struct ARROW_EXPORT AzureOptions {

AzureOptions();

Status ConfigureAccountKeyCredentials(const std::string& account_name,
const std::string& account_key);

bool Equals(const AzureOptions& other) const;
};

Expand Down
Loading
Loading