From cc40c20ac8a96bfcf5752cdfea6f918c72e42366 Mon Sep 17 00:00:00 2001 From: Sutou Kouhei Date: Mon, 27 Nov 2023 16:44:41 +0900 Subject: [PATCH] GH-38701: [C++][FS][Azure] Implement `DeleteDirContents()` --- cpp/src/arrow/filesystem/azurefs.cc | 181 ++++++++++++++++------- cpp/src/arrow/filesystem/azurefs_test.cc | 77 ++++++++++ 2 files changed, 203 insertions(+), 55 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index 4dde275da135f..6d2ce1599e9a9 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -970,6 +970,78 @@ class AzureFileSystem::Impl { return stream; } + private: + Status DeleteDirContentsWihtoutHierarchicalNamespace(const AzureLocation& location, + bool missing_dir_ok) { + auto container_client = + blob_service_client_->GetBlobContainerClient(location.container); + Azure::Storage::Blobs::ListBlobsOptions options; + options.Prefix = internal::EnsureTrailingSlash(location.path); + // https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch#remarks + // + // Only supports up to 256 subrequests in a single batch. The + // size of the body for a batch request can't exceed 4 MB. + const int32_t kNumMaxRequestsInBatch = 256; + options.PageSizeHint = kNumMaxRequestsInBatch; + try { + auto list_response = container_client.ListBlobs(options); + if (!missing_dir_ok && list_response.Blobs.empty()) { + return Status::IOError("Specified directory doesn't exist: ", location.path, ": ", + container_client.GetUrl()); + } + while (list_response.HasPage() && !list_response.Blobs.empty()) { + auto batch = container_client.CreateBatch(); + std::vector> + deferred_responses; + for (const auto& blob_item : list_response.Blobs) { + deferred_responses.push_back(batch.DeleteBlob(blob_item.Name)); + } + try { + container_client.SubmitBatch(batch); + } catch (const Azure::Storage::StorageException& exception) { + return internal::ExceptionToStatus( + "Failed to delete blobs in a directory: " + location.path + ": " + + container_client.GetUrl(), + exception); + } + std::vector failed_blob_names; + for (size_t i = 0; i < deferred_responses.size(); ++i) { + const auto& deferred_response = deferred_responses[i]; + bool success = true; + try { + auto delete_result = deferred_response.GetResponse(); + success = delete_result.Value.Deleted; + } catch (const Azure::Storage::StorageException& exception) { + success = false; + } + if (!success) { + const auto& blob_item = list_response.Blobs[i]; + failed_blob_names.push_back(blob_item.Name); + } + } + if (!failed_blob_names.empty()) { + if (failed_blob_names.size() == 1) { + return Status::IOError("Failed to delete a blob: ", failed_blob_names[0], + ": " + container_client.GetUrl()); + } else { + return Status::IOError("Failed to delete blobs: [", + arrow::internal::JoinStrings(failed_blob_names, ", "), + "]: " + container_client.GetUrl()); + } + } + list_response.MoveToNextPage(); + } + } catch (const Azure::Storage::StorageException& exception) { + return internal::ExceptionToStatus( + "Failed to list blobs in a directory: " + location.path + ": " + + container_client.GetUrl(), + exception); + } + return Status::OK(); + } + + public: Status DeleteDir(const AzureLocation& location) { if (location.container.empty()) { return Status::Invalid("Cannot delete an empty container"); @@ -1017,69 +1089,67 @@ class AzureFileSystem::Impl { exception); } } else { - auto container_client = - blob_service_client_->GetBlobContainerClient(location.container); - Azure::Storage::Blobs::ListBlobsOptions options; - options.Prefix = internal::EnsureTrailingSlash(location.path); - // https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch#remarks - // - // Only supports up to 256 subrequests in a single batch. The - // size of the body for a batch request can't exceed 4 MB. - const int32_t kNumMaxRequestsInBatch = 256; - options.PageSizeHint = kNumMaxRequestsInBatch; + return DeleteDirContentsWihtoutHierarchicalNamespace(location, true); + } + } + + Status DeleteDirContents(const AzureLocation& location, bool missing_dir_ok) { + if (location.container.empty()) { + return internal::InvalidDeleteDirContents(location.all); + } + if (location.path.empty()) { + return internal::InvalidDeleteDirContents(location.all); + } + + ARROW_ASSIGN_OR_RAISE(auto hierarchical_namespace_enabled, + hierarchical_namespace_.Enabled(location.container)); + if (hierarchical_namespace_enabled) { + auto file_system_client = + datalake_service_client_->GetFileSystemClient(location.container); + auto directory_client = file_system_client.GetDirectoryClient(location.path); try { - auto list_response = container_client.ListBlobs(options); - while (list_response.HasPage() && !list_response.Blobs.empty()) { - auto batch = container_client.CreateBatch(); - std::vector> - deferred_responses; - for (const auto& blob_item : list_response.Blobs) { - deferred_responses.push_back(batch.DeleteBlob(blob_item.Name)); - } - try { - container_client.SubmitBatch(batch); - } catch (const Azure::Storage::StorageException& exception) { - return internal::ExceptionToStatus( - "Failed to delete blobs in a directory: " + location.path + ": " + - container_client.GetUrl(), - exception); - } - std::vector failed_blob_names; - for (size_t i = 0; i < deferred_responses.size(); ++i) { - const auto& deferred_response = deferred_responses[i]; - bool success = true; - try { - auto delete_result = deferred_response.GetResponse(); - success = delete_result.Value.Deleted; - } catch (const Azure::Storage::StorageException& exception) { - success = false; - } - if (!success) { - const auto& blob_item = list_response.Blobs[i]; - failed_blob_names.push_back(blob_item.Name); - } - } - if (!failed_blob_names.empty()) { - if (failed_blob_names.size() == 1) { - return Status::IOError("Failed to delete a blob: ", failed_blob_names[0], - ": " + container_client.GetUrl()); + auto list_response = directory_client.ListPaths(false); + while (list_response.HasPage() && !list_response.Paths.empty()) { + for (const auto& path : list_response.Paths) { + if (path.IsDirectory) { + auto sub_directory_client = + file_system_client.GetDirectoryClient(path.Name); + try { + sub_directory_client.DeleteRecursive(); + } catch (const Azure::Storage::StorageException& exception) { + return internal::ExceptionToStatus( + "Failed to delete a sub directory: " + location.container + + internal::kSep + path.Name + ": " + sub_directory_client.GetUrl(), + exception); + } } else { - return Status::IOError( - "Failed to delete blobs: [", - arrow::internal::JoinStrings(failed_blob_names, ", "), - "]: " + container_client.GetUrl()); + auto sub_file_client = file_system_client.GetFileClient(path.Name); + try { + sub_file_client.Delete(); + } catch (const Azure::Storage::StorageException& exception) { + return internal::ExceptionToStatus( + "Failed to delete a sub file: " + location.container + + internal::kSep + path.Name + ": " + sub_file_client.GetUrl(), + exception); + } } } list_response.MoveToNextPage(); } } catch (const Azure::Storage::StorageException& exception) { - return internal::ExceptionToStatus( - "Failed to list blobs in a directory: " + location.path + ": " + - container_client.GetUrl(), - exception); + if (missing_dir_ok && + exception.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound) { + return Status::OK(); + } else { + return internal::ExceptionToStatus( + "Failed to delete directory contents: " + location.path + ": " + + directory_client.GetUrl(), + exception); + } } return Status::OK(); + } else { + return DeleteDirContentsWihtoutHierarchicalNamespace(location, missing_dir_ok); } } }; @@ -1121,7 +1191,8 @@ Status AzureFileSystem::DeleteDir(const std::string& path) { } Status AzureFileSystem::DeleteDirContents(const std::string& path, bool missing_dir_ok) { - return Status::NotImplemented("The Azure FileSystem is not fully implemented"); + ARROW_ASSIGN_OR_RAISE(auto location, AzureLocation::FromString(path)); + return impl_->DeleteDirContents(location, missing_dir_ok); } Status AzureFileSystem::DeleteRootDirContents() { diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc index 7c86385126d40..15e38c760445e 100644 --- a/cpp/src/arrow/filesystem/azurefs_test.cc +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -666,6 +666,83 @@ TEST_F(AzuriteFileSystemTest, DeleteDirUri) { ASSERT_RAISES(Invalid, fs_->DeleteDir("abfs://" + PreexistingContainerPath())); } +TEST_F(AzuriteFileSystemTest, DeleteDirContentsSuccessExist) { + const auto directory_path = + internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName()); + const auto sub_directory_path = internal::ConcatAbstractPath(directory_path, "new-sub"); + const auto sub_blob_path = internal::ConcatAbstractPath(sub_directory_path, "sub.txt"); + const auto top_blob_path = internal::ConcatAbstractPath(directory_path, "top.txt"); + ASSERT_OK(fs_->CreateDir(sub_directory_path, true)); + ASSERT_OK_AND_ASSIGN(auto output, fs_->OpenOutputStream(sub_blob_path)); + ASSERT_OK(output->Write(std::string_view("sub"))); + ASSERT_OK(output->Close()); + ASSERT_OK_AND_ASSIGN(output, fs_->OpenOutputStream(top_blob_path)); + ASSERT_OK(output->Write(std::string_view("top"))); + ASSERT_OK(output->Close()); + + arrow::fs::AssertFileInfo(fs_.get(), directory_path, FileType::Directory); + arrow::fs::AssertFileInfo(fs_.get(), sub_directory_path, FileType::Directory); + arrow::fs::AssertFileInfo(fs_.get(), sub_blob_path, FileType::File); + arrow::fs::AssertFileInfo(fs_.get(), top_blob_path, FileType::File); + ASSERT_OK(fs_->DeleteDirContents(directory_path)); + // GH-38772: We may change this to FileType::Directory. + arrow::fs::AssertFileInfo(fs_.get(), directory_path, FileType::NotFound); + arrow::fs::AssertFileInfo(fs_.get(), sub_directory_path, FileType::NotFound); + arrow::fs::AssertFileInfo(fs_.get(), sub_blob_path, FileType::NotFound); + arrow::fs::AssertFileInfo(fs_.get(), top_blob_path, FileType::NotFound); +} + +TEST_F(AzuriteFileSystemTest, DeleteDirContentsSuccessNonexistent) { + const auto directory_path = + internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName()); + ASSERT_OK(fs_->DeleteDirContents(directory_path, true)); + arrow::fs::AssertFileInfo(fs_.get(), directory_path, FileType::NotFound); +} + +TEST_F(AzuriteFileSystemTest, DeleteDirContentsFailureNonexistent) { + const auto directory_path = + internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName()); + ASSERT_RAISES(IOError, fs_->DeleteDirContents(directory_path, false)); +} + +TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirContentsSuccessExist) { + const auto directory_path = + internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName()); + const auto sub_directory_path = internal::ConcatAbstractPath(directory_path, "new-sub"); + const auto sub_blob_path = internal::ConcatAbstractPath(sub_directory_path, "sub.txt"); + const auto top_blob_path = internal::ConcatAbstractPath(directory_path, "top.txt"); + ASSERT_OK(fs_->CreateDir(sub_directory_path, true)); + ASSERT_OK_AND_ASSIGN(auto output, fs_->OpenOutputStream(sub_blob_path)); + ASSERT_OK(output->Write(std::string_view("sub"))); + ASSERT_OK(output->Close()); + ASSERT_OK_AND_ASSIGN(output, fs_->OpenOutputStream(top_blob_path)); + ASSERT_OK(output->Write(std::string_view("top"))); + ASSERT_OK(output->Close()); + + arrow::fs::AssertFileInfo(fs_.get(), directory_path, FileType::Directory); + arrow::fs::AssertFileInfo(fs_.get(), sub_directory_path, FileType::Directory); + arrow::fs::AssertFileInfo(fs_.get(), sub_blob_path, FileType::File); + arrow::fs::AssertFileInfo(fs_.get(), top_blob_path, FileType::File); + ASSERT_OK(fs_->DeleteDirContents(directory_path)); + arrow::fs::AssertFileInfo(fs_.get(), directory_path, FileType::Directory); + arrow::fs::AssertFileInfo(fs_.get(), sub_directory_path, FileType::NotFound); + arrow::fs::AssertFileInfo(fs_.get(), sub_blob_path, FileType::NotFound); + arrow::fs::AssertFileInfo(fs_.get(), top_blob_path, FileType::NotFound); +} + +TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirContentsSuccessNonexistent) { + const auto directory_path = + internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName()); + ASSERT_OK(fs_->DeleteDirContents(directory_path, true)); + arrow::fs::AssertFileInfo(fs_.get(), directory_path, FileType::NotFound); +} + +TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirContentsFailureNonexistent) { + const auto directory_path = + internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName()); + ASSERT_RAISES(IOError, fs_->DeleteDirContents(directory_path, false)); +} + TEST_F(AzuriteFileSystemTest, OpenInputStreamString) { std::shared_ptr stream; ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream(PreexistingObjectPath()));