Skip to content

Commit

Permalink
Paste in OpenOutputStream and OpenAppendStream from apache#12914
Browse files Browse the repository at this point in the history
  • Loading branch information
Tom-Newton committed Nov 15, 2023
1 parent 6fe6730 commit 76a2688
Showing 1 changed file with 122 additions and 2 deletions.
124 changes: 122 additions & 2 deletions cpp/src/arrow/filesystem/azurefs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
#include "arrow/buffer.h"
#include "arrow/filesystem/path_util.h"
#include "arrow/filesystem/util_internal.h"
#include "arrow/io/interfaces.h"
#include "arrow/io/util_internal.h"
#include "arrow/result.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/formatting.h"
Expand Down Expand Up @@ -755,6 +755,126 @@ class AzureFileSystem::Impl {
RETURN_NOT_OK(ptr->Init());
return ptr;
}

/// \brief Open a stream for writing to `s`, replacing any existing file.
///
/// \param s Azure path string ("container/path/to/file") of the file to write.
/// \param metadata Optional user metadata to attach to the created file.
/// \param fs Owning filesystem, used only to obtain the IO context.
/// \return An initialized ObjectOutputStream, or an error Status.
///
/// Fails with PathNotFound if `s` is empty, names only a container, or
/// resolves to an existing directory; fails with IOError when the parent
/// directory (or container) does not exist, or when a nested path is given
/// on a flat-namespace (non-hierarchical) account.
Result<std::shared_ptr<ObjectOutputStream>> OpenOutputStream(
    const std::string& s, const std::shared_ptr<const KeyValueMetadata>& metadata,
    AzureBlobFileSystem* fs) {
  ARROW_ASSIGN_OR_RAISE(auto path, AzurePath::FromString(s));

  // A bare container (or an empty path) is not a writable file.
  if (path.empty() || path.path_to_file.empty()) {
    return ::arrow::fs::internal::PathNotFound(path.full_path);
  }
  std::string endpoint_url = dfs_endpoint_url_;
  if (!is_hierarchical_namespace_enabled_) {
    // Flat-namespace accounts cannot hold nested paths and are served from
    // the blob endpoint instead of the DFS endpoint.
    // Message kept consistent with OpenAppendStream below.
    if (path.path_to_file_parts.size() > 1) {
      return Status::IOError(
          "Invalid Azure Blob Storage path provided,"
          " hierarchical namespace not enabled in storage account");
    }
    endpoint_url = blob_endpoint_url_;
  }
  // Refuse to overwrite an existing directory. Directory existence checks go
  // through the DFS endpoint regardless of which endpoint serves the file.
  ARROW_ASSIGN_OR_RAISE(auto response, DirExists(dfs_endpoint_url_ + path.full_path));
  if (response) {
    return ::arrow::fs::internal::PathNotFound(path.full_path);
  }
  // Validate the parent before constructing any clients so that client
  // initialization is not wasted on a path that will be rejected anyway.
  if (path.has_parent()) {
    AzurePath parent_path = path.parent();
    if (parent_path.path_to_file.empty()) {
      // Parent is the container itself.
      ARROW_ASSIGN_OR_RAISE(response, ContainerExists(parent_path.container));
    } else {
      ARROW_ASSIGN_OR_RAISE(response,
                            DirExists(dfs_endpoint_url_ + parent_path.full_path));
    }
    if (!response) {
      return Status::IOError("Cannot write to file '", path.full_path,
                             "': parent directory does not exist");
    }
  }
  std::shared_ptr<Azure::Storage::Files::DataLake::DataLakeFileClient> file_client;
  ARROW_ASSIGN_OR_RAISE(
      file_client,
      InitPathClient<Azure::Storage::Files::DataLake::DataLakeFileClient>(
          options_, endpoint_url + path.full_path, path.container, path.path_to_file));

  std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> blob_client;
  ARROW_ASSIGN_OR_RAISE(
      blob_client,
      InitPathClient<Azure::Storage::Blobs::BlockBlobClient>(
          options_, endpoint_url + path.full_path, path.container, path.path_to_file));

  auto ptr = std::make_shared<ObjectOutputStream>(file_client, blob_client,
                                                  is_hierarchical_namespace_enabled_,
                                                  fs->io_context(), path, metadata);
  RETURN_NOT_OK(ptr->Init());
  return ptr;
}

