Skip to content

Commit

Permalink
apacheGH-38701: [C++][FS][Azure] Implement DeleteDirContents()
Browse files Browse the repository at this point in the history
  • Loading branch information
kou committed Dec 1, 2023
1 parent f3cdd81 commit cc40c20
Show file tree
Hide file tree
Showing 2 changed files with 203 additions and 55 deletions.
181 changes: 126 additions & 55 deletions cpp/src/arrow/filesystem/azurefs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -970,6 +970,78 @@ class AzureFileSystem::Impl {
return stream;
}

private:
// Delete every blob whose name starts with `location.path` (plus a trailing
// slash) from the container `location.container`, for storage accounts where
// the hierarchical namespace is not enabled.
//
// Blobs are listed page by page and each page is deleted with one batch
// request.
//
// \param location container and directory path whose contents are deleted
// \param missing_dir_ok when false, an IOError is returned if the first
//        listing page contains no blob under the prefix
// \return Status::OK() on success; an error Status if listing fails, if a
//         batch cannot be submitted, or if any blob in a batch is reported
//         as not deleted
Status DeleteDirContentsWihtoutHierarchicalNamespace(const AzureLocation& location,
                                                     bool missing_dir_ok) {
  auto container_client =
      blob_service_client_->GetBlobContainerClient(location.container);
  Azure::Storage::Blobs::ListBlobsOptions options;
  options.Prefix = internal::EnsureTrailingSlash(location.path);
  // https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch#remarks
  //
  // Only supports up to 256 subrequests in a single batch. The
  // size of the body for a batch request can't exceed 4 MB.
  const int32_t kNumMaxRequestsInBatch = 256;
  // One listing page maps to one batch, so cap the page size at the batch limit.
  options.PageSizeHint = kNumMaxRequestsInBatch;
  try {
    auto list_response = container_client.ListBlobs(options);
    if (!missing_dir_ok && list_response.Blobs.empty()) {
      return Status::IOError("Specified directory doesn't exist: ", location.path, ": ",
                             container_client.GetUrl());
    }
    // Walk every page. An empty page must not terminate the loop because a
    // later page may still contain blobs; it is only skipped so that no empty
    // batch is submitted.
    for (; list_response.HasPage(); list_response.MoveToNextPage()) {
      if (list_response.Blobs.empty()) {
        continue;
      }
      auto batch = container_client.CreateBatch();
      std::vector<Azure::Storage::DeferredResponse<
          Azure::Storage::Blobs::Models::DeleteBlobResult>>
          deferred_responses;
      deferred_responses.reserve(list_response.Blobs.size());
      for (const auto& blob_item : list_response.Blobs) {
        deferred_responses.push_back(batch.DeleteBlob(blob_item.Name));
      }
      try {
        container_client.SubmitBatch(batch);
      } catch (const Azure::Storage::StorageException& exception) {
        return internal::ExceptionToStatus(
            "Failed to delete blobs in a directory: " + location.path + ": " +
                container_client.GetUrl(),
            exception);
      }
      // deferred_responses[i] corresponds to list_response.Blobs[i], so the
      // index can be used to recover the name of a blob that failed.
      std::vector<std::string> failed_blob_names;
      for (size_t i = 0; i < deferred_responses.size(); ++i) {
        const auto& deferred_response = deferred_responses[i];
        bool success = true;
        try {
          auto delete_result = deferred_response.GetResponse();
          success = delete_result.Value.Deleted;
        } catch (const Azure::Storage::StorageException&) {
          // The per-blob exception is only used as a failure signal; the blob
          // name is reported in the aggregated error below.
          success = false;
        }
        if (!success) {
          const auto& blob_item = list_response.Blobs[i];
          failed_blob_names.push_back(blob_item.Name);
        }
      }
      if (!failed_blob_names.empty()) {
        if (failed_blob_names.size() == 1) {
          return Status::IOError("Failed to delete a blob: ", failed_blob_names[0],
                                 ": " + container_client.GetUrl());
        } else {
          return Status::IOError("Failed to delete blobs: [",
                                 arrow::internal::JoinStrings(failed_blob_names, ", "),
                                 "]: " + container_client.GetUrl());
        }
      }
    }
  } catch (const Azure::Storage::StorageException& exception) {
    return internal::ExceptionToStatus(
        "Failed to list blobs in a directory: " + location.path + ": " +
            container_client.GetUrl(),
        exception);
  }
  return Status::OK();
}

public:
Status DeleteDir(const AzureLocation& location) {
if (location.container.empty()) {
return Status::Invalid("Cannot delete an empty container");
Expand Down Expand Up @@ -1017,69 +1089,67 @@ class AzureFileSystem::Impl {
exception);
}
} else {
auto container_client =
blob_service_client_->GetBlobContainerClient(location.container);
Azure::Storage::Blobs::ListBlobsOptions options;
options.Prefix = internal::EnsureTrailingSlash(location.path);
// https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch#remarks
//
// Only supports up to 256 subrequests in a single batch. The
// size of the body for a batch request can't exceed 4 MB.
const int32_t kNumMaxRequestsInBatch = 256;
options.PageSizeHint = kNumMaxRequestsInBatch;
return DeleteDirContentsWihtoutHierarchicalNamespace(location, true);
}
}

