Skip to content

Commit

Permalink
apacheGH-38705: [C++][FS][Azure] Implement CopyFile() (apache#39058)
Browse files Browse the repository at this point in the history
### Rationale for this change

`CopyFile()` copies the given source to the given destination. Both of source and destination must be blob name like other filesystem implementations.

### What changes are included in this PR?

Use `CopyFromUri()` API that should use server-side copy.

### Are these changes tested?

Yes.

### Are there any user-facing changes?

No.
* Closes: apache#38705

Authored-by: Sutou Kouhei <[email protected]>
Signed-off-by: Sutou Kouhei <[email protected]>
  • Loading branch information
kou authored and dgreiss committed Feb 17, 2024
1 parent b329935 commit 11089ae
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 7 deletions.
39 changes: 32 additions & 7 deletions cpp/src/arrow/filesystem/azurefs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ struct AzureLocation {
return parent;
}

Result<AzureLocation> join(const std::string& stem) const {
return FromString(internal::ConcatAbstractPath(all, stem));
}

bool has_parent() const { return !path.empty(); }

bool empty() const { return container.empty() && path.empty(); }
Expand Down Expand Up @@ -149,6 +153,7 @@ Status ValidateFileLocation(const AzureLocation& location) {
if (location.path.empty()) {
return NotAFile(location);
}
ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(location.path));
return Status::OK();
}

Expand Down Expand Up @@ -818,7 +823,6 @@ class AzureFileSystem::Impl {
Result<std::shared_ptr<ObjectInputFile>> OpenInputFile(const AzureLocation& location,
AzureFileSystem* fs) {
RETURN_NOT_OK(ValidateFileLocation(location));
ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(location.path));
auto blob_client = std::make_shared<Azure::Storage::Blobs::BlobClient>(
blob_service_client_->GetBlobContainerClient(location.container)
.GetBlobClient(location.path));
Expand All @@ -831,7 +835,6 @@ class AzureFileSystem::Impl {

Result<std::shared_ptr<ObjectInputFile>> OpenInputFile(const FileInfo& info,
AzureFileSystem* fs) {
ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(info.path()));
if (info.type() == FileType::NotFound) {
return ::arrow::fs::internal::PathNotFound(info.path());
}
Expand Down Expand Up @@ -951,7 +954,6 @@ class AzureFileSystem::Impl {
const std::shared_ptr<const KeyValueMetadata>& metadata, const bool truncate,
AzureFileSystem* fs) {
RETURN_NOT_OK(ValidateFileLocation(location));
ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(location.path));

auto block_blob_client = std::make_shared<Azure::Storage::Blobs::BlockBlobClient>(
blob_service_client_->GetBlobContainerClient(location.container)
Expand All @@ -971,7 +973,7 @@ class AzureFileSystem::Impl {
}

private:
Status DeleteDirContentsWihtoutHierarchicalNamespace(const AzureLocation& location,
Status DeleteDirContentsWithoutHierarchicalNamespace(const AzureLocation& location,
bool missing_dir_ok) {
auto container_client =
blob_service_client_->GetBlobContainerClient(location.container);
Expand Down Expand Up @@ -1092,7 +1094,7 @@ class AzureFileSystem::Impl {
exception);
}
} else {
return DeleteDirContentsWihtoutHierarchicalNamespace(location,
return DeleteDirContentsWithoutHierarchicalNamespace(location,
/*missing_dir_ok=*/true);
}
}
Expand Down Expand Up @@ -1149,9 +1151,30 @@ class AzureFileSystem::Impl {
}
return Status::OK();
} else {
return DeleteDirContentsWihtoutHierarchicalNamespace(location, missing_dir_ok);
return DeleteDirContentsWithoutHierarchicalNamespace(location, missing_dir_ok);
}
}

Status CopyFile(const AzureLocation& src, const AzureLocation& dest) {
RETURN_NOT_OK(ValidateFileLocation(src));
RETURN_NOT_OK(ValidateFileLocation(dest));
if (src == dest) {
return Status::OK();
}
auto dest_blob_client = blob_service_client_->GetBlobContainerClient(dest.container)
.GetBlobClient(dest.path);
auto src_url = blob_service_client_->GetBlobContainerClient(src.container)
.GetBlobClient(src.path)
.GetUrl();
try {
dest_blob_client.CopyFromUri(src_url);
} catch (const Azure::Storage::StorageException& exception) {
return internal::ExceptionToStatus(
"Failed to copy a blob. (" + src_url + " -> " + dest_blob_client.GetUrl() + ")",
exception);
}
return Status::OK();
}
};

