From 76a26881421ae177782378b440f2771f19f9ece1 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Wed, 15 Nov 2023 09:02:47 +0000 Subject: [PATCH] Paste in OpenOutputStream and OpenAppendStream from #12914 --- cpp/src/arrow/filesystem/azurefs.cc | 124 +++++++++++++++++++++++++++- 1 file changed, 122 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index 9a97efe26fefb..bc25f0a01a10d 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -24,7 +24,7 @@ #include "arrow/buffer.h" #include "arrow/filesystem/path_util.h" #include "arrow/filesystem/util_internal.h" -#include "arrow/io/interfaces.h" +#include "arrow/io/util_internal.h" #include "arrow/result.h" #include "arrow/util/checked_cast.h" #include "arrow/util/formatting.h" @@ -755,6 +755,126 @@ class AzureFileSystem::Impl { RETURN_NOT_OK(ptr->Init()); return ptr; } + + Result> OpenOutputStream( + const std::string& s, const std::shared_ptr& metadata, + AzureBlobFileSystem* fs) { + ARROW_ASSIGN_OR_RAISE(auto path, AzurePath::FromString(s)); + + if (path.empty() || path.path_to_file.empty()) { + return ::arrow::fs::internal::PathNotFound(path.full_path); + } + std::string endpoint_url = dfs_endpoint_url_; + if (!is_hierarchical_namespace_enabled_) { + if (path.path_to_file_parts.size() > 1) { + return Status::IOError( + "Invalid path provided," + " hierarchical namespace not enabled"); + } + endpoint_url = blob_endpoint_url_; + } + ARROW_ASSIGN_OR_RAISE(auto response, DirExists(dfs_endpoint_url_ + path.full_path)); + if (response) { + return ::arrow::fs::internal::PathNotFound(path.full_path); + } + std::shared_ptr file_client; + ARROW_ASSIGN_OR_RAISE( + file_client, + InitPathClient( + options_, endpoint_url + path.full_path, path.container, path.path_to_file)); + + std::shared_ptr blob_client; + ARROW_ASSIGN_OR_RAISE( + blob_client, + InitPathClient( + options_, endpoint_url + path.full_path, path.container, path.path_to_file)); + + if (path.has_parent()) { + AzurePath parent_path = path.parent(); + if (parent_path.path_to_file.empty()) { + ARROW_ASSIGN_OR_RAISE(response, ContainerExists(parent_path.container)); + if (!response) { + return Status::IOError("Cannot write to file '", path.full_path, + "': parent directory does not exist"); + } + } else { + ARROW_ASSIGN_OR_RAISE(response, + DirExists(dfs_endpoint_url_ + parent_path.full_path)); + if (!response) { + return Status::IOError("Cannot write to file '", path.full_path, + "': parent directory does not exist"); + } + } + } + auto ptr = std::make_shared(file_client, blob_client, + is_hierarchical_namespace_enabled_, + fs->io_context(), path, metadata); + RETURN_NOT_OK(ptr->Init()); + return ptr; + } + + Result> OpenAppendStream( + const std::string& s, const std::shared_ptr& metadata, + AzureBlobFileSystem* fs) { + ARROW_ASSIGN_OR_RAISE(auto path, AzurePath::FromString(s)); + + if (path.empty() || path.path_to_file.empty()) { + return ::arrow::fs::internal::PathNotFound(path.full_path); + } + std::string endpoint_url = dfs_endpoint_url_; + if (!is_hierarchical_namespace_enabled_) { + if (path.path_to_file_parts.size() > 1) { + return Status::IOError( + "Invalid Azure Blob Storage path provided," + " hierarchical namespace not enabled in storage account"); + } + endpoint_url = blob_endpoint_url_; + } + ARROW_ASSIGN_OR_RAISE(auto response, DirExists(dfs_endpoint_url_ + path.full_path)); + if (response) { + return ::arrow::fs::internal::PathNotFound(path.full_path); + } + std::shared_ptr path_client; + ARROW_ASSIGN_OR_RAISE( + path_client, + InitPathClient( + options_, endpoint_url + path.full_path, path.container, path.path_to_file)); + + std::shared_ptr file_client; + ARROW_ASSIGN_OR_RAISE( + file_client, + InitPathClient( + options_, endpoint_url + path.full_path, path.container, path.path_to_file)); + + std::shared_ptr blob_client; + ARROW_ASSIGN_OR_RAISE( + blob_client, + InitPathClient( + options_, endpoint_url + path.full_path, path.container, path.path_to_file)); + + if (path.has_parent()) { + AzurePath parent_path = path.parent(); + if (parent_path.path_to_file.empty()) { + ARROW_ASSIGN_OR_RAISE(response, ContainerExists(parent_path.container)); + if (!response) { + return Status::IOError("Cannot write to file '", path.full_path, + "': parent directory does not exist"); + } + } else { + ARROW_ASSIGN_OR_RAISE(response, + DirExists(dfs_endpoint_url_ + parent_path.full_path)); + if (!response) { + return Status::IOError("Cannot write to file '", path.full_path, + "': parent directory does not exist"); + } + } + } + auto ptr = std::make_shared(path_client, file_client, blob_client, + is_hierarchical_namespace_enabled_, + fs->io_context(), path, metadata); + RETURN_NOT_OK(ptr->Init()); + return ptr; + } }; const AzureOptions& AzureFileSystem::options() const { return impl_->options(); } @@ -829,7 +949,7 @@ Result> AzureFileSystem::OpenInputFile( Result> AzureFileSystem::OpenOutputStream( const std::string& path, const std::shared_ptr& metadata) { - return Status::NotImplemented("The Azure FileSystem is not fully implemented"); + // return } Result> AzureFileSystem::OpenAppendStream(