// Delete everything inside the directory at `location`.
//
// When the hierarchical namespace is enabled for the container, each entry
// listed directly under the directory is removed individually: sub
// directories with a recursive delete, files with a plain delete. Otherwise
// the work is delegated to DeleteDirContentsWihtoutHierarchicalNamespace().
//
// \param location container and directory path; both must be non-empty
// \param missing_dir_ok when true, a NotFound response from the storage
//        service while processing the hierarchical-namespace branch is
//        treated as success
// \return Status::OK() on success; the result of
//         internal::InvalidDeleteDirContents() when the container or the path
//         is empty; otherwise an error Status describing the failed operation
Status DeleteDirContents(const AzureLocation& location, bool missing_dir_ok) {
  if (location.container.empty() || location.path.empty()) {
    return internal::InvalidDeleteDirContents(location.all);
  }

  ARROW_ASSIGN_OR_RAISE(auto hierarchical_namespace_enabled,
                        hierarchical_namespace_.Enabled(location.container));
  if (hierarchical_namespace_enabled) {
    auto file_system_client =
        datalake_service_client_->GetFileSystemClient(location.container);
    auto directory_client = file_system_client.GetDirectoryClient(location.path);
    try {
      // Non-recursive listing: sub directories are removed with
      // DeleteRecursive(), so only direct children need to be enumerated.
      auto list_response = directory_client.ListPaths(false);
      // Visit every page; an empty page simply contributes no work and must
      // not stop the iteration before later pages are processed.
      for (; list_response.HasPage(); list_response.MoveToNextPage()) {
        for (const auto& path : list_response.Paths) {
          if (path.IsDirectory) {
            auto sub_directory_client =
                file_system_client.GetDirectoryClient(path.Name);
            try {
              sub_directory_client.DeleteRecursive();
            } catch (const Azure::Storage::StorageException& exception) {
              return internal::ExceptionToStatus(
                  "Failed to delete a sub directory: " + location.container +
                      internal::kSep + path.Name + ": " + sub_directory_client.GetUrl(),
                  exception);
            }
          } else {
            auto sub_file_client = file_system_client.GetFileClient(path.Name);
            try {
              sub_file_client.Delete();
            } catch (const Azure::Storage::StorageException& exception) {
              return internal::ExceptionToStatus(
                  "Failed to delete a sub file: " + location.container +
                      internal::kSep + path.Name + ": " + sub_file_client.GetUrl(),
                  exception);
            }
          }
        }
      }
    } catch (const Azure::Storage::StorageException& exception) {
      // Reached for failures of the listing calls themselves (the per-entry
      // deletes above have their own handlers).
      if (missing_dir_ok &&
          exception.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound) {
        return Status::OK();
      } else {
        return internal::ExceptionToStatus(
            "Failed to delete directory contents: " + location.path + ": " +
                directory_client.GetUrl(),
            exception);
      }
    }
    return Status::OK();
  } else {
    return DeleteDirContentsWihtoutHierarchicalNamespace(location, missing_dir_ok);
  }
}
};
Expand Down Expand Up @@ -1121,7 +1191,8 @@ Status AzureFileSystem::DeleteDir(const std::string& path) {
}

// Delete the contents of the directory identified by `path`.
//
// The path is parsed into an AzureLocation (parse errors are returned to the
// caller) and the operation is forwarded to the implementation object.
//
// \param path directory whose contents are deleted
// \param missing_dir_ok forwarded unchanged to Impl::DeleteDirContents()
Status AzureFileSystem::DeleteDirContents(const std::string& path, bool missing_dir_ok) {
  // No unconditional NotImplemented return here: it would make the
  // forwarding below unreachable.
  ARROW_ASSIGN_OR_RAISE(auto location, AzureLocation::FromString(path));
  return impl_->DeleteDirContents(location, missing_dir_ok);
}

