From 282d9054a8aadc7f094a0428fe6ca00f0370c883 Mon Sep 17 00:00:00 2001 From: Sutou Kouhei Date: Sat, 2 Dec 2023 05:43:12 +0900 Subject: [PATCH] GH-38701: [C++][FS][Azure] Implement `DeleteDirContents()` (#38888) ### Rationale for this change `DeleteDirContents()` deletes the given directory contents recursively like other filesystem implementations. Azure file system treats the following cases as root directory: * Empty container * Empty path ### What changes are included in this PR? List and delete approach is used with/without hierarchical namespace support. Because Azure doesn't provide "DeleteDirContents" API with hierarchical namespace support. We may want to use delete the given directory and create an empty approach instead of list and delete approach for performance but it may not be acceptable with some use cases. ### Are these changes tested? Yes. ### Are there any user-facing changes? Yes. * Closes: #38701 Authored-by: Sutou Kouhei Signed-off-by: Sutou Kouhei --- cpp/src/arrow/filesystem/azurefs.cc | 183 ++++++++++++++++------- cpp/src/arrow/filesystem/azurefs_test.cc | 105 +++++++++++++ 2 files changed, 232 insertions(+), 56 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index 4dde275da135f..ecc8d06f97567 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -970,6 +970,81 @@ class AzureFileSystem::Impl { return stream; } + private: + Status DeleteDirContentsWihtoutHierarchicalNamespace(const AzureLocation& location, + bool missing_dir_ok) { + auto container_client = + blob_service_client_->GetBlobContainerClient(location.container); + Azure::Storage::Blobs::ListBlobsOptions options; + if (!location.path.empty()) { + options.Prefix = internal::EnsureTrailingSlash(location.path); + } + // https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch#remarks + // + // Only supports up to 256 subrequests in a single batch. The + // size of the body for a batch request can't exceed 4 MB. + const int32_t kNumMaxRequestsInBatch = 256; + options.PageSizeHint = kNumMaxRequestsInBatch; + try { + auto list_response = container_client.ListBlobs(options); + if (!missing_dir_ok && list_response.Blobs.empty()) { + return PathNotFound(location); + } + for (; list_response.HasPage(); list_response.MoveToNextPage()) { + if (list_response.Blobs.empty()) { + continue; + } + auto batch = container_client.CreateBatch(); + std::vector> + deferred_responses; + for (const auto& blob_item : list_response.Blobs) { + deferred_responses.push_back(batch.DeleteBlob(blob_item.Name)); + } + try { + container_client.SubmitBatch(batch); + } catch (const Azure::Storage::StorageException& exception) { + return internal::ExceptionToStatus( + "Failed to delete blobs in a directory: " + location.path + ": " + + container_client.GetUrl(), + exception); + } + std::vector failed_blob_names; + for (size_t i = 0; i < deferred_responses.size(); ++i) { + const auto& deferred_response = deferred_responses[i]; + bool success = true; + try { + auto delete_result = deferred_response.GetResponse(); + success = delete_result.Value.Deleted; + } catch (const Azure::Storage::StorageException& exception) { + success = false; + } + if (!success) { + const auto& blob_item = list_response.Blobs[i]; + failed_blob_names.push_back(blob_item.Name); + } + } + if (!failed_blob_names.empty()) { + if (failed_blob_names.size() == 1) { + return Status::IOError("Failed to delete a blob: ", failed_blob_names[0], + ": " + container_client.GetUrl()); + } else { + return Status::IOError("Failed to delete blobs: [", + arrow::internal::JoinStrings(failed_blob_names, ", "), + "]: " + container_client.GetUrl()); + } + } + } + } catch (const Azure::Storage::StorageException& exception) { + return internal::ExceptionToStatus( + "Failed to list blobs in a directory: " + location.path + ": " + + container_client.GetUrl(), + exception); + } + return Status::OK(); + } + + public: Status DeleteDir(const AzureLocation& location) { if (location.container.empty()) { return Status::Invalid("Cannot delete an empty container"); @@ -1017,69 +1092,64 @@ class AzureFileSystem::Impl { exception); } } else { - auto container_client = - blob_service_client_->GetBlobContainerClient(location.container); - Azure::Storage::Blobs::ListBlobsOptions options; - options.Prefix = internal::EnsureTrailingSlash(location.path); - // https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch#remarks - // - // Only supports up to 256 subrequests in a single batch. The - // size of the body for a batch request can't exceed 4 MB. - const int32_t kNumMaxRequestsInBatch = 256; - options.PageSizeHint = kNumMaxRequestsInBatch; + return DeleteDirContentsWihtoutHierarchicalNamespace(location, + /*missing_dir_ok=*/true); + } + } + + Status DeleteDirContents(const AzureLocation& location, bool missing_dir_ok) { + if (location.container.empty()) { + return internal::InvalidDeleteDirContents(location.all); + } + + ARROW_ASSIGN_OR_RAISE(auto hierarchical_namespace_enabled, + hierarchical_namespace_.Enabled(location.container)); + if (hierarchical_namespace_enabled) { + auto file_system_client = + datalake_service_client_->GetFileSystemClient(location.container); + auto directory_client = file_system_client.GetDirectoryClient(location.path); try { - auto list_response = container_client.ListBlobs(options); - while (list_response.HasPage() && !list_response.Blobs.empty()) { - auto batch = container_client.CreateBatch(); - std::vector> - deferred_responses; - for (const auto& blob_item : list_response.Blobs) { - deferred_responses.push_back(batch.DeleteBlob(blob_item.Name)); - } - try { - container_client.SubmitBatch(batch); - } catch (const Azure::Storage::StorageException& exception) { - return internal::ExceptionToStatus( - "Failed to delete blobs in a directory: " + location.path + ": " + - container_client.GetUrl(), - exception); - } - std::vector failed_blob_names; - for (size_t i = 0; i < deferred_responses.size(); ++i) { - const auto& deferred_response = deferred_responses[i]; - bool success = true; - try { - auto delete_result = deferred_response.GetResponse(); - success = delete_result.Value.Deleted; - } catch (const Azure::Storage::StorageException& exception) { - success = false; - } - if (!success) { - const auto& blob_item = list_response.Blobs[i]; - failed_blob_names.push_back(blob_item.Name); - } - } - if (!failed_blob_names.empty()) { - if (failed_blob_names.size() == 1) { - return Status::IOError("Failed to delete a blob: ", failed_blob_names[0], - ": " + container_client.GetUrl()); + auto list_response = directory_client.ListPaths(false); + for (; list_response.HasPage(); list_response.MoveToNextPage()) { + for (const auto& path : list_response.Paths) { + if (path.IsDirectory) { + auto sub_directory_client = + file_system_client.GetDirectoryClient(path.Name); + try { + sub_directory_client.DeleteRecursive(); + } catch (const Azure::Storage::StorageException& exception) { + return internal::ExceptionToStatus( + "Failed to delete a sub directory: " + location.container + + internal::kSep + path.Name + ": " + sub_directory_client.GetUrl(), + exception); + } } else { - return Status::IOError( - "Failed to delete blobs: [", - arrow::internal::JoinStrings(failed_blob_names, ", "), - "]: " + container_client.GetUrl()); + auto sub_file_client = file_system_client.GetFileClient(path.Name); + try { + sub_file_client.Delete(); + } catch (const Azure::Storage::StorageException& exception) { + return internal::ExceptionToStatus( + "Failed to delete a sub file: " + location.container + + internal::kSep + path.Name + ": " + sub_file_client.GetUrl(), + exception); + } } } - list_response.MoveToNextPage(); } } catch (const Azure::Storage::StorageException& exception) { - return internal::ExceptionToStatus( - "Failed to list blobs in a directory: " + location.path + ": " + - container_client.GetUrl(), - exception); + if (missing_dir_ok && + exception.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound) { + return Status::OK(); + } else { + return internal::ExceptionToStatus( + "Failed to delete directory contents: " + location.path + ": " + + directory_client.GetUrl(), + exception); + } } return Status::OK(); + } else { + return DeleteDirContentsWihtoutHierarchicalNamespace(location, missing_dir_ok); } } }; @@ -1121,7 +1191,8 @@ Status AzureFileSystem::DeleteDir(const std::string& path) { } Status AzureFileSystem::DeleteDirContents(const std::string& path, bool missing_dir_ok) { - return Status::NotImplemented("The Azure FileSystem is not fully implemented"); + ARROW_ASSIGN_OR_RAISE(auto location, AzureLocation::FromString(path)); + return impl_->DeleteDirContents(location, missing_dir_ok); } Status AzureFileSystem::DeleteRootDirContents() { diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc index 7c86385126d40..b6f75987693c5 100644 --- a/cpp/src/arrow/filesystem/azurefs_test.cc +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -287,6 +287,45 @@ class AzureFileSystemTest : public ::testing::Test { void RunGetFileInfoObjectWithNestedStructureTest(); void RunGetFileInfoObjectTest(); + + struct HierarchicalPaths { + std::string container; + std::string directory; + std::vector sub_paths; + }; + + // Need to use "void" as the return type to use ASSERT_* in this method. + void CreateHierarchicalData(HierarchicalPaths& paths) { + const auto container_path = RandomContainerName(); + const auto directory_path = + internal::ConcatAbstractPath(container_path, RandomDirectoryName()); + const auto sub_directory_path = + internal::ConcatAbstractPath(directory_path, "new-sub"); + const auto sub_blob_path = + internal::ConcatAbstractPath(sub_directory_path, "sub.txt"); + const auto top_blob_path = internal::ConcatAbstractPath(directory_path, "top.txt"); + ASSERT_OK(fs_->CreateDir(sub_directory_path, true)); + ASSERT_OK_AND_ASSIGN(auto output, fs_->OpenOutputStream(sub_blob_path)); + ASSERT_OK(output->Write(std::string_view("sub"))); + ASSERT_OK(output->Close()); + ASSERT_OK_AND_ASSIGN(output, fs_->OpenOutputStream(top_blob_path)); + ASSERT_OK(output->Write(std::string_view("top"))); + ASSERT_OK(output->Close()); + + AssertFileInfo(fs_.get(), container_path, FileType::Directory); + AssertFileInfo(fs_.get(), directory_path, FileType::Directory); + AssertFileInfo(fs_.get(), sub_directory_path, FileType::Directory); + AssertFileInfo(fs_.get(), sub_blob_path, FileType::File); + AssertFileInfo(fs_.get(), top_blob_path, FileType::File); + + paths.container = container_path; + paths.directory = directory_path; + paths.sub_paths = { + sub_directory_path, + sub_blob_path, + top_blob_path, + }; + } }; class AzuriteFileSystemTest : public AzureFileSystemTest { @@ -666,6 +705,72 @@ TEST_F(AzuriteFileSystemTest, DeleteDirUri) { ASSERT_RAISES(Invalid, fs_->DeleteDir("abfs://" + PreexistingContainerPath())); } +TEST_F(AzuriteFileSystemTest, DeleteDirContentsSuccessContainer) { +#ifdef __APPLE__ + GTEST_SKIP() << "This test fails by an Azurite problem: " + "https://github.com/Azure/Azurite/pull/2302"; +#endif + HierarchicalPaths paths; + CreateHierarchicalData(paths); + ASSERT_OK(fs_->DeleteDirContents(paths.container)); + arrow::fs::AssertFileInfo(fs_.get(), paths.container, FileType::Directory); + arrow::fs::AssertFileInfo(fs_.get(), paths.directory, FileType::NotFound); + for (const auto& sub_path : paths.sub_paths) { + arrow::fs::AssertFileInfo(fs_.get(), sub_path, FileType::NotFound); + } +} + +TEST_F(AzuriteFileSystemTest, DeleteDirContentsSuccessDirectory) { +#ifdef __APPLE__ + GTEST_SKIP() << "This test fails by an Azurite problem: " + "https://github.com/Azure/Azurite/pull/2302"; +#endif + HierarchicalPaths paths; + CreateHierarchicalData(paths); + ASSERT_OK(fs_->DeleteDirContents(paths.directory)); + // GH-38772: We may change this to FileType::Directory. + arrow::fs::AssertFileInfo(fs_.get(), paths.directory, FileType::NotFound); + for (const auto& sub_path : paths.sub_paths) { + arrow::fs::AssertFileInfo(fs_.get(), sub_path, FileType::NotFound); + } +} + +TEST_F(AzuriteFileSystemTest, DeleteDirContentsSuccessNonexistent) { + const auto directory_path = + internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName()); + ASSERT_OK(fs_->DeleteDirContents(directory_path, true)); + arrow::fs::AssertFileInfo(fs_.get(), directory_path, FileType::NotFound); +} + +TEST_F(AzuriteFileSystemTest, DeleteDirContentsFailureNonexistent) { + const auto directory_path = + internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName()); + ASSERT_RAISES(IOError, fs_->DeleteDirContents(directory_path, false)); +} + +TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirContentsSuccessExist) { + HierarchicalPaths paths; + CreateHierarchicalData(paths); + ASSERT_OK(fs_->DeleteDirContents(paths.directory)); + arrow::fs::AssertFileInfo(fs_.get(), paths.directory, FileType::Directory); + for (const auto& sub_path : paths.sub_paths) { + arrow::fs::AssertFileInfo(fs_.get(), sub_path, FileType::NotFound); + } +} + +TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirContentsSuccessNonexistent) { + const auto directory_path = + internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName()); + ASSERT_OK(fs_->DeleteDirContents(directory_path, true)); + arrow::fs::AssertFileInfo(fs_.get(), directory_path, FileType::NotFound); +} + +TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirContentsFailureNonexistent) { + const auto directory_path = + internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName()); + ASSERT_RAISES(IOError, fs_->DeleteDirContents(directory_path, false)); +} + TEST_F(AzuriteFileSystemTest, OpenInputStreamString) { std::shared_ptr stream; ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream(PreexistingObjectPath()));