/// \brief Open a stream that appends to the file at `s`.
///
/// Like OpenOutputStream, but existing content is preserved. The target must
/// not be a directory, and its parent directory (or container) must exist.
Result<std::shared_ptr<ObjectAppendStream>> OpenAppendStream(
    const std::string& s, const std::shared_ptr<const KeyValueMetadata>& metadata,
    AzureBlobFileSystem* fs) {
  ARROW_ASSIGN_OR_RAISE(auto location, AzurePath::FromString(s));

  // A bare container (or an empty path) is not an appendable file.
  if (location.empty() || location.path_to_file.empty()) {
    return ::arrow::fs::internal::PathNotFound(location.full_path);
  }

  std::string base_url = dfs_endpoint_url_;
  if (!is_hierarchical_namespace_enabled_) {
    // Without hierarchical namespace only single-segment paths are valid,
    // and they are addressed via the blob endpoint.
    if (location.path_to_file_parts.size() > 1) {
      return Status::IOError(
          "Invalid Azure Blob Storage path provided,"
          " hierarchical namespace not enabled in storage account");
    }
    base_url = blob_endpoint_url_;
  }

  // Directory checks always use the DFS endpoint; a directory cannot be
  // opened for append.
  ARROW_ASSIGN_OR_RAISE(auto exists, DirExists(dfs_endpoint_url_ + location.full_path));
  if (exists) {
    return ::arrow::fs::internal::PathNotFound(location.full_path);
  }

  std::shared_ptr<Azure::Storage::Files::DataLake::DataLakePathClient> path_client;
  ARROW_ASSIGN_OR_RAISE(
      path_client,
      InitPathClient<Azure::Storage::Files::DataLake::DataLakePathClient>(
          options_, base_url + location.full_path, location.container,
          location.path_to_file));

  std::shared_ptr<Azure::Storage::Files::DataLake::DataLakeFileClient> file_client;
  ARROW_ASSIGN_OR_RAISE(
      file_client,
      InitPathClient<Azure::Storage::Files::DataLake::DataLakeFileClient>(
          options_, base_url + location.full_path, location.container,
          location.path_to_file));

  std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> blob_client;
  ARROW_ASSIGN_OR_RAISE(
      blob_client,
      InitPathClient<Azure::Storage::Blobs::BlockBlobClient>(
          options_, base_url + location.full_path, location.container,
          location.path_to_file));

  if (location.has_parent()) {
    const AzurePath parent = location.parent();
    if (parent.path_to_file.empty()) {
      // Parent is the container itself.
      ARROW_ASSIGN_OR_RAISE(exists, ContainerExists(parent.container));
    } else {
      ARROW_ASSIGN_OR_RAISE(exists,
                            DirExists(dfs_endpoint_url_ + parent.full_path));
    }
    if (!exists) {
      return Status::IOError("Cannot write to file '", location.full_path,
                             "': parent directory does not exist");
    }
  }

  auto stream = std::make_shared<ObjectAppendStream>(
      path_client, file_client, blob_client, is_hierarchical_namespace_enabled_,
      fs->io_context(), location, metadata);
  RETURN_NOT_OK(stream->Init());
  return stream;
}
};

/// \brief Accessor for the options this filesystem was configured with
/// (delegates to the Impl).
const AzureOptions& AzureFileSystem::options() const { return impl_->options(); }
Expand Down Expand Up @@ -829,7 +949,7 @@ Result<std::shared_ptr<io::RandomAccessFile>> AzureFileSystem::OpenInputFile(

/// \brief Public OpenOutputStream entry point.
///
/// Still returns NotImplemented: the Impl-level OpenOutputStream was pasted
/// in above but is not yet wired up here.
Result<std::shared_ptr<io::OutputStream>> AzureFileSystem::OpenOutputStream(
    const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
  return Status::NotImplemented("The Azure FileSystem is not fully implemented");
  // TODO(review): delegate to impl_->OpenOutputStream(path, metadata, this)
  // once the Impl signature accepts this filesystem type — the Impl methods
  // currently take AzureBlobFileSystem*, not AzureFileSystem*; confirm which
  // name is intended before wiring.
}

Result<std::shared_ptr<io::OutputStream>> AzureFileSystem::OpenAppendStream(
Expand Down

0 comments on commit 76a2688

Please sign in to comment.