Skip to content

Commit

Permalink
apacheGH-38701: [C++][FS][Azure] Implement DeleteDirContents() (apa…
Browse files Browse the repository at this point in the history
…che#38888)

### Rationale for this change

`DeleteDirContents()` deletes the given directory contents recursively like other filesystem implementations.

Azure file system treats the following cases as root directory:
* Empty container
* Empty path

### What changes are included in this PR?

List and delete approach is used with/without hierarchical namespace support. Because Azure doesn't provide "DeleteDirContents" API with hierarchical namespace support. We may want to use delete the given directory and create an empty approach instead of list and delete approach for performance but it may not be acceptable with some use cases. 

### Are these changes tested?

Yes.

### Are there any user-facing changes?

Yes.
* Closes: apache#38701

Authored-by: Sutou Kouhei <[email protected]>
Signed-off-by: Sutou Kouhei <[email protected]>
  • Loading branch information
kou authored and dgreiss committed Feb 17, 2024
1 parent 64971f8 commit 282d905
Show file tree
Hide file tree
Showing 2 changed files with 232 additions and 56 deletions.
183 changes: 127 additions & 56 deletions cpp/src/arrow/filesystem/azurefs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -970,6 +970,81 @@ class AzureFileSystem::Impl {
return stream;
}

private:
Status DeleteDirContentsWihtoutHierarchicalNamespace(const AzureLocation& location,
bool missing_dir_ok) {
auto container_client =
blob_service_client_->GetBlobContainerClient(location.container);
Azure::Storage::Blobs::ListBlobsOptions options;
if (!location.path.empty()) {
options.Prefix = internal::EnsureTrailingSlash(location.path);
}
// https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch#remarks
//
// Only supports up to 256 subrequests in a single batch. The
// size of the body for a batch request can't exceed 4 MB.
const int32_t kNumMaxRequestsInBatch = 256;
options.PageSizeHint = kNumMaxRequestsInBatch;
try {
auto list_response = container_client.ListBlobs(options);
if (!missing_dir_ok && list_response.Blobs.empty()) {
return PathNotFound(location);
}
for (; list_response.HasPage(); list_response.MoveToNextPage()) {
if (list_response.Blobs.empty()) {
continue;
}
auto batch = container_client.CreateBatch();
std::vector<Azure::Storage::DeferredResponse<
Azure::Storage::Blobs::Models::DeleteBlobResult>>
deferred_responses;
for (const auto& blob_item : list_response.Blobs) {
deferred_responses.push_back(batch.DeleteBlob(blob_item.Name));
}
try {
container_client.SubmitBatch(batch);
} catch (const Azure::Storage::StorageException& exception) {
return internal::ExceptionToStatus(
"Failed to delete blobs in a directory: " + location.path + ": " +
container_client.GetUrl(),
exception);
}
std::vector<std::string> failed_blob_names;
for (size_t i = 0; i < deferred_responses.size(); ++i) {
const auto& deferred_response = deferred_responses[i];
bool success = true;
try {
auto delete_result = deferred_response.GetResponse();
success = delete_result.Value.Deleted;
} catch (const Azure::Storage::StorageException& exception) {
success = false;
}
if (!success) {
const auto& blob_item = list_response.Blobs[i];
failed_blob_names.push_back(blob_item.Name);
}
}
if (!failed_blob_names.empty()) {
if (failed_blob_names.size() == 1) {
return Status::IOError("Failed to delete a blob: ", failed_blob_names[0],
": " + container_client.GetUrl());
} else {
return Status::IOError("Failed to delete blobs: [",
arrow::internal::JoinStrings(failed_blob_names, ", "),
"]: " + container_client.GetUrl());
}
}
}
} catch (const Azure::Storage::StorageException& exception) {
return internal::ExceptionToStatus(
"Failed to list blobs in a directory: " + location.path + ": " +
container_client.GetUrl(),
exception);
}
return Status::OK();
}

public:
Status DeleteDir(const AzureLocation& location) {
if (location.container.empty()) {
return Status::Invalid("Cannot delete an empty container");
Expand Down Expand Up @@ -1017,69 +1092,64 @@ class AzureFileSystem::Impl {
exception);
}
} else {
auto container_client =
blob_service_client_->GetBlobContainerClient(location.container);
Azure::Storage::Blobs::ListBlobsOptions options;
options.Prefix = internal::EnsureTrailingSlash(location.path);
// https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch#remarks
//
// Only supports up to 256 subrequests in a single batch. The
// size of the body for a batch request can't exceed 4 MB.
const int32_t kNumMaxRequestsInBatch = 256;
options.PageSizeHint = kNumMaxRequestsInBatch;
return DeleteDirContentsWihtoutHierarchicalNamespace(location,
/*missing_dir_ok=*/true);
}
}

