-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GH-38333: [C++][FS][Azure] Implement file writes #38780
Changes from 21 commits
89899b4
12c1978
bd787cc
cb9879b
bbaf138
2f27fbb
bfeb57d
d4c53b2
a1a8c15
0b0a2ee
c69f188
5a91113
1c4300b
a62b95e
28ec1ed
2800dea
9a98a0f
d86c374
77f8345
0178660
7b19feb
74a6d55
9a0d9ca
0e84a82
ad0ed8f
7412b90
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,6 +24,7 @@ | |
#include "arrow/buffer.h" | ||
#include "arrow/filesystem/path_util.h" | ||
#include "arrow/filesystem/util_internal.h" | ||
#include "arrow/io/util_internal.h" | ||
#include "arrow/result.h" | ||
#include "arrow/util/checked_cast.h" | ||
#include "arrow/util/formatting.h" | ||
|
@@ -43,7 +44,8 @@ AzureOptions::AzureOptions() {} | |
bool AzureOptions::Equals(const AzureOptions& other) const { | ||
return (account_dfs_url == other.account_dfs_url && | ||
account_blob_url == other.account_blob_url && | ||
credentials_kind == other.credentials_kind); | ||
credentials_kind == other.credentials_kind && | ||
default_metadata == other.default_metadata); | ||
} | ||
|
||
Status AzureOptions::ConfigureAccountKeyCredentials(const std::string& account_name, | ||
|
@@ -461,6 +463,221 @@ class ObjectInputFile final : public io::RandomAccessFile { | |
int64_t content_length_ = kNoSize; | ||
std::shared_ptr<const KeyValueMetadata> metadata_; | ||
}; | ||
|
||
Status CreateEmptyBlockBlob( | ||
std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client) { | ||
try { | ||
block_blob_client->UploadFrom(nullptr, 0); | ||
} catch (const Azure::Storage::StorageException& exception) { | ||
return internal::ExceptionToStatus( | ||
"UploadFrom failed for '" + block_blob_client->GetUrl() + | ||
"' with an unexpected Azure error. There is no existing blob at this " | ||
"location or the existing blob must be replaced so ObjectAppendStream must " | ||
"create a new empty block blob.", | ||
exception); | ||
} | ||
return Status::OK(); | ||
} | ||
|
||
Result<Azure::Storage::Blobs::Models::GetBlockListResult> GetBlockList( | ||
std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client) { | ||
try { | ||
return block_blob_client->GetBlockList().Value; | ||
} catch (Azure::Storage::StorageException& exception) { | ||
return internal::ExceptionToStatus( | ||
"GetBlockList failed for '" + block_blob_client->GetUrl() + | ||
"' with an unexpected Azure error. Cannot write to a file without first " | ||
"fetching the existing block list.", | ||
exception); | ||
} | ||
} | ||
|
||
Azure::Storage::Metadata ArrowMetadataToAzureMetadata( | ||
const std::shared_ptr<const KeyValueMetadata>& arrow_metadata) { | ||
Azure::Storage::Metadata azure_metadata; | ||
for (auto key_value : arrow_metadata->sorted_pairs()) { | ||
azure_metadata[key_value.first] = key_value.second; | ||
} | ||
return azure_metadata; | ||
} | ||
|
||
Status CommitBlockList( | ||
std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client, | ||
const std::vector<std::string>& block_ids, const Azure::Storage::Metadata& metadata) { | ||
Azure::Storage::Blobs::CommitBlockListOptions options; | ||
options.Metadata = metadata; | ||
try { | ||
// CommitBlockList puts all block_ids in the latest element. That means in the case of | ||
// overlapping block_ids the newly staged block ids will always replace the | ||
// previously committed blocks. | ||
// https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?tabs=microsoft-entra-id#request-body | ||
block_blob_client->CommitBlockList(block_ids, options); | ||
} catch (const Azure::Storage::StorageException& exception) { | ||
return internal::ExceptionToStatus( | ||
"CommitBlockList failed for '" + block_blob_client->GetUrl() + | ||
"' with an unexpected Azure error. Committing is required to flush an " | ||
"output/append stream.", | ||
exception); | ||
} | ||
return Status::OK(); | ||
} | ||
|
||
class ObjectAppendStream final : public io::OutputStream { | ||
public: | ||
ObjectAppendStream( | ||
std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client, | ||
const io::IOContext& io_context, const AzureLocation& location, | ||
const std::shared_ptr<const KeyValueMetadata>& metadata, | ||
const AzureOptions& options, int64_t size = kNoSize) | ||
: block_blob_client_(std::move(block_blob_client)), | ||
io_context_(io_context), | ||
location_(location), | ||
content_length_(size) { | ||
if (metadata && metadata->size() != 0) { | ||
metadata_ = ArrowMetadataToAzureMetadata(metadata); | ||
} else if (options.default_metadata && options.default_metadata->size() != 0) { | ||
metadata_ = ArrowMetadataToAzureMetadata(options.default_metadata); | ||
} | ||
} | ||
|
||
~ObjectAppendStream() override { | ||
// For compliance with the rest of the IO stack, Close rather than Abort, | ||
// even though it may be more expensive. | ||
io::internal::CloseFromDestructor(this); | ||
} | ||
|
||
Status Init() { | ||
if (content_length_ != kNoSize) { | ||
DCHECK_GE(content_length_, 0); | ||
pos_ = content_length_; | ||
} else { | ||
try { | ||
auto properties = block_blob_client_->GetProperties(); | ||
content_length_ = properties.Value.BlobSize; | ||
pos_ = content_length_; | ||
} catch (const Azure::Storage::StorageException& exception) { | ||
if (exception.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound) { | ||
RETURN_NOT_OK(CreateEmptyBlockBlob(block_blob_client_)); | ||
} else { | ||
return internal::ExceptionToStatus( | ||
"GetProperties failed for '" + block_blob_client_->GetUrl() + | ||
"' with an unexpected Azure error. Can not initialise an " | ||
"ObjectAppendStream without knowing whether a file already exists at " | ||
"this path, and if it exists, its size.", | ||
exception); | ||
} | ||
content_length_ = 0; | ||
} | ||
} | ||
if (content_length_ > 0) { | ||
ARROW_ASSIGN_OR_RAISE(auto block_list, GetBlockList(block_blob_client_)); | ||
for (auto block : block_list.CommittedBlocks) { | ||
block_ids_.push_back(block.Name); | ||
} | ||
} | ||
return Status::OK(); | ||
} | ||
|
||
Status Abort() override { | ||
if (closed_) { | ||
return Status::OK(); | ||
} | ||
block_blob_client_ = nullptr; | ||
closed_ = true; | ||
return Status::OK(); | ||
} | ||
|
||
Status Close() override { | ||
if (closed_) { | ||
return Status::OK(); | ||
} | ||
RETURN_NOT_OK(Flush()); | ||
block_blob_client_ = nullptr; | ||
closed_ = true; | ||
return Status::OK(); | ||
} | ||
|
||
bool closed() const override { return closed_; } | ||
|
||
Status CheckClosed(const char* action) const { | ||
if (closed_) { | ||
return Status::Invalid("Cannot ", action, " on closed stream."); | ||
} | ||
return Status::OK(); | ||
} | ||
|
||
Result<int64_t> Tell() const override { | ||
RETURN_NOT_OK(CheckClosed("tell")); | ||
return pos_; | ||
} | ||
|
||
Status Write(const std::shared_ptr<Buffer>& buffer) override { | ||
return DoAppend(buffer->data(), buffer->size(), buffer); | ||
} | ||
|
||
Status Write(const void* data, int64_t nbytes) override { | ||
return DoAppend(data, nbytes); | ||
} | ||
|
||
Status Flush() override { | ||
RETURN_NOT_OK(CheckClosed("flush")); | ||
return CommitBlockList(block_blob_client_, block_ids_, metadata_); | ||
} | ||
|
||
private: | ||
Status DoAppend(const void* data, int64_t nbytes, | ||
Tom-Newton marked this conversation as resolved.
Show resolved
Hide resolved
|
||
std::shared_ptr<Buffer> owned_buffer = nullptr) { | ||
RETURN_NOT_OK(CheckClosed("append")); | ||
auto append_data = reinterpret_cast<const uint8_t*>(data); | ||
auto block_content = Azure::Core::IO::MemoryBodyStream(append_data, nbytes); | ||
kou marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (block_content.Length() == 0) { | ||
return Status::OK(); | ||
} | ||
|
||
const auto n_block_ids = block_ids_.size(); | ||
|
||
// New block ID must always be distinct from the existing block IDs. Otherwise we | ||
// will accidentally replace the content of existing blocks, causing corruption. | ||
// We will use monotonically increasing integers. | ||
std::string new_block_id = std::to_string(n_block_ids); | ||
kou marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// Pad to 5 digits, because Azure allows a maximum of 50,000 blocks. | ||
const size_t target_number_of_digits = 5; | ||
const auto required_padding_digits = | ||
target_number_of_digits - std::min(target_number_of_digits, new_block_id.size()); | ||
new_block_id.insert(0, required_padding_digits, '0'); | ||
new_block_id += "-arrow"; // Add a suffix to reduce risk of block_id collisions with | ||
// blocks created by other applications. | ||
new_block_id = Azure::Core::Convert::Base64Encode( | ||
std::vector<uint8_t>(new_block_id.begin(), new_block_id.end())); | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to check whether the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we want to be 100% confident of avoiding clashes then yes but personally I think the current solution is a good compromise. The risk should be zero when using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK. Could you describe the risk as a comment? If we find a real world problem with the risk, we can revisit the risk. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
try { | ||
block_blob_client_->StageBlock(new_block_id, block_content); | ||
} catch (const Azure::Storage::StorageException& exception) { | ||
return internal::ExceptionToStatus( | ||
"StageBlock failed for '" + block_blob_client_->GetUrl() + "' new_block_id: '" + | ||
new_block_id + | ||
"' with an unexpected Azure error. Staging new blocks is fundamental to " | ||
"streaming writes to blob storage.", | ||
exception); | ||
} | ||
block_ids_.push_back(new_block_id); | ||
pos_ += nbytes; | ||
content_length_ += nbytes; | ||
return Status::OK(); | ||
} | ||
|
||
std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client_; | ||
const io::IOContext io_context_; | ||
const AzureLocation location_; | ||
|
||
bool closed_ = false; | ||
int64_t pos_ = 0; | ||
int64_t content_length_ = kNoSize; | ||
std::vector<std::string> block_ids_; | ||
Azure::Storage::Metadata metadata_; | ||
}; | ||
|
||
} // namespace | ||
|
||
// ----------------------------------------------------------------------- | ||
|
@@ -724,6 +941,30 @@ class AzureFileSystem::Impl { | |
|
||
return Status::OK(); | ||
} | ||
|
||
Result<std::shared_ptr<ObjectAppendStream>> OpenAppendStream( | ||
const AzureLocation& location, | ||
const std::shared_ptr<const KeyValueMetadata>& metadata, const bool truncate, | ||
AzureFileSystem* fs) { | ||
RETURN_NOT_OK(ValidateFileLocation(location)); | ||
ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(location.path)); | ||
|
||
auto block_blob_client = std::make_shared<Azure::Storage::Blobs::BlockBlobClient>( | ||
blob_service_client_->GetBlobContainerClient(location.container) | ||
.GetBlockBlobClient(location.path)); | ||
|
||
std::shared_ptr<ObjectAppendStream> stream; | ||
if (truncate) { | ||
RETURN_NOT_OK(CreateEmptyBlockBlob(block_blob_client)); | ||
stream = std::make_shared<ObjectAppendStream>(block_blob_client, fs->io_context(), | ||
location, metadata, options_, 0); | ||
} else { | ||
stream = std::make_shared<ObjectAppendStream>(block_blob_client, fs->io_context(), | ||
location, metadata, options_); | ||
} | ||
RETURN_NOT_OK(stream->Init()); | ||
return stream; | ||
} | ||
}; | ||
|
||
const AzureOptions& AzureFileSystem::options() const { return impl_->options(); } | ||
|
@@ -805,12 +1046,14 @@ Result<std::shared_ptr<io::RandomAccessFile>> AzureFileSystem::OpenInputFile( | |
|
||
Result<std::shared_ptr<io::OutputStream>> AzureFileSystem::OpenOutputStream( | ||
const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) { | ||
return Status::NotImplemented("The Azure FileSystem is not fully implemented"); | ||
ARROW_ASSIGN_OR_RAISE(auto location, AzureLocation::FromString(path)); | ||
return impl_->OpenAppendStream(location, metadata, true, this); | ||
} | ||
|
||
Result<std::shared_ptr<io::OutputStream>> AzureFileSystem::OpenAppendStream( | ||
const std::string&, const std::shared_ptr<const KeyValueMetadata>&) { | ||
return Status::NotImplemented("The Azure FileSystem is not fully implemented"); | ||
const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) { | ||
ARROW_ASSIGN_OR_RAISE(auto location, AzureLocation::FromString(path)); | ||
return impl_->OpenAppendStream(location, metadata, false, this); | ||
} | ||
|
||
Result<std::shared_ptr<AzureFileSystem>> AzureFileSystem::Make( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do these metadata replace the existing metadata? Should we merge with the existing metadata?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Closing/flushing an append stream will always completely replace the old metadata. This is covered by the
AzuriteFileSystemTest, TestWriteMetadata
test which I largely copied fromgcsfs_test.cc
.I don't feel strongly but I think this is a reasonable choice. I think if it did merge then there would be no way to remove metadata keys through the arrow file system. Also replacing is simpler to implement than merging.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see.