From 2f882cdcea5b1b5d5276478ca3ae86d5b57aaef0 Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Tue, 21 Nov 2023 03:55:16 +0000 Subject: [PATCH] GH-38333: [C++][FS][Azure] Implement file writes (#38780) ### Rationale for this change Writing files is an important part of the filesystem ### What changes are included in this PR? Implements `OpenOutputStream` and `OpenAppendStream` for Azure. - Initially I started with the implementation from https://github.com/apache/arrow/pull/12914 but I made quite a few changes: - Removed the different code path for hierarchical namespace accounts. There should not be any performance advantage to using special APIs only available on hierarchical namespace accounts. - Only implement `ObjectAppendStream`, not `ObjectOutputStream`. `OpenOutputStream` is implemented by truncating the existing file then returning an `ObjectAppendStream`. - More precise use of `try` `catch`. Every call to Azure is wrapped in a `try` `catch` and should return a descriptive error status. - Avoid unnecessary calls to Azure. For example we now maintain the block list in memory and commit it only once on flush. https://github.com/apache/arrow/pull/12914 committed the block list after each block that was staged and on flush queried Azure to get the list of uncommitted blocks. The new approach is consistent with the Azure fsspec implementation https://github.com/fsspec/adlfs/blob/092685f102c5cd215550d10e8347e5bce0e2b93d/adlfs/spec.py#L2009 - Adjust the block_ids slightly to minimise the risk of them conflicting with blocks written by other blob storage clients. - Implement metadata writes. Includes adding default metadata to `AzureOptions`. - Tests are based on the `gcsfs_test.cc` but I added a couple of extra ones. - Handle the TODO(GH-38333) comments for using the Azure fs to write data in tests ### Are these changes tested? Yes. Everything should be covered by azurite tests ### Are there any user-facing changes? Yes. The Azure filesystem now supports file writes. 
* Closes: #38333 Lead-authored-by: Thomas Newton Co-authored-by: Sutou Kouhei Signed-off-by: Sutou Kouhei --- cpp/src/arrow/filesystem/azurefs.cc | 255 ++++++++++++++++++++++- cpp/src/arrow/filesystem/azurefs.h | 5 + cpp/src/arrow/filesystem/azurefs_test.cc | 192 +++++++++++++++-- 3 files changed, 429 insertions(+), 23 deletions(-) diff --git a/cpp/src/arrow/filesystem/azurefs.cc b/cpp/src/arrow/filesystem/azurefs.cc index bd0e353e4a03a..2c3d81ca24c51 100644 --- a/cpp/src/arrow/filesystem/azurefs.cc +++ b/cpp/src/arrow/filesystem/azurefs.cc @@ -24,6 +24,7 @@ #include "arrow/buffer.h" #include "arrow/filesystem/path_util.h" #include "arrow/filesystem/util_internal.h" +#include "arrow/io/util_internal.h" #include "arrow/result.h" #include "arrow/util/checked_cast.h" #include "arrow/util/formatting.h" @@ -43,7 +44,8 @@ AzureOptions::AzureOptions() {} bool AzureOptions::Equals(const AzureOptions& other) const { return (account_dfs_url == other.account_dfs_url && account_blob_url == other.account_blob_url && - credentials_kind == other.credentials_kind); + credentials_kind == other.credentials_kind && + default_metadata == other.default_metadata); } Status AzureOptions::ConfigureAccountKeyCredentials(const std::string& account_name, @@ -461,6 +463,225 @@ class ObjectInputFile final : public io::RandomAccessFile { int64_t content_length_ = kNoSize; std::shared_ptr metadata_; }; + +Status CreateEmptyBlockBlob( + std::shared_ptr block_blob_client) { + try { + block_blob_client->UploadFrom(nullptr, 0); + } catch (const Azure::Storage::StorageException& exception) { + return internal::ExceptionToStatus( + "UploadFrom failed for '" + block_blob_client->GetUrl() + + "' with an unexpected Azure error. 
There is no existing blob at this " + "location or the existing blob must be replaced so ObjectAppendStream must " + "create a new empty block blob.", + exception); + } + return Status::OK(); +} + +Result GetBlockList( + std::shared_ptr block_blob_client) { + try { + return block_blob_client->GetBlockList().Value; + } catch (Azure::Storage::StorageException& exception) { + return internal::ExceptionToStatus( + "GetBlockList failed for '" + block_blob_client->GetUrl() + + "' with an unexpected Azure error. Cannot write to a file without first " + "fetching the existing block list.", + exception); + } +} + +Azure::Storage::Metadata ArrowMetadataToAzureMetadata( + const std::shared_ptr& arrow_metadata) { + Azure::Storage::Metadata azure_metadata; + for (auto key_value : arrow_metadata->sorted_pairs()) { + azure_metadata[key_value.first] = key_value.second; + } + return azure_metadata; +} + +Status CommitBlockList( + std::shared_ptr block_blob_client, + const std::vector& block_ids, const Azure::Storage::Metadata& metadata) { + Azure::Storage::Blobs::CommitBlockListOptions options; + options.Metadata = metadata; + try { + // CommitBlockList puts all block_ids in the latest element. That means in the case of + // overlapping block_ids the newly staged block ids will always replace the + // previously committed blocks. + // https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?tabs=microsoft-entra-id#request-body + block_blob_client->CommitBlockList(block_ids, options); + } catch (const Azure::Storage::StorageException& exception) { + return internal::ExceptionToStatus( + "CommitBlockList failed for '" + block_blob_client->GetUrl() + + "' with an unexpected Azure error. 
Committing is required to flush an " + "output/append stream.", + exception); + } + return Status::OK(); +} + +class ObjectAppendStream final : public io::OutputStream { + public: + ObjectAppendStream( + std::shared_ptr block_blob_client, + const io::IOContext& io_context, const AzureLocation& location, + const std::shared_ptr& metadata, + const AzureOptions& options, int64_t size = kNoSize) + : block_blob_client_(std::move(block_blob_client)), + io_context_(io_context), + location_(location), + content_length_(size) { + if (metadata && metadata->size() != 0) { + metadata_ = ArrowMetadataToAzureMetadata(metadata); + } else if (options.default_metadata && options.default_metadata->size() != 0) { + metadata_ = ArrowMetadataToAzureMetadata(options.default_metadata); + } + } + + ~ObjectAppendStream() override { + // For compliance with the rest of the IO stack, Close rather than Abort, + // even though it may be more expensive. + io::internal::CloseFromDestructor(this); + } + + Status Init() { + if (content_length_ != kNoSize) { + DCHECK_GE(content_length_, 0); + pos_ = content_length_; + } else { + try { + auto properties = block_blob_client_->GetProperties(); + content_length_ = properties.Value.BlobSize; + pos_ = content_length_; + } catch (const Azure::Storage::StorageException& exception) { + if (exception.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound) { + RETURN_NOT_OK(CreateEmptyBlockBlob(block_blob_client_)); + } else { + return internal::ExceptionToStatus( + "GetProperties failed for '" + block_blob_client_->GetUrl() + + "' with an unexpected Azure error. 
Can not initialise an " + "ObjectAppendStream without knowing whether a file already exists at " + "this path, and if it exists, its size.", + exception); + } + content_length_ = 0; + } + } + if (content_length_ > 0) { + ARROW_ASSIGN_OR_RAISE(auto block_list, GetBlockList(block_blob_client_)); + for (auto block : block_list.CommittedBlocks) { + block_ids_.push_back(block.Name); + } + } + return Status::OK(); + } + + Status Abort() override { + if (closed_) { + return Status::OK(); + } + block_blob_client_ = nullptr; + closed_ = true; + return Status::OK(); + } + + Status Close() override { + if (closed_) { + return Status::OK(); + } + RETURN_NOT_OK(Flush()); + block_blob_client_ = nullptr; + closed_ = true; + return Status::OK(); + } + + bool closed() const override { return closed_; } + + Status CheckClosed(const char* action) const { + if (closed_) { + return Status::Invalid("Cannot ", action, " on closed stream."); + } + return Status::OK(); + } + + Result Tell() const override { + RETURN_NOT_OK(CheckClosed("tell")); + return pos_; + } + + Status Write(const std::shared_ptr& buffer) override { + return DoAppend(buffer->data(), buffer->size(), buffer); + } + + Status Write(const void* data, int64_t nbytes) override { + return DoAppend(data, nbytes); + } + + Status Flush() override { + RETURN_NOT_OK(CheckClosed("flush")); + return CommitBlockList(block_blob_client_, block_ids_, metadata_); + } + + private: + Status DoAppend(const void* data, int64_t nbytes, + std::shared_ptr owned_buffer = nullptr) { + RETURN_NOT_OK(CheckClosed("append")); + auto append_data = reinterpret_cast(data); + Azure::Core::IO::MemoryBodyStream block_content(append_data, nbytes); + if (block_content.Length() == 0) { + return Status::OK(); + } + + const auto n_block_ids = block_ids_.size(); + + // New block ID must always be distinct from the existing block IDs. Otherwise we + // will accidentally replace the content of existing blocks, causing corruption. 
+ // We will use monotonically increasing integers. + auto new_block_id = std::to_string(n_block_ids); + + // Pad to 5 digits, because Azure allows a maximum of 50,000 blocks. + const size_t target_number_of_digits = 5; + const auto required_padding_digits = + target_number_of_digits - std::min(target_number_of_digits, new_block_id.size()); + new_block_id.insert(0, required_padding_digits, '0'); + // There is a small risk when appending to a blob created by another client that + // `new_block_id` may overlapping with an existing block id. Adding the `-arrow` + // suffix significantly reduces the risk, but does not 100% eliminate it. For example + // if the blob was previously created with one block, with id `00001-arrow` then the + // next block we append will conflict with that, and cause corruption. + new_block_id += "-arrow"; + new_block_id = Azure::Core::Convert::Base64Encode( + std::vector(new_block_id.begin(), new_block_id.end())); + + try { + block_blob_client_->StageBlock(new_block_id, block_content); + } catch (const Azure::Storage::StorageException& exception) { + return internal::ExceptionToStatus( + "StageBlock failed for '" + block_blob_client_->GetUrl() + "' new_block_id: '" + + new_block_id + + "' with an unexpected Azure error. 
Staging new blocks is fundamental to " + "streaming writes to blob storage.", + exception); + } + block_ids_.push_back(new_block_id); + pos_ += nbytes; + content_length_ += nbytes; + return Status::OK(); + } + + std::shared_ptr block_blob_client_; + const io::IOContext io_context_; + const AzureLocation location_; + + bool closed_ = false; + int64_t pos_ = 0; + int64_t content_length_ = kNoSize; + std::vector block_ids_; + Azure::Storage::Metadata metadata_; +}; + } // namespace // ----------------------------------------------------------------------- @@ -724,6 +945,30 @@ class AzureFileSystem::Impl { return Status::OK(); } + + Result> OpenAppendStream( + const AzureLocation& location, + const std::shared_ptr& metadata, const bool truncate, + AzureFileSystem* fs) { + RETURN_NOT_OK(ValidateFileLocation(location)); + ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(location.path)); + + auto block_blob_client = std::make_shared( + blob_service_client_->GetBlobContainerClient(location.container) + .GetBlockBlobClient(location.path)); + + std::shared_ptr stream; + if (truncate) { + RETURN_NOT_OK(CreateEmptyBlockBlob(block_blob_client)); + stream = std::make_shared(block_blob_client, fs->io_context(), + location, metadata, options_, 0); + } else { + stream = std::make_shared(block_blob_client, fs->io_context(), + location, metadata, options_); + } + RETURN_NOT_OK(stream->Init()); + return stream; + } }; const AzureOptions& AzureFileSystem::options() const { return impl_->options(); } @@ -805,12 +1050,14 @@ Result> AzureFileSystem::OpenInputFile( Result> AzureFileSystem::OpenOutputStream( const std::string& path, const std::shared_ptr& metadata) { - return Status::NotImplemented("The Azure FileSystem is not fully implemented"); + ARROW_ASSIGN_OR_RAISE(auto location, AzureLocation::FromString(path)); + return impl_->OpenAppendStream(location, metadata, true, this); } Result> AzureFileSystem::OpenAppendStream( - const std::string&, const std::shared_ptr&) { - return 
Status::NotImplemented("The Azure FileSystem is not fully implemented"); + const std::string& path, const std::shared_ptr& metadata) { + ARROW_ASSIGN_OR_RAISE(auto location, AzureLocation::FromString(path)); + return impl_->OpenAppendStream(location, metadata, false, this); } Result> AzureFileSystem::Make( diff --git a/cpp/src/arrow/filesystem/azurefs.h b/cpp/src/arrow/filesystem/azurefs.h index 1f7047ff94c56..9f980ee8baae0 100644 --- a/cpp/src/arrow/filesystem/azurefs.h +++ b/cpp/src/arrow/filesystem/azurefs.h @@ -77,6 +77,11 @@ struct ARROW_EXPORT AzureOptions { std::shared_ptr service_principle_credentials_provider; + /// \brief Default metadata for OpenOutputStream. + /// + /// This will be ignored if non-empty metadata is passed to OpenOutputStream. + std::shared_ptr default_metadata; + AzureOptions(); Status ConfigureAccountKeyCredentials(const std::string& account_name, diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc b/cpp/src/arrow/filesystem/azurefs_test.cc index ecf0a19f684eb..e9b9a6f34b88c 100644 --- a/cpp/src/arrow/filesystem/azurefs_test.cc +++ b/cpp/src/arrow/filesystem/azurefs_test.cc @@ -232,13 +232,11 @@ class AzureFileSystemTest : public ::testing::Test { void UploadLines(const std::vector& lines, const char* path_to_file, int total_size) { - // TODO(GH-38333): Switch to using Azure filesystem to write once its implemented. 
- auto blob_client = - blob_service_client_->GetBlobContainerClient(PreexistingContainerName()) - .GetBlockBlobClient(path_to_file); - std::string all_lines = std::accumulate(lines.begin(), lines.end(), std::string("")); - blob_client.UploadFrom(reinterpret_cast(all_lines.data()), - total_size); + const auto path = PreexistingContainerPath() + path_to_file; + ASSERT_OK_AND_ASSIGN(auto output, fs_->OpenOutputStream(path, {})); + const auto all_lines = std::accumulate(lines.begin(), lines.end(), std::string("")); + ASSERT_OK(output->Write(all_lines)); + ASSERT_OK(output->Close()); } void RunGetFileInfoObjectWithNestedStructureTest(); @@ -347,21 +345,26 @@ void AzureFileSystemTest::RunGetFileInfoObjectWithNestedStructureTest() { // Adds detailed tests to handle cases of different edge cases // with directory naming conventions (e.g. with and without slashes). constexpr auto kObjectName = "test-object-dir/some_other_dir/another_dir/foo"; - // TODO(GH-38333): Switch to using Azure filesystem to write once its implemented. - blob_service_client_->GetBlobContainerClient(PreexistingContainerName()) - .GetBlockBlobClient(kObjectName) - .UploadFrom(reinterpret_cast(kLoremIpsum), strlen(kLoremIpsum)); + ASSERT_OK_AND_ASSIGN( + auto output, + fs_->OpenOutputStream(PreexistingContainerPath() + kObjectName, /*metadata=*/{})); + const std::string_view data(kLoremIpsum); + ASSERT_OK(output->Write(data)); + ASSERT_OK(output->Close()); // 0 is immediately after "/" lexicographically, ensure that this doesn't // cause unexpected issues. - // TODO(GH-38333): Switch to using Azure filesystem to write once its implemented. 
- blob_service_client_->GetBlobContainerClient(PreexistingContainerName()) - .GetBlockBlobClient("test-object-dir/some_other_dir0") - .UploadFrom(reinterpret_cast(kLoremIpsum), strlen(kLoremIpsum)); - - blob_service_client_->GetBlobContainerClient(PreexistingContainerName()) - .GetBlockBlobClient(std::string(kObjectName) + "0") - .UploadFrom(reinterpret_cast(kLoremIpsum), strlen(kLoremIpsum)); + ASSERT_OK_AND_ASSIGN(output, + fs_->OpenOutputStream( + PreexistingContainerPath() + "test-object-dir/some_other_dir0", + /*metadata=*/{})); + ASSERT_OK(output->Write(data)); + ASSERT_OK(output->Close()); + ASSERT_OK_AND_ASSIGN( + output, fs_->OpenOutputStream(PreexistingContainerPath() + kObjectName + "0", + /*metadata=*/{})); + ASSERT_OK(output->Write(data)); + ASSERT_OK(output->Close()); AssertFileInfo(fs_.get(), PreexistingContainerPath() + kObjectName, FileType::File); AssertFileInfo(fs_.get(), PreexistingContainerPath() + kObjectName + "/", @@ -647,6 +650,157 @@ TEST_F(AzuriteFileSystemTest, OpenInputStreamClosed) { ASSERT_RAISES(Invalid, stream->Tell()); } +TEST_F(AzuriteFileSystemTest, TestWriteMetadata) { + options_.default_metadata = arrow::key_value_metadata({{"foo", "bar"}}); + + ASSERT_OK_AND_ASSIGN(auto fs_with_defaults, AzureFileSystem::Make(options_)); + std::string path = "object_with_defaults"; + auto location = PreexistingContainerPath() + path; + ASSERT_OK_AND_ASSIGN(auto output, + fs_with_defaults->OpenOutputStream(location, /*metadata=*/{})); + const std::string_view expected(kLoremIpsum); + ASSERT_OK(output->Write(expected)); + ASSERT_OK(output->Close()); + + // Verify the metadata has been set. + auto blob_metadata = + blob_service_client_->GetBlobContainerClient(PreexistingContainerName()) + .GetBlockBlobClient(path) + .GetProperties() + .Value.Metadata; + EXPECT_EQ(Azure::Core::CaseInsensitiveMap{std::make_pair("foo", "bar")}, blob_metadata); + + // Check that explicit metadata overrides the defaults. 
+ ASSERT_OK_AND_ASSIGN( + output, fs_with_defaults->OpenOutputStream( + location, /*metadata=*/arrow::key_value_metadata({{"bar", "foo"}}))); + ASSERT_OK(output->Write(expected)); + ASSERT_OK(output->Close()); + blob_metadata = blob_service_client_->GetBlobContainerClient(PreexistingContainerName()) + .GetBlockBlobClient(path) + .GetProperties() + .Value.Metadata; + // Defaults are overwritten and not merged. + EXPECT_EQ(Azure::Core::CaseInsensitiveMap{std::make_pair("bar", "foo")}, blob_metadata); +} + +TEST_F(AzuriteFileSystemTest, OpenOutputStreamSmall) { + const auto path = PreexistingContainerPath() + "test-write-object"; + ASSERT_OK_AND_ASSIGN(auto output, fs_->OpenOutputStream(path, {})); + const std::string_view expected(kLoremIpsum); + ASSERT_OK(output->Write(expected)); + ASSERT_OK(output->Close()); + + // Verify we can read the object back. + ASSERT_OK_AND_ASSIGN(auto input, fs_->OpenInputStream(path)); + + std::array inbuf{}; + ASSERT_OK_AND_ASSIGN(auto size, input->Read(inbuf.size(), inbuf.data())); + + EXPECT_EQ(expected, std::string_view(inbuf.data(), size)); +} + +TEST_F(AzuriteFileSystemTest, OpenOutputStreamLarge) { + const auto path = PreexistingContainerPath() + "test-write-object"; + ASSERT_OK_AND_ASSIGN(auto output, fs_->OpenOutputStream(path, {})); + std::array sizes{257 * 1024, 258 * 1024, 259 * 1024}; + std::array buffers{ + std::string(sizes[0], 'A'), + std::string(sizes[1], 'B'), + std::string(sizes[2], 'C'), + }; + auto expected = std::int64_t{0}; + for (auto i = 0; i != 3; ++i) { + ASSERT_OK(output->Write(buffers[i])); + expected += sizes[i]; + ASSERT_EQ(expected, output->Tell()); + } + ASSERT_OK(output->Close()); + + // Verify we can read the object back. 
+ ASSERT_OK_AND_ASSIGN(auto input, fs_->OpenInputStream(path)); + + std::string contents; + std::shared_ptr buffer; + do { + ASSERT_OK_AND_ASSIGN(buffer, input->Read(128 * 1024)); + ASSERT_TRUE(buffer); + contents.append(buffer->ToString()); + } while (buffer->size() != 0); + + EXPECT_EQ(contents, buffers[0] + buffers[1] + buffers[2]); +} + +TEST_F(AzuriteFileSystemTest, OpenOutputStreamTruncatesExistingFile) { + const auto path = PreexistingContainerPath() + "test-write-object"; + ASSERT_OK_AND_ASSIGN(auto output, fs_->OpenOutputStream(path, {})); + const std::string_view expected0("Existing blob content"); + ASSERT_OK(output->Write(expected0)); + ASSERT_OK(output->Close()); + + // Check that the initial content has been written - if not this test is not achieving + // what it's meant to. + ASSERT_OK_AND_ASSIGN(auto input, fs_->OpenInputStream(path)); + + std::array inbuf{}; + ASSERT_OK_AND_ASSIGN(auto size, input->Read(inbuf.size(), inbuf.data())); + EXPECT_EQ(expected0, std::string_view(inbuf.data(), size)); + + ASSERT_OK_AND_ASSIGN(output, fs_->OpenOutputStream(path, {})); + const std::string_view expected1(kLoremIpsum); + ASSERT_OK(output->Write(expected1)); + ASSERT_OK(output->Close()); + + // Verify that the initial content has been overwritten. + ASSERT_OK_AND_ASSIGN(input, fs_->OpenInputStream(path)); + ASSERT_OK_AND_ASSIGN(size, input->Read(inbuf.size(), inbuf.data())); + EXPECT_EQ(expected1, std::string_view(inbuf.data(), size)); +} + +TEST_F(AzuriteFileSystemTest, OpenAppendStreamDoesNotTruncateExistingFile) { + const auto path = PreexistingContainerPath() + "test-write-object"; + ASSERT_OK_AND_ASSIGN(auto output, fs_->OpenOutputStream(path, {})); + const std::string_view expected0("Existing blob content"); + ASSERT_OK(output->Write(expected0)); + ASSERT_OK(output->Close()); + + // Check that the initial content has been written - if not this test is not achieving + // what it's meant to. 
+ ASSERT_OK_AND_ASSIGN(auto input, fs_->OpenInputStream(path)); + + std::array inbuf{}; + ASSERT_OK_AND_ASSIGN(auto size, input->Read(inbuf.size(), inbuf.data())); + EXPECT_EQ(expected0, std::string_view(inbuf.data())); + + ASSERT_OK_AND_ASSIGN(output, fs_->OpenAppendStream(path, {})); + const std::string_view expected1(kLoremIpsum); + ASSERT_OK(output->Write(expected1)); + ASSERT_OK(output->Close()); + + // Verify that the initial content has not been overwritten and that the block from + // the other client was not committed. + ASSERT_OK_AND_ASSIGN(input, fs_->OpenInputStream(path)); + ASSERT_OK_AND_ASSIGN(size, input->Read(inbuf.size(), inbuf.data())); + EXPECT_EQ(std::string(inbuf.data(), size), + std::string(expected0) + std::string(expected1)); +} + +TEST_F(AzuriteFileSystemTest, OpenOutputStreamClosed) { + const auto path = internal::ConcatAbstractPath(PreexistingContainerName(), + "open-output-stream-closed.txt"); + ASSERT_OK_AND_ASSIGN(auto output, fs_->OpenOutputStream(path, {})); + ASSERT_OK(output->Close()); + ASSERT_RAISES(Invalid, output->Write(kLoremIpsum, std::strlen(kLoremIpsum))); + ASSERT_RAISES(Invalid, output->Flush()); + ASSERT_RAISES(Invalid, output->Tell()); +} + +TEST_F(AzuriteFileSystemTest, OpenOutputStreamUri) { + const auto path = internal::ConcatAbstractPath(PreexistingContainerName(), + "open-output-stream-uri.txt"); + ASSERT_RAISES(Invalid, fs_->OpenInputStream("abfs://" + path)); +} + TEST_F(AzuriteFileSystemTest, OpenInputFileMixedReadVsReadAt) { // Create a file large enough to make the random access tests non-trivial. auto constexpr kLineWidth = 100;