Status AzureFileSystem::DeleteRootDirContents() {
Expand Down
77 changes: 77 additions & 0 deletions cpp/src/arrow/filesystem/azurefs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,83 @@ TEST_F(AzuriteFileSystemTest, DeleteDirUri) {
ASSERT_RAISES(Invalid, fs_->DeleteDir("abfs://" + PreexistingContainerPath()));
}

// DeleteDirContents() on a populated directory (flat namespace) removes the
// nested directory and both files.
TEST_F(AzuriteFileSystemTest, DeleteDirContentsSuccessExist) {
  // Layout created below:
  //   <dir>/top.txt
  //   <dir>/new-sub/sub.txt
  const std::string dir =
      internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());
  const std::string nested_dir = internal::ConcatAbstractPath(dir, "new-sub");
  const std::string nested_file = internal::ConcatAbstractPath(nested_dir, "sub.txt");
  const std::string top_file = internal::ConcatAbstractPath(dir, "top.txt");

  ASSERT_OK(fs_->CreateDir(nested_dir, true));
  ASSERT_OK_AND_ASSIGN(auto stream, fs_->OpenOutputStream(nested_file));
  ASSERT_OK(stream->Write(std::string_view("sub")));
  ASSERT_OK(stream->Close());
  ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream(top_file));
  ASSERT_OK(stream->Write(std::string_view("top")));
  ASSERT_OK(stream->Close());

  auto* const fs = fs_.get();

  // Precondition: everything exists.
  arrow::fs::AssertFileInfo(fs, dir, FileType::Directory);
  arrow::fs::AssertFileInfo(fs, nested_dir, FileType::Directory);
  arrow::fs::AssertFileInfo(fs, nested_file, FileType::File);
  arrow::fs::AssertFileInfo(fs, top_file, FileType::File);

  ASSERT_OK(fs_->DeleteDirContents(dir));

  // GH-38772: We may change this to FileType::Directory.
  arrow::fs::AssertFileInfo(fs, dir, FileType::NotFound);
  arrow::fs::AssertFileInfo(fs, nested_dir, FileType::NotFound);
  arrow::fs::AssertFileInfo(fs, nested_file, FileType::NotFound);
  arrow::fs::AssertFileInfo(fs, top_file, FileType::NotFound);
}

// With missing_dir_ok = true, deleting the contents of a directory that was
// never created succeeds and the directory is still reported as NotFound.
TEST_F(AzuriteFileSystemTest, DeleteDirContentsSuccessNonexistent) {
  const std::string missing_dir =
      internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());
  const bool missing_dir_ok = true;
  ASSERT_OK(fs_->DeleteDirContents(missing_dir, missing_dir_ok));
  arrow::fs::AssertFileInfo(fs_.get(), missing_dir, FileType::NotFound);
}

// With missing_dir_ok = false, deleting the contents of a directory that was
// never created raises IOError.
TEST_F(AzuriteFileSystemTest, DeleteDirContentsFailureNonexistent) {
  const std::string missing_dir =
      internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());
  const bool missing_dir_ok = false;
  ASSERT_RAISES(IOError, fs_->DeleteDirContents(missing_dir, missing_dir_ok));
}

// DeleteDirContents() on a populated directory (hierarchical namespace)
// removes the nested directory and both files but keeps the directory itself.
TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirContentsSuccessExist) {
  // Layout created below:
  //   <dir>/top.txt
  //   <dir>/new-sub/sub.txt
  const std::string dir =
      internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());
  const std::string nested_dir = internal::ConcatAbstractPath(dir, "new-sub");
  const std::string nested_file = internal::ConcatAbstractPath(nested_dir, "sub.txt");
  const std::string top_file = internal::ConcatAbstractPath(dir, "top.txt");

  ASSERT_OK(fs_->CreateDir(nested_dir, true));
  ASSERT_OK_AND_ASSIGN(auto stream, fs_->OpenOutputStream(nested_file));
  ASSERT_OK(stream->Write(std::string_view("sub")));
  ASSERT_OK(stream->Close());
  ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream(top_file));
  ASSERT_OK(stream->Write(std::string_view("top")));
  ASSERT_OK(stream->Close());

  auto* const fs = fs_.get();

  // Precondition: everything exists.
  arrow::fs::AssertFileInfo(fs, dir, FileType::Directory);
  arrow::fs::AssertFileInfo(fs, nested_dir, FileType::Directory);
  arrow::fs::AssertFileInfo(fs, nested_file, FileType::File);
  arrow::fs::AssertFileInfo(fs, top_file, FileType::File);

  ASSERT_OK(fs_->DeleteDirContents(dir));

  // The directory survives; only what was inside it is gone.
  arrow::fs::AssertFileInfo(fs, dir, FileType::Directory);
  arrow::fs::AssertFileInfo(fs, nested_dir, FileType::NotFound);
  arrow::fs::AssertFileInfo(fs, nested_file, FileType::NotFound);
  arrow::fs::AssertFileInfo(fs, top_file, FileType::NotFound);
}

// With missing_dir_ok = true, deleting the contents of a directory that was
// never created succeeds and the directory is still reported as NotFound.
TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirContentsSuccessNonexistent) {
  const std::string missing_dir =
      internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());
  const bool missing_dir_ok = true;
  ASSERT_OK(fs_->DeleteDirContents(missing_dir, missing_dir_ok));
  arrow::fs::AssertFileInfo(fs_.get(), missing_dir, FileType::NotFound);
}

// With missing_dir_ok = false, deleting the contents of a directory that was
// never created raises IOError.
TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirContentsFailureNonexistent) {
  const std::string missing_dir =
      internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());
  const bool missing_dir_ok = false;
  ASSERT_RAISES(IOError, fs_->DeleteDirContents(missing_dir, missing_dir_ok));
}

TEST_F(AzuriteFileSystemTest, OpenInputStreamString) {
std::shared_ptr<io::InputStream> stream;
ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream(PreexistingObjectPath()));
Expand Down

0 comments on commit cc40c20

Please sign in to comment.