Status DeleteDirContents(const AzureLocation& location, bool missing_dir_ok) {
if (location.container.empty()) {
return internal::InvalidDeleteDirContents(location.all);
}

ARROW_ASSIGN_OR_RAISE(auto hierarchical_namespace_enabled,
hierarchical_namespace_.Enabled(location.container));
if (hierarchical_namespace_enabled) {
auto file_system_client =
datalake_service_client_->GetFileSystemClient(location.container);
auto directory_client = file_system_client.GetDirectoryClient(location.path);
try {
auto list_response = container_client.ListBlobs(options);
while (list_response.HasPage() && !list_response.Blobs.empty()) {
auto batch = container_client.CreateBatch();
std::vector<Azure::Storage::DeferredResponse<
Azure::Storage::Blobs::Models::DeleteBlobResult>>
deferred_responses;
for (const auto& blob_item : list_response.Blobs) {
deferred_responses.push_back(batch.DeleteBlob(blob_item.Name));
}
try {
container_client.SubmitBatch(batch);
} catch (const Azure::Storage::StorageException& exception) {
return internal::ExceptionToStatus(
"Failed to delete blobs in a directory: " + location.path + ": " +
container_client.GetUrl(),
exception);
}
std::vector<std::string> failed_blob_names;
for (size_t i = 0; i < deferred_responses.size(); ++i) {
const auto& deferred_response = deferred_responses[i];
bool success = true;
try {
auto delete_result = deferred_response.GetResponse();
success = delete_result.Value.Deleted;
} catch (const Azure::Storage::StorageException& exception) {
success = false;
}
if (!success) {
const auto& blob_item = list_response.Blobs[i];
failed_blob_names.push_back(blob_item.Name);
}
}
if (!failed_blob_names.empty()) {
if (failed_blob_names.size() == 1) {
return Status::IOError("Failed to delete a blob: ", failed_blob_names[0],
": " + container_client.GetUrl());
auto list_response = directory_client.ListPaths(false);
for (; list_response.HasPage(); list_response.MoveToNextPage()) {
for (const auto& path : list_response.Paths) {
if (path.IsDirectory) {
auto sub_directory_client =
file_system_client.GetDirectoryClient(path.Name);
try {
sub_directory_client.DeleteRecursive();
} catch (const Azure::Storage::StorageException& exception) {
return internal::ExceptionToStatus(
"Failed to delete a sub directory: " + location.container +
internal::kSep + path.Name + ": " + sub_directory_client.GetUrl(),
exception);
}
} else {
return Status::IOError(
"Failed to delete blobs: [",
arrow::internal::JoinStrings(failed_blob_names, ", "),
"]: " + container_client.GetUrl());
auto sub_file_client = file_system_client.GetFileClient(path.Name);
try {
sub_file_client.Delete();
} catch (const Azure::Storage::StorageException& exception) {
return internal::ExceptionToStatus(
"Failed to delete a sub file: " + location.container +
internal::kSep + path.Name + ": " + sub_file_client.GetUrl(),
exception);
}
}
}
list_response.MoveToNextPage();
}
} catch (const Azure::Storage::StorageException& exception) {
return internal::ExceptionToStatus(
"Failed to list blobs in a directory: " + location.path + ": " +
container_client.GetUrl(),
exception);
if (missing_dir_ok &&
exception.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound) {
return Status::OK();
} else {
return internal::ExceptionToStatus(
"Failed to delete directory contents: " + location.path + ": " +
directory_client.GetUrl(),
exception);
}
}
return Status::OK();
} else {
return DeleteDirContentsWihtoutHierarchicalNamespace(location, missing_dir_ok);
}
}
};
Expand Down Expand Up @@ -1121,7 +1191,8 @@ Status AzureFileSystem::DeleteDir(const std::string& path) {
}

Status AzureFileSystem::DeleteDirContents(const std::string& path, bool missing_dir_ok) {
return Status::NotImplemented("The Azure FileSystem is not fully implemented");
ARROW_ASSIGN_OR_RAISE(auto location, AzureLocation::FromString(path));
return impl_->DeleteDirContents(location, missing_dir_ok);
}

Status AzureFileSystem::DeleteRootDirContents() {
Expand Down
105 changes: 105 additions & 0 deletions cpp/src/arrow/filesystem/azurefs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,45 @@ class AzureFileSystemTest : public ::testing::Test {

void RunGetFileInfoObjectWithNestedStructureTest();
void RunGetFileInfoObjectTest();

struct HierarchicalPaths {
std::string container;
std::string directory;
std::vector<std::string> sub_paths;
};

// Need to use "void" as the return type to use ASSERT_* in this method.
void CreateHierarchicalData(HierarchicalPaths& paths) {
const auto container_path = RandomContainerName();
const auto directory_path =
internal::ConcatAbstractPath(container_path, RandomDirectoryName());
const auto sub_directory_path =
internal::ConcatAbstractPath(directory_path, "new-sub");
const auto sub_blob_path =
internal::ConcatAbstractPath(sub_directory_path, "sub.txt");
const auto top_blob_path = internal::ConcatAbstractPath(directory_path, "top.txt");
ASSERT_OK(fs_->CreateDir(sub_directory_path, true));
ASSERT_OK_AND_ASSIGN(auto output, fs_->OpenOutputStream(sub_blob_path));
ASSERT_OK(output->Write(std::string_view("sub")));
ASSERT_OK(output->Close());
ASSERT_OK_AND_ASSIGN(output, fs_->OpenOutputStream(top_blob_path));
ASSERT_OK(output->Write(std::string_view("top")));
ASSERT_OK(output->Close());

AssertFileInfo(fs_.get(), container_path, FileType::Directory);
AssertFileInfo(fs_.get(), directory_path, FileType::Directory);
AssertFileInfo(fs_.get(), sub_directory_path, FileType::Directory);
AssertFileInfo(fs_.get(), sub_blob_path, FileType::File);
AssertFileInfo(fs_.get(), top_blob_path, FileType::File);

paths.container = container_path;
paths.directory = directory_path;
paths.sub_paths = {
sub_directory_path,
sub_blob_path,
top_blob_path,
};
}
};

class AzuriteFileSystemTest : public AzureFileSystemTest {
Expand Down Expand Up @@ -666,6 +705,72 @@ TEST_F(AzuriteFileSystemTest, DeleteDirUri) {
ASSERT_RAISES(Invalid, fs_->DeleteDir("abfs://" + PreexistingContainerPath()));
}

TEST_F(AzuriteFileSystemTest, DeleteDirContentsSuccessContainer) {
#ifdef __APPLE__
GTEST_SKIP() << "This test fails by an Azurite problem: "
"https://github.com/Azure/Azurite/pull/2302";
#endif
HierarchicalPaths paths;
CreateHierarchicalData(paths);
ASSERT_OK(fs_->DeleteDirContents(paths.container));
arrow::fs::AssertFileInfo(fs_.get(), paths.container, FileType::Directory);
arrow::fs::AssertFileInfo(fs_.get(), paths.directory, FileType::NotFound);
for (const auto& sub_path : paths.sub_paths) {
arrow::fs::AssertFileInfo(fs_.get(), sub_path, FileType::NotFound);
}
}

TEST_F(AzuriteFileSystemTest, DeleteDirContentsSuccessDirectory) {
#ifdef __APPLE__
GTEST_SKIP() << "This test fails by an Azurite problem: "
"https://github.com/Azure/Azurite/pull/2302";
#endif
HierarchicalPaths paths;
CreateHierarchicalData(paths);
ASSERT_OK(fs_->DeleteDirContents(paths.directory));
// GH-38772: We may change this to FileType::Directory.
arrow::fs::AssertFileInfo(fs_.get(), paths.directory, FileType::NotFound);
for (const auto& sub_path : paths.sub_paths) {
arrow::fs::AssertFileInfo(fs_.get(), sub_path, FileType::NotFound);
}
}

TEST_F(AzuriteFileSystemTest, DeleteDirContentsSuccessNonexistent) {
const auto directory_path =
internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());
ASSERT_OK(fs_->DeleteDirContents(directory_path, true));
arrow::fs::AssertFileInfo(fs_.get(), directory_path, FileType::NotFound);
}

TEST_F(AzuriteFileSystemTest, DeleteDirContentsFailureNonexistent) {
const auto directory_path =
internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());
ASSERT_RAISES(IOError, fs_->DeleteDirContents(directory_path, false));
}

TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirContentsSuccessExist) {
HierarchicalPaths paths;
CreateHierarchicalData(paths);
ASSERT_OK(fs_->DeleteDirContents(paths.directory));
arrow::fs::AssertFileInfo(fs_.get(), paths.directory, FileType::Directory);
for (const auto& sub_path : paths.sub_paths) {
arrow::fs::AssertFileInfo(fs_.get(), sub_path, FileType::NotFound);
}
}

TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirContentsSuccessNonexistent) {
const auto directory_path =
internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());
ASSERT_OK(fs_->DeleteDirContents(directory_path, true));
arrow::fs::AssertFileInfo(fs_.get(), directory_path, FileType::NotFound);
}

TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirContentsFailureNonexistent) {
const auto directory_path =
internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());
ASSERT_RAISES(IOError, fs_->DeleteDirContents(directory_path, false));
}

TEST_F(AzuriteFileSystemTest, OpenInputStreamString) {
std::shared_ptr<io::InputStream> stream;
ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream(PreexistingObjectPath()));
Expand Down

0 comments on commit 282d905

Please sign in to comment.