GH-38333: [C++][FS][Azure] Implement file writes #38780

Merged
Changes from all commits

26 commits
89899b4
Paste in object append stream from #12914
Tom-Newton Nov 8, 2023
12c1978
Delete hierarchical namespace code paths
Tom-Newton Nov 9, 2023
bd787cc
Paste in tests from gcsfs_test.cc
Tom-Newton Nov 15, 2023
cb9879b
Mostly correct tests
Tom-Newton Nov 15, 2023
bbaf138
Paste in OpenOutputStream and OpenAppendStream from #12914
Tom-Newton Nov 15, 2023
2f27fbb
Tests pass
Tom-Newton Nov 16, 2023
bfeb57d
Better error handling and tidy
Tom-Newton Nov 16, 2023
d4c53b2
Implement append and output
Tom-Newton Nov 17, 2023
a1a8c15
Fix rebase
Tom-Newton Nov 18, 2023
0b0a2ee
Add tests to distinguish OpenAppendStream and OpenOutputStream
Tom-Newton Nov 18, 2023
c69f188
More precise error handling on calls to Azure blob storage
Tom-Newton Nov 18, 2023
5a91113
Avoid unnecessary extra block list fetching and committing
Tom-Newton Nov 18, 2023
1c4300b
Adjust block_ids and add some comments
Tom-Newton Nov 19, 2023
a62b95e
Add test for writing blob metadata
Tom-Newton Nov 19, 2023
28ec1ed
Implement metadata writes
Tom-Newton Nov 19, 2023
2800dea
Add simple sanity checks on the location
Tom-Newton Nov 19, 2023
9a98a0f
Tidy
Tom-Newton Nov 19, 2023
d86c374
PR comments 1
Tom-Newton Nov 19, 2023
77f8345
PR comments2: move `DoAppend` to private
Tom-Newton Nov 19, 2023
0178660
Autoformat
Tom-Newton Nov 19, 2023
7b19feb
Handle TODO(GH-38780) comments for using fs to write data in tests
Tom-Newton Nov 19, 2023
74a6d55
PR comments: tests tidy up
Tom-Newton Nov 20, 2023
9a0d9ca
Add a comment about the risk of overlapping block ids
Tom-Newton Nov 20, 2023
0e84a82
Lint
Tom-Newton Nov 20, 2023
ad0ed8f
Lint
Tom-Newton Nov 20, 2023
7412b90
Simplify
kou Nov 21, 2023
255 changes: 251 additions & 4 deletions cpp/src/arrow/filesystem/azurefs.cc
@@ -24,6 +24,7 @@
#include "arrow/buffer.h"
#include "arrow/filesystem/path_util.h"
#include "arrow/filesystem/util_internal.h"
#include "arrow/io/util_internal.h"
#include "arrow/result.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/formatting.h"
@@ -43,7 +44,8 @@ AzureOptions::AzureOptions() {}
bool AzureOptions::Equals(const AzureOptions& other) const {
return (account_dfs_url == other.account_dfs_url &&
account_blob_url == other.account_blob_url &&
-         credentials_kind == other.credentials_kind);
+         credentials_kind == other.credentials_kind &&
+         default_metadata == other.default_metadata);
}

Status AzureOptions::ConfigureAccountKeyCredentials(const std::string& account_name,
@@ -461,6 +463,225 @@ class ObjectInputFile final : public io::RandomAccessFile {
int64_t content_length_ = kNoSize;
std::shared_ptr<const KeyValueMetadata> metadata_;
};

Status CreateEmptyBlockBlob(
std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client) {
try {
block_blob_client->UploadFrom(nullptr, 0);
} catch (const Azure::Storage::StorageException& exception) {
return internal::ExceptionToStatus(
"UploadFrom failed for '" + block_blob_client->GetUrl() +
"' with an unexpected Azure error. There is no existing blob at this "
"location or the existing blob must be replaced so ObjectAppendStream must "
"create a new empty block blob.",
exception);
}
return Status::OK();
}

Result<Azure::Storage::Blobs::Models::GetBlockListResult> GetBlockList(
std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client) {
try {
return block_blob_client->GetBlockList().Value;
} catch (Azure::Storage::StorageException& exception) {
return internal::ExceptionToStatus(
"GetBlockList failed for '" + block_blob_client->GetUrl() +
"' with an unexpected Azure error. Cannot write to a file without first "
"fetching the existing block list.",
exception);
}
}

Azure::Storage::Metadata ArrowMetadataToAzureMetadata(
const std::shared_ptr<const KeyValueMetadata>& arrow_metadata) {
Azure::Storage::Metadata azure_metadata;
for (auto key_value : arrow_metadata->sorted_pairs()) {
azure_metadata[key_value.first] = key_value.second;
}
return azure_metadata;
}

Status CommitBlockList(
std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client,
const std::vector<std::string>& block_ids, const Azure::Storage::Metadata& metadata) {
Azure::Storage::Blobs::CommitBlockListOptions options;
options.Metadata = metadata;
try {
// CommitBlockList puts all block_ids in the `Latest` element of the request body.
// That means, in the case of overlapping block_ids, the newly staged block ids
// will always replace the previously committed blocks.
// https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?tabs=microsoft-entra-id#request-body
block_blob_client->CommitBlockList(block_ids, options);
} catch (const Azure::Storage::StorageException& exception) {
return internal::ExceptionToStatus(
"CommitBlockList failed for '" + block_blob_client->GetUrl() +
"' with an unexpected Azure error. Committing is required to flush an "
"output/append stream.",
exception);
}
return Status::OK();
}

class ObjectAppendStream final : public io::OutputStream {
public:
ObjectAppendStream(
std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client,
const io::IOContext& io_context, const AzureLocation& location,
const std::shared_ptr<const KeyValueMetadata>& metadata,
const AzureOptions& options, int64_t size = kNoSize)
: block_blob_client_(std::move(block_blob_client)),
io_context_(io_context),
location_(location),
content_length_(size) {
if (metadata && metadata->size() != 0) {
metadata_ = ArrowMetadataToAzureMetadata(metadata);
} else if (options.default_metadata && options.default_metadata->size() != 0) {
metadata_ = ArrowMetadataToAzureMetadata(options.default_metadata);
}
Comment on lines +536 to +540

Member:

Do these metadata replace the existing metadata? Should we merge with the existing metadata?

Contributor Author (@Tom-Newton, Nov 19, 2023):

Closing/flushing an append stream will always completely replace the old metadata. This is covered by the AzuriteFileSystemTest, TestWriteMetadata test, which I largely copied from gcsfs_test.cc.

I don't feel strongly, but I think this is a reasonable choice. If it did merge, there would be no way to remove metadata keys through the Arrow filesystem; replacing is also simpler to implement than merging.

Member:

I see.

}
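
For illustration, a minimal sketch of the replace-not-merge behaviour discussed in the thread above, using the FileSystem entry points added later in this diff (the path and metadata values are hypothetical, and `fs` is assumed to be a connected arrow::fs::AzureFileSystem):

// Sketch only, inside a function returning arrow::Status.
auto first = arrow::key_value_metadata({{"foo", "bar"}});
ARROW_ASSIGN_OR_RAISE(auto stream, fs->OpenOutputStream("container/blob", first));
RETURN_NOT_OK(stream->Write("data"));
RETURN_NOT_OK(stream->Close());  // Blob metadata is now exactly {foo: bar}.

// Re-opening and closing with different metadata replaces the old set wholesale.
auto second = arrow::key_value_metadata({{"baz", "qux"}});
ARROW_ASSIGN_OR_RAISE(stream, fs->OpenAppendStream("container/blob", second));
RETURN_NOT_OK(stream->Close());  // {foo: bar} is gone; only {baz: qux} remains.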

~ObjectAppendStream() override {
// For compliance with the rest of the IO stack, Close rather than Abort,
// even though it may be more expensive.
io::internal::CloseFromDestructor(this);
}

Status Init() {
if (content_length_ != kNoSize) {
DCHECK_GE(content_length_, 0);
pos_ = content_length_;
} else {
try {
auto properties = block_blob_client_->GetProperties();
content_length_ = properties.Value.BlobSize;
pos_ = content_length_;
} catch (const Azure::Storage::StorageException& exception) {
if (exception.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound) {
RETURN_NOT_OK(CreateEmptyBlockBlob(block_blob_client_));
} else {
return internal::ExceptionToStatus(
"GetProperties failed for '" + block_blob_client_->GetUrl() +
"' with an unexpected Azure error. Can not initialise an "
"ObjectAppendStream without knowing whether a file already exists at "
"this path, and if it exists, its size.",
exception);
}
content_length_ = 0;
}
}
if (content_length_ > 0) {
ARROW_ASSIGN_OR_RAISE(auto block_list, GetBlockList(block_blob_client_));
for (auto block : block_list.CommittedBlocks) {
block_ids_.push_back(block.Name);
}
}
return Status::OK();
}

Status Abort() override {
if (closed_) {
return Status::OK();
}
block_blob_client_ = nullptr;
closed_ = true;
return Status::OK();
}

Status Close() override {
if (closed_) {
return Status::OK();
}
RETURN_NOT_OK(Flush());
block_blob_client_ = nullptr;
closed_ = true;
return Status::OK();
}

bool closed() const override { return closed_; }

Status CheckClosed(const char* action) const {
if (closed_) {
return Status::Invalid("Cannot ", action, " on closed stream.");
}
return Status::OK();
}

Result<int64_t> Tell() const override {
RETURN_NOT_OK(CheckClosed("tell"));
return pos_;
}

Status Write(const std::shared_ptr<Buffer>& buffer) override {
return DoAppend(buffer->data(), buffer->size(), buffer);
}

Status Write(const void* data, int64_t nbytes) override {
return DoAppend(data, nbytes);
}

Status Flush() override {
RETURN_NOT_OK(CheckClosed("flush"));
return CommitBlockList(block_blob_client_, block_ids_, metadata_);
}

private:
Status DoAppend(const void* data, int64_t nbytes,
std::shared_ptr<Buffer> owned_buffer = nullptr) {
RETURN_NOT_OK(CheckClosed("append"));
auto append_data = reinterpret_cast<const uint8_t*>(data);
Azure::Core::IO::MemoryBodyStream block_content(append_data, nbytes);
if (block_content.Length() == 0) {
return Status::OK();
}

const auto n_block_ids = block_ids_.size();

// New block ID must always be distinct from the existing block IDs. Otherwise we
// will accidentally replace the content of existing blocks, causing corruption.
// We will use monotonically increasing integers.
auto new_block_id = std::to_string(n_block_ids);

// Pad to 5 digits, because Azure allows a maximum of 50,000 blocks.
const size_t target_number_of_digits = 5;
const auto required_padding_digits =
target_number_of_digits - std::min(target_number_of_digits, new_block_id.size());
new_block_id.insert(0, required_padding_digits, '0');
// There is a small risk when appending to a blob created by another client that
// `new_block_id` may overlap with an existing block id. Adding the `-arrow`
// suffix significantly reduces the risk, but does not 100% eliminate it. For
// example, if the blob was previously created with one block with id
// `00001-arrow`, then the next block we append will conflict with it and cause
// corruption.
new_block_id += "-arrow";
new_block_id = Azure::Core::Convert::Base64Encode(
std::vector<uint8_t>(new_block_id.begin(), new_block_id.end()));

Member:

Do we need to check whether the new_block_id exists in block_ids_ or not?

Contributor Author (@Tom-Newton, Nov 19, 2023):

If we want to be 100% confident of avoiding clashes then yes, but personally I think the current solution is a good compromise.

The risk should be zero when using OpenOutputStream, because every block ID will be created by this same scheme, using monotonically increasing integers. The risk when using OpenAppendStream is that previously committed blocks used unusual names that might conflict. For example, if some other writer committed one block named 00002-arrow, that would conflict after this writer appends 2 additional blocks, and cause a corrupt blob. I think this is extremely unlikely, so I consider this a good option. Additionally, OpenAppendStream is not implemented at all for S3 and GCS, so presumably it's not used much.

Member:

OK. Could you describe the risk as a comment? If we find a real-world problem with the risk, we can revisit it.

Contributor Author (@Tom-Newton):

Done

try {
block_blob_client_->StageBlock(new_block_id, block_content);
} catch (const Azure::Storage::StorageException& exception) {
return internal::ExceptionToStatus(
"StageBlock failed for '" + block_blob_client_->GetUrl() + "' new_block_id: '" +
new_block_id +
"' with an unexpected Azure error. Staging new blocks is fundamental to "
"streaming writes to blob storage.",
exception);
}
block_ids_.push_back(new_block_id);
pos_ += nbytes;
content_length_ += nbytes;
return Status::OK();
}
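
For reference, a hedged standalone sketch of the block-ID scheme implemented in DoAppend above (zero-pad the block index to 5 digits, append `-arrow`, Base64-encode); `MakeBlockId` is a hypothetical helper, not part of this PR:

#include <azure/core/base64.hpp>

#include <algorithm>
#include <string>
#include <vector>

// Mirrors DoAppend: pad to 5 digits (Azure allows at most 50,000 blocks per
// blob), add the "-arrow" suffix, then Base64-encode, as Azure requires block
// IDs to be Base64-encoded and of equal length within a blob.
std::string MakeBlockId(size_t n_existing_blocks) {
  std::string id = std::to_string(n_existing_blocks);
  const size_t target_digits = 5;
  id.insert(0, target_digits - std::min(target_digits, id.size()), '0');
  id += "-arrow";
  return Azure::Core::Convert::Base64Encode(
      std::vector<uint8_t>(id.begin(), id.end()));
}
// MakeBlockId(0) encodes "00000-arrow"; MakeBlockId(12) encodes "00012-arrow".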

std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client_;
const io::IOContext io_context_;
const AzureLocation location_;

bool closed_ = false;
int64_t pos_ = 0;
int64_t content_length_ = kNoSize;
std::vector<std::string> block_ids_;
Azure::Storage::Metadata metadata_;
};

} // namespace

// -----------------------------------------------------------------------
@@ -724,6 +945,30 @@ class AzureFileSystem::Impl {

return Status::OK();
}

Result<std::shared_ptr<ObjectAppendStream>> OpenAppendStream(
const AzureLocation& location,
const std::shared_ptr<const KeyValueMetadata>& metadata, const bool truncate,
AzureFileSystem* fs) {
RETURN_NOT_OK(ValidateFileLocation(location));
ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(location.path));

auto block_blob_client = std::make_shared<Azure::Storage::Blobs::BlockBlobClient>(
blob_service_client_->GetBlobContainerClient(location.container)
.GetBlockBlobClient(location.path));

std::shared_ptr<ObjectAppendStream> stream;
if (truncate) {
RETURN_NOT_OK(CreateEmptyBlockBlob(block_blob_client));
stream = std::make_shared<ObjectAppendStream>(block_blob_client, fs->io_context(),
location, metadata, options_, 0);
} else {
stream = std::make_shared<ObjectAppendStream>(block_blob_client, fs->io_context(),
location, metadata, options_);
}
RETURN_NOT_OK(stream->Init());
return stream;
}
};

const AzureOptions& AzureFileSystem::options() const { return impl_->options(); }
@@ -805,12 +1050,14 @@ Result<std::shared_ptr<io::RandomAccessFile>> AzureFileSystem::OpenInputFile(

Result<std::shared_ptr<io::OutputStream>> AzureFileSystem::OpenOutputStream(
const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
-  return Status::NotImplemented("The Azure FileSystem is not fully implemented");
+  ARROW_ASSIGN_OR_RAISE(auto location, AzureLocation::FromString(path));
+  return impl_->OpenAppendStream(location, metadata, true, this);
}

Result<std::shared_ptr<io::OutputStream>> AzureFileSystem::OpenAppendStream(
-    const std::string&, const std::shared_ptr<const KeyValueMetadata>&) {
-  return Status::NotImplemented("The Azure FileSystem is not fully implemented");
+    const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  ARROW_ASSIGN_OR_RAISE(auto location, AzureLocation::FromString(path));
+  return impl_->OpenAppendStream(location, metadata, false, this);
}

Result<std::shared_ptr<AzureFileSystem>> AzureFileSystem::Make(
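Taken together, a hedged usage sketch of the two entry points (assuming `fs` comes from AzureFileSystem::Make and the single-argument FileSystem convenience overloads): OpenOutputStream truncates by first creating an empty block blob, while OpenAppendStream resumes from the existing committed block list:

// Sketch only, inside a function returning arrow::Status.
ARROW_ASSIGN_OR_RAISE(auto out, fs->OpenOutputStream("container/file.txt"));
RETURN_NOT_OK(out->Write("hello "));  // staged as block 0; committed on Close
RETURN_NOT_OK(out->Close());

ARROW_ASSIGN_OR_RAISE(auto app, fs->OpenAppendStream("container/file.txt"));
RETURN_NOT_OK(app->Write("world"));   // staged after the existing block
RETURN_NOT_OK(app->Close());          // blob now contains "hello world"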
5 changes: 5 additions & 0 deletions cpp/src/arrow/filesystem/azurefs.h
@@ -77,6 +77,11 @@ struct ARROW_EXPORT AzureOptions {
std::shared_ptr<Azure::Core::Credentials::TokenCredential>
service_principle_credentials_provider;

/// \brief Default metadata for OpenOutputStream.
///
/// This will be ignored if non-empty metadata is passed to OpenOutputStream.
std::shared_ptr<const KeyValueMetadata> default_metadata;

AzureOptions();

Status ConfigureAccountKeyCredentials(const std::string& account_name,
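Finally, a hedged sketch of configuring the new default_metadata option; the account name and key are placeholders, and the ConfigureAccountKeyCredentials parameters (account name, account key) are assumed from the signature above:

arrow::fs::AzureOptions options;
RETURN_NOT_OK(options.ConfigureAccountKeyCredentials("myaccount", "account-key"));
// Applied by OpenOutputStream whenever the caller passes no non-empty metadata.
options.default_metadata = arrow::key_value_metadata({{"Project", "arrow"}});
ARROW_ASSIGN_OR_RAISE(auto fs, arrow::fs::AzureFileSystem::Make(options));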