const AzureOptions& AzureFileSystem::options() const { return impl_->options(); }
Expand Down Expand Up @@ -1208,7 +1231,9 @@ Status AzureFileSystem::Move(const std::string& src, const std::string& dest) {
}

Status AzureFileSystem::CopyFile(const std::string& src, const std::string& dest) {
return Status::NotImplemented("The Azure FileSystem is not fully implemented");
ARROW_ASSIGN_OR_RAISE(auto src_location, AzureLocation::FromString(src));
ARROW_ASSIGN_OR_RAISE(auto dest_location, AzureLocation::FromString(dest));
return impl_->CopyFile(src_location, dest_location);
}

Result<std::shared_ptr<io::InputStream>> AzureFileSystem::OpenInputStream(
Expand Down
45 changes: 45 additions & 0 deletions cpp/src/arrow/filesystem/azurefs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,51 @@ TEST_F(AzureHierarchicalNamespaceFileSystemTest, DeleteDirContentsFailureNonexis
ASSERT_RAISES(IOError, fs_->DeleteDirContents(directory_path, false));
}

TEST_F(AzuriteFileSystemTest, CopyFileSuccessDestinationNonexistent) {
const auto destination_path =
internal::ConcatAbstractPath(PreexistingContainerName(), "copy-destionation");
ASSERT_OK(fs_->CopyFile(PreexistingObjectPath(), destination_path));
ASSERT_OK_AND_ASSIGN(auto info, fs_->GetFileInfo(destination_path));
ASSERT_OK_AND_ASSIGN(auto stream, fs_->OpenInputStream(info));
ASSERT_OK_AND_ASSIGN(auto buffer, stream->Read(1024));
EXPECT_EQ(kLoremIpsum, buffer->ToString());
}

TEST_F(AzuriteFileSystemTest, CopyFileSuccessDestinationSame) {
ASSERT_OK(fs_->CopyFile(PreexistingObjectPath(), PreexistingObjectPath()));
ASSERT_OK_AND_ASSIGN(auto info, fs_->GetFileInfo(PreexistingObjectPath()));
ASSERT_OK_AND_ASSIGN(auto stream, fs_->OpenInputStream(info));
ASSERT_OK_AND_ASSIGN(auto buffer, stream->Read(1024));
EXPECT_EQ(kLoremIpsum, buffer->ToString());
}

TEST_F(AzuriteFileSystemTest, CopyFileFailureDestinationTrailingSlash) {
ASSERT_RAISES(IOError,
fs_->CopyFile(PreexistingObjectPath(),
internal::EnsureTrailingSlash(PreexistingObjectPath())));
}

TEST_F(AzuriteFileSystemTest, CopyFileFailureSourceNonexistent) {
const auto destination_path =
internal::ConcatAbstractPath(PreexistingContainerName(), "copy-destionation");
ASSERT_RAISES(IOError, fs_->CopyFile(NotFoundObjectPath(), destination_path));
}

TEST_F(AzuriteFileSystemTest, CopyFileFailureDestinationParentNonexistent) {
const auto destination_path =
internal::ConcatAbstractPath(RandomContainerName(), "copy-destionation");
ASSERT_RAISES(IOError, fs_->CopyFile(PreexistingObjectPath(), destination_path));
}

TEST_F(AzuriteFileSystemTest, CopyFileUri) {
const auto destination_path =
internal::ConcatAbstractPath(PreexistingContainerName(), "copy-destionation");
ASSERT_RAISES(Invalid,
fs_->CopyFile("abfs://" + PreexistingObjectPath(), destination_path));
ASSERT_RAISES(Invalid,
fs_->CopyFile(PreexistingObjectPath(), "abfs://" + destination_path));
}

TEST_F(AzuriteFileSystemTest, OpenInputStreamString) {
std::shared_ptr<io::InputStream> stream;
ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream(PreexistingObjectPath()));
Expand Down

0 comments on commit 11089ae

Please sign in to comment.