Skip to content

Commit

Permalink
apache GH-38700: [C++][FS][Azure] Implement DeleteDir()
Browse files Browse the repository at this point in the history
  • Loading branch information
kou committed Nov 21, 2023
1 parent c1b12ca commit 7212e6a
Show file tree
Hide file tree
Showing 2 changed files with 184 additions and 1 deletion.
92 changes: 91 additions & 1 deletion cpp/src/arrow/filesystem/azurefs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -969,6 +969,95 @@ class AzureFileSystem::Impl {
RETURN_NOT_OK(stream->Init());
return stream;
}

// Delete a container, or a directory together with everything below it.
//
// - An empty container name is rejected with Status::Invalid.
// - An empty path deletes the whole container.
// - When the hierarchical namespace is enabled for the container, the
//   directory is deleted with a single recursive Data Lake request.
// - Otherwise every blob whose name starts with "<path>/" is deleted with
//   batch requests, one batch per listing page.
//
// Azure::Storage::StorageException thrown while deleting, listing or
// submitting a batch is converted to a Status rather than propagated.
Status DeleteDir(const AzureLocation& location) {
  if (location.container.empty()) {
    return Status::Invalid("Cannot delete an empty container");
  }

  if (location.path.empty()) {
    // No path component: the target is the container itself.
    auto container_client =
        blob_service_client_->GetBlobContainerClient(location.container);
    try {
      auto response = container_client.Delete();
      if (response.Value.Deleted) {
        return Status::OK();
      } else {
        return StatusFromErrorResponse(
            container_client.GetUrl(), response.RawResponse.get(),
            "Failed to delete a container: " + location.container);
      }
    } catch (const Azure::Storage::StorageException& exception) {
      return internal::ExceptionToStatus(
          "Failed to delete a container: " + location.container + ": " +
              container_client.GetUrl(),
          exception);
    }
  }

  ARROW_ASSIGN_OR_RAISE(auto hierarchical_namespace_enabled,
                        hierarchical_namespace_.Enabled(location.container));
  if (hierarchical_namespace_enabled) {
    auto directory_client =
        datalake_service_client_->GetFileSystemClient(location.container)
            .GetDirectoryClient(location.path);
    try {
      auto response = directory_client.DeleteRecursive();
      if (response.Value.Deleted) {
        return Status::OK();
      } else {
        return StatusFromErrorResponse(
            directory_client.GetUrl(), response.RawResponse.get(),
            "Failed to delete a directory: " + location.path);
      }
    } catch (const Azure::Storage::StorageException& exception) {
      return internal::ExceptionToStatus(
          "Failed to delete a directory: " + location.path + ": " +
              directory_client.GetUrl(),
          exception);
    }
  }

  // Flat namespace: directories are only name prefixes, so delete every
  // blob below "<path>/".
  auto container_client =
      blob_service_client_->GetBlobContainerClient(location.container);
  Azure::Storage::Blobs::ListBlobsOptions options;
  options.Prefix = internal::EnsureTrailingSlash(location.path);
  // https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch#remarks
  //
  // Only supports up to 256 subrequests in a single batch. The
  // size of the body for a batch request can't exceed 4 MB.
  const int32_t kNumMaxRequestsInBatch = 256;
  // One listing page maps to one batch, so cap the page size at the batch limit.
  options.PageSizeHint = kNumMaxRequestsInBatch;
  try {
    // ListBlobs() and MoveToNextPage() issue requests and may throw, so the
    // whole pagination loop lives inside the try block.
    auto list_response = container_client.ListBlobs(options);
    for (; list_response.HasPage(); list_response.MoveToNextPage()) {
      if (list_response.Blobs.empty()) {
        // An empty page does not imply that no further pages exist; skip it
        // instead of ending the traversal, and never submit an empty batch.
        continue;
      }
      auto batch = container_client.CreateBatch();
      std::vector<Azure::Storage::DeferredResponse<
          Azure::Storage::Blobs::Models::DeleteBlobResult>>
          deferred_responses;
      deferred_responses.reserve(list_response.Blobs.size());
      for (const auto& blob_item : list_response.Blobs) {
        deferred_responses.push_back(batch.DeleteBlob(blob_item.Name));
      }
      container_client.SubmitBatch(batch);
      // deferred_responses[i] corresponds to list_response.Blobs[i].
      for (size_t i = 0; i < deferred_responses.size(); ++i) {
        const auto& blob_item = list_response.Blobs[i];
        try {
          auto delete_response = deferred_responses[i].GetResponse();
          if (!delete_response.Value.Deleted) {
            return Status::IOError("Failed to delete a blob: ", blob_item.Name,
                                   ": " + container_client.GetUrl());
          }
        } catch (const Azure::Storage::StorageException& exception) {
          // A failed sub-request surfaces as an exception from GetResponse();
          // report it with the name of the blob that could not be deleted.
          return internal::ExceptionToStatus(
              "Failed to delete a blob: " + blob_item.Name + ": " +
                  container_client.GetUrl(),
              exception);
        }
      }
    }
  } catch (const Azure::Storage::StorageException& exception) {
    return internal::ExceptionToStatus(
        "Failed to delete blobs in a directory: " + location.path + ": " +
            container_client.GetUrl(),
        exception);
  }
  return Status::OK();
}
};

// Expose the options held by the implementation object.
const AzureOptions& AzureFileSystem::options() const {
  const AzureOptions& impl_options = impl_->options();
  return impl_options;
}
Expand Down Expand Up @@ -1003,7 +1092,8 @@ Status AzureFileSystem::CreateDir(const std::string& path, bool recursive) {
}

// Delete the container or directory named by `path`.
//
// `path` is parsed with AzureLocation::FromString(); a parse failure is
// returned to the caller as-is. The actual deletion is delegated to
// Impl::DeleteDir().
Status AzureFileSystem::DeleteDir(const std::string& path) {
  // The former unconditional NotImplemented return made the delegation below
  // unreachable, so it is dropped.
  ARROW_ASSIGN_OR_RAISE(auto location, AzureLocation::FromString(path));
  return impl_->DeleteDir(location);
}

Status AzureFileSystem::DeleteDirContents(const std::string& path, bool missing_dir_ok) {
Expand Down
93 changes: 93 additions & 0 deletions cpp/src/arrow/filesystem/azurefs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,99 @@ TEST_F(AzuriteFileSystemTest, CreateDirUri) {
ASSERT_RAISES(Invalid, fs_->CreateDir("abfs://" + RandomContainerName(), true));
}

// DeleteDir() on a bare container name removes the container.
TEST_F(AzuriteFileSystemTest, DeleteDirSuccessContainer) {
  auto* fs = fs_.get();
  const auto container = RandomContainerName();

  // Create the container and confirm it is visible as a directory.
  ASSERT_OK(fs->CreateDir(container));
  arrow::fs::AssertFileInfo(fs, container, FileType::Directory);

  // After deletion the container must no longer be found.
  ASSERT_OK(fs->DeleteDir(container));
  arrow::fs::AssertFileInfo(fs, container, FileType::NotFound);
}

// Creating and then deleting an empty directory succeeds on a flat namespace.
TEST_F(AzuriteFileSystemTest, DeleteDirSuccessEmpty) {
  auto* fs = fs_.get();
  const auto empty_dir =
      internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());

  // Without hierarchical namespace support directories are purely virtual,
  // so CreateDir() and DeleteDir() succeed without creating or removing
  // anything: the path is reported as NotFound both before and after.
  ASSERT_OK(fs->CreateDir(empty_dir));
  arrow::fs::AssertFileInfo(fs, empty_dir, FileType::NotFound);

  ASSERT_OK(fs->DeleteDir(empty_dir));
  arrow::fs::AssertFileInfo(fs, empty_dir, FileType::NotFound);
}

// Deleting a directory that was never created succeeds on a flat namespace.
TEST_F(AzuriteFileSystemTest, DeleteDirSuccessNonexistent) {
  auto* fs = fs_.get();
  const auto missing_dir =
      internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());

  // Without hierarchical namespace support directories are purely virtual,
  // so DeleteDir() on a nonexistent directory is a successful no-op.
  ASSERT_OK(fs->DeleteDir(missing_dir));
  arrow::fs::AssertFileInfo(fs, missing_dir, FileType::NotFound);
}

// DeleteDir() removes every blob below a directory, across several pages.
TEST_F(AzuriteFileSystemTest, DeleteDirSuccessHaveBlobs) {
  auto* fs = fs_.get();
  const auto directory_path =
      internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());

  // A single SubmitBatch() cannot carry 257 or more delete requests, so at
  // least 257 blobs are needed to exercise the pagination of ListBlobs().
  const int64_t blob_count = 300;

  // Path of the index-th blob below the directory under test.
  const auto blob_path_for = [&directory_path](int64_t index) {
    return internal::ConcatAbstractPath(directory_path, std::to_string(index) + ".txt");
  };

  // Populate the directory; each blob's content is its own index.
  for (int64_t index = 0; index < blob_count; ++index) {
    const auto path = blob_path_for(index);
    const auto payload = std::to_string(index);
    ASSERT_OK_AND_ASSIGN(auto sink, fs->OpenOutputStream(path));
    ASSERT_OK(sink->Write(std::string_view(payload)));
    ASSERT_OK(sink->Close());
    arrow::fs::AssertFileInfo(fs, path, FileType::File);
  }

  ASSERT_OK(fs->DeleteDir(directory_path));

  // Every blob written above must be gone.
  for (int64_t index = 0; index < blob_count; ++index) {
    arrow::fs::AssertFileInfo(fs, blob_path_for(index), FileType::NotFound);
  }
}

// With hierarchical namespace, an empty directory exists and can be deleted.
TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirSuccessEmpty) {
  auto* fs = fs_.get();
  const auto empty_dir =
      internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());

  // The directory is reported as a Directory once created...
  ASSERT_OK(fs->CreateDir(empty_dir, true));
  arrow::fs::AssertFileInfo(fs, empty_dir, FileType::Directory);

  // ...and as NotFound once deleted.
  ASSERT_OK(fs->DeleteDir(empty_dir));
  arrow::fs::AssertFileInfo(fs, empty_dir, FileType::NotFound);
}

// With hierarchical namespace, deleting a nonexistent directory is an IOError.
TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirFailureNonexistent) {
  const auto missing_dir =
      internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());

  ASSERT_RAISES(IOError, fs_->DeleteDir(missing_dir));
}

// With hierarchical namespace, DeleteDir() also removes a blob in the directory.
TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirSuccessHaveBlob) {
  auto* fs = fs_.get();
  const auto parent_dir =
      internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());
  const auto file_path = internal::ConcatAbstractPath(parent_dir, "hello.txt");

  // Write a small blob below the directory and confirm it is a file.
  ASSERT_OK_AND_ASSIGN(auto sink, fs->OpenOutputStream(file_path));
  ASSERT_OK(sink->Write(std::string_view("hello")));
  ASSERT_OK(sink->Close());
  arrow::fs::AssertFileInfo(fs, file_path, FileType::File);

  // Deleting the directory must take the blob with it.
  ASSERT_OK(fs->DeleteDir(parent_dir));
  arrow::fs::AssertFileInfo(fs, file_path, FileType::NotFound);
}

// With hierarchical namespace, DeleteDir() also removes nested directories.
TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirSuccessHaveDirectory) {
  auto* fs = fs_.get();
  const auto outer_dir =
      internal::ConcatAbstractPath(PreexistingContainerName(), RandomDirectoryName());
  const auto inner_dir = internal::ConcatAbstractPath(outer_dir, "new-sub");

  // Recursively creating the nested directory makes both levels visible.
  ASSERT_OK(fs->CreateDir(inner_dir, true));
  arrow::fs::AssertFileInfo(fs, inner_dir, FileType::Directory);
  arrow::fs::AssertFileInfo(fs, outer_dir, FileType::Directory);

  // Deleting the outer directory removes both levels.
  ASSERT_OK(fs->DeleteDir(outer_dir));
  arrow::fs::AssertFileInfo(fs, inner_dir, FileType::NotFound);
  arrow::fs::AssertFileInfo(fs, outer_dir, FileType::NotFound);
}

// DeleteDir() takes a path, not a URI: an "abfs://" argument is Invalid.
TEST_F(AzuriteFileSystemTest, DeleteDirUri) {
  const auto uri = "abfs://" + PreexistingContainerPath();
  ASSERT_RAISES(Invalid, fs_->DeleteDir(uri));
}

TEST_F(AzuriteFileSystemTest, OpenInputStreamString) {
std::shared_ptr<io::InputStream> stream;
ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream(PreexistingObjectPath()));
Expand Down

0 comments on commit 7212e6a

Please sign in to comment.