GH-40036: [C++] Azure file system write buffering & async writes (#43096)

### Rationale for this change

See #40036.

### What changes are included in this PR?

Write buffering and async writes (similar to what the S3 file system does) in the `ObjectAppendStream` for the Azure file system.

With write buffering and async writes, input scenario creation in the tests (which uses the `ObjectAppendStream` against Azurite) decreased from ~25s (see [here](#40036)) to ~800ms:
```
[ RUN      ] TestAzuriteFileSystem.OpenInputFileMixedReadVsReadAt
[       OK ] TestAzuriteFileSystem.OpenInputFileMixedReadVsReadAt (787 ms)
```

### Are these changes tested?
Yes. Tests were added with background writes both enabled and disabled (some adapted from the S3 tests). Everything changed should be covered.

### Are there any user-facing changes?
`AzureOptions` now allows `background_writes` to be set (default: `true`); a short usage sketch follows below. There are no breaking changes.
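
A minimal usage sketch, assuming the existing `AzureFileSystem::Make` and `FileSystem::OpenOutputStream` APIs; the account name, key, and path are placeholders and error handling is elided:

```
#include "arrow/filesystem/azurefs.h"
#include "arrow/result.h"
#include "arrow/status.h"

arrow::Status WriteExample() {
  arrow::fs::AzureOptions options;
  options.account_name = "myaccount";  // placeholder
  ARROW_RETURN_NOT_OK(options.ConfigureAccountKeyCredential("<account-key>"));
  options.background_writes = true;  // the new flag; true is the default

  ARROW_ASSIGN_OR_RAISE(auto fs, arrow::fs::AzureFileSystem::Make(options));
  ARROW_ASSIGN_OR_RAISE(auto stream, fs->OpenOutputStream("container/data.bin"));
  ARROW_RETURN_NOT_OK(stream->Write("hello", 5));
  // Close() waits for pending background uploads, then commits the block list.
  return stream->Close();
}
```

Because `AzureOptions::ExtractFromUriQuery` now parses the flag, it can also be toggled through a filesystem URI query parameter, e.g. appending `?background_writes=false` to an `abfs://` URI.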

### Notes

- The code in `DoWrite` is very similar to [the code in the S3 FS](https://github.com/apache/arrow/blob/edfa343eeca008513f0300924380e1b187cc976b/cpp/src/arrow/filesystem/s3fs.cc#L1753). Perhaps the two could be unified (a rough sketch follows after this list), but that seemed beyond the scope of this PR.
- GitHub Issue: #40036
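
For reference, here is a rough sketch of the buffering pattern the two `DoWrite` implementations share. It is purely illustrative: `ChunkBuffer` and `UploadFn` are invented names, and no such shared helper exists in Arrow today.

```
#include <algorithm>
#include <cstdint>
#include <functional>
#include <memory>

#include "arrow/buffer.h"
#include "arrow/io/memory.h"
#include "arrow/memory_pool.h"
#include "arrow/result.h"
#include "arrow/status.h"

class ChunkBuffer {
 public:
  // Uploads one chunk (synchronously, or by scheduling background I/O).
  // NOTE: a background implementation must copy or take ownership of the data.
  using UploadFn = std::function<arrow::Status(const uint8_t* data, int64_t nbytes)>;

  ChunkBuffer(int64_t chunk_size, arrow::MemoryPool* pool, UploadFn upload)
      : chunk_size_(chunk_size), pool_(pool), upload_(std::move(upload)) {}

  arrow::Status Write(const void* data, int64_t nbytes) {
    const auto* p = static_cast<const uint8_t*>(data);
    // 1. Top up a partially filled buffer; upload it once full.
    if (buffered_ > 0) {
      const int64_t to_copy = std::min(nbytes, chunk_size_ - buffered_);
      ARROW_RETURN_NOT_OK(buffer_->Write(p, to_copy));
      buffered_ += to_copy;
      p += to_copy;
      nbytes -= to_copy;
      if (buffered_ < chunk_size_) return arrow::Status::OK();
      ARROW_ASSIGN_OR_RAISE(auto full, buffer_->Finish());
      ARROW_RETURN_NOT_OK(buffer_->Reset(chunk_size_, pool_));  // reuse allocation
      buffered_ = 0;
      ARROW_RETURN_NOT_OK(upload_(full->data(), full->size()));
    }
    // 2. Upload full chunks straight from the caller's memory, without copying.
    while (nbytes >= chunk_size_) {
      ARROW_RETURN_NOT_OK(upload_(p, chunk_size_));
      p += chunk_size_;
      nbytes -= chunk_size_;
    }
    // 3. Buffer the remainder for a later Write() or a final flush.
    if (nbytes > 0) {
      if (buffer_ == nullptr) {
        ARROW_ASSIGN_OR_RAISE(
            buffer_, arrow::io::BufferOutputStream::Create(chunk_size_, pool_));
      }
      ARROW_RETURN_NOT_OK(buffer_->Write(p, nbytes));
      buffered_ = nbytes;
    }
    return arrow::Status::OK();
  }

 private:
  const int64_t chunk_size_;
  arrow::MemoryPool* pool_;
  UploadFn upload_;
  std::shared_ptr<arrow::io::BufferOutputStream> buffer_;
  int64_t buffered_ = 0;
};
```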

Lead-authored-by: Oliver Layer <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
OliLay and pitrou authored Aug 21, 2024
1 parent 9fc0301 commit e1e7c50
Showing 3 changed files with 471 additions and 72 deletions.
276 changes: 247 additions & 29 deletions cpp/src/arrow/filesystem/azurefs.cc
@@ -22,6 +22,7 @@

#include "arrow/filesystem/azurefs.h"
#include "arrow/filesystem/azurefs_internal.h"
#include "arrow/io/memory.h"

// identity.hpp triggers -Wattributes warnings that cause -Werror builds to fail,
// so disable them for this file with pragmas.
@@ -144,6 +145,9 @@ Status AzureOptions::ExtractFromUriQuery(const Uri& uri) {
blob_storage_scheme = "http";
dfs_storage_scheme = "http";
}
} else if (kv.first == "background_writes") {
ARROW_ASSIGN_OR_RAISE(background_writes,
::arrow::internal::ParseBoolean(kv.second));
} else {
return Status::Invalid(
"Unexpected query parameter in Azure Blob File System URI: '", kv.first, "'");
@@ -937,8 +941,8 @@ Status CommitBlockList(std::shared_ptr<Storage::Blobs::BlockBlobClient> block_bl
const std::vector<std::string>& block_ids,
const Blobs::CommitBlockListOptions& options) {
try {
// CommitBlockList puts all block_ids in the latest element. That means in the case
// of overlapping block_ids the newly staged block ids will always replace the
// previously committed blocks.
// https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?tabs=microsoft-entra-id#request-body
block_blob_client->CommitBlockList(block_ids, options);
@@ -950,15 +954,43 @@ Status CommitBlockList(std::shared_ptr<Storage::Blobs::BlockBlobClient> block_bl
return Status::OK();
}

Status StageBlock(Blobs::BlockBlobClient* block_blob_client, const std::string& id,
Core::IO::MemoryBodyStream& content) {
try {
block_blob_client->StageBlock(id, content);
} catch (const Storage::StorageException& exception) {
return ExceptionToStatus(
exception, "StageBlock failed for '", block_blob_client->GetUrl(),
"' new_block_id: '", id,
"'. Staging new blocks is fundamental to streaming writes to blob storage.");
}

return Status::OK();
}

/// Writes will be buffered up to this size (in bytes) before actually uploading them.
static constexpr int64_t kBlockUploadSizeBytes = 10 * 1024 * 1024;
/// The maximum size of a block in Azure Blob (as per docs).
static constexpr int64_t kMaxBlockSizeBytes = 4UL * 1024 * 1024 * 1024;

/// This output stream, similar to other arrow OutputStreams, is not thread-safe.
class ObjectAppendStream final : public io::OutputStream {
private:
struct UploadState;

std::shared_ptr<ObjectAppendStream> Self() {
return std::dynamic_pointer_cast<ObjectAppendStream>(shared_from_this());
}

public:
ObjectAppendStream(std::shared_ptr<Blobs::BlockBlobClient> block_blob_client,
const io::IOContext& io_context, const AzureLocation& location,
const std::shared_ptr<const KeyValueMetadata>& metadata,
const AzureOptions& options)
: block_blob_client_(std::move(block_blob_client)),
io_context_(io_context),
location_(location),
background_writes_(options.background_writes) {
if (metadata && metadata->size() != 0) {
ArrowMetadataToCommitBlockListOptions(metadata, commit_block_list_options_);
} else if (options.default_metadata && options.default_metadata->size() != 0) {
@@ -1008,10 +1040,13 @@ class ObjectAppendStream final : public io::OutputStream {
content_length_ = 0;
}
}

upload_state_ = std::make_shared<UploadState>();

if (content_length_ > 0) {
ARROW_ASSIGN_OR_RAISE(auto block_list, GetBlockList(block_blob_client_));
for (auto block : block_list.CommittedBlocks) {
upload_state_->block_ids.push_back(block.Name);
}
}
initialised_ = true;
@@ -1031,12 +1066,34 @@
if (closed_) {
return Status::OK();
}

if (current_block_) {
// Upload remaining buffer
RETURN_NOT_OK(AppendCurrentBlock());
}

RETURN_NOT_OK(Flush());
block_blob_client_ = nullptr;
closed_ = true;
return Status::OK();
}

Future<> CloseAsync() override {
if (closed_) {
return Status::OK();
}

if (current_block_) {
// Upload remaining buffer
RETURN_NOT_OK(AppendCurrentBlock());
}

return FlushAsync().Then([self = Self()]() {
self->block_blob_client_ = nullptr;
self->closed_ = true;
});
}

bool closed() const override { return closed_; }

Status CheckClosed(const char* action) const {
@@ -1052,11 +1109,11 @@
}

Status Write(const std::shared_ptr<Buffer>& buffer) override {
return DoWrite(buffer->data(), buffer->size(), buffer);
}

Status Write(const void* data, int64_t nbytes) override {
return DoWrite(data, nbytes);
}

Status Flush() override {
@@ -1066,20 +1123,111 @@
// flush. This also avoids some unhandled errors when flushing in the destructor.
return Status::OK();
}

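// Wait for all background block uploads to finish before committing the block list.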
Future<> pending_blocks_completed;
{
std::unique_lock<std::mutex> lock(upload_state_->mutex);
pending_blocks_completed = upload_state_->pending_blocks_completed;
}

RETURN_NOT_OK(pending_blocks_completed.status());
std::unique_lock<std::mutex> lock(upload_state_->mutex);
return CommitBlockList(block_blob_client_, upload_state_->block_ids,
commit_block_list_options_);
}

Future<> FlushAsync() {
RETURN_NOT_OK(CheckClosed("flush async"));
if (!initialised_) {
// If the stream has not been successfully initialized then there is nothing to
// flush. This also avoids some unhandled errors when flushing in the destructor.
return Status::OK();
}

Future<> pending_blocks_completed;
{
std::unique_lock<std::mutex> lock(upload_state_->mutex);
pending_blocks_completed = upload_state_->pending_blocks_completed;
}

return pending_blocks_completed.Then([self = Self()] {
std::unique_lock<std::mutex> lock(self->upload_state_->mutex);
return CommitBlockList(self->block_blob_client_, self->upload_state_->block_ids,
self->commit_block_list_options_);
});
}

private:
Status AppendCurrentBlock() {
ARROW_ASSIGN_OR_RAISE(auto buf, current_block_->Finish());
current_block_.reset();
current_block_size_ = 0;
return AppendBlock(buf);
}

Status DoWrite(const void* data, int64_t nbytes,
std::shared_ptr<Buffer> owned_buffer = nullptr) {
if (closed_) {
return Status::Invalid("Operation on closed stream");
}

const auto* data_ptr = reinterpret_cast<const int8_t*>(data);
auto advance_ptr = [this, &data_ptr, &nbytes](const int64_t offset) {
data_ptr += offset;
nbytes -= offset;
pos_ += offset;
content_length_ += offset;
};

// Handle case where we have some bytes buffered from prior calls.
if (current_block_size_ > 0) {
// Try to fill current buffer
const int64_t to_copy =
std::min(nbytes, kBlockUploadSizeBytes - current_block_size_);
RETURN_NOT_OK(current_block_->Write(data_ptr, to_copy));
current_block_size_ += to_copy;
advance_ptr(to_copy);

// If the buffer still isn't full, we're done until the next write
if (current_block_size_ < kBlockUploadSizeBytes) {
return Status::OK();
}

// Upload current buffer
RETURN_NOT_OK(AppendCurrentBlock());
}

// We can upload chunks without copying them into a buffer
while (nbytes >= kBlockUploadSizeBytes) {
const auto upload_size = std::min(nbytes, kMaxBlockSizeBytes);
RETURN_NOT_OK(AppendBlock(data_ptr, upload_size));
advance_ptr(upload_size);
}

// Buffer remaining bytes
if (nbytes > 0) {
current_block_size_ = nbytes;

if (current_block_ == nullptr) {
ARROW_ASSIGN_OR_RAISE(
current_block_,
io::BufferOutputStream::Create(kBlockUploadSizeBytes, io_context_.pool()));
} else {
// Re-use the allocation from before.
RETURN_NOT_OK(current_block_->Reset(kBlockUploadSizeBytes, io_context_.pool()));
}

RETURN_NOT_OK(current_block_->Write(data_ptr, current_block_size_));
pos_ += current_block_size_;
content_length_ += current_block_size_;
}

return Status::OK();
}

std::string CreateBlock() {
std::unique_lock<std::mutex> lock(upload_state_->mutex);
const auto n_block_ids = upload_state_->block_ids.size();

// New block ID must always be distinct from the existing block IDs. Otherwise we
// will accidentally replace the content of existing blocks, causing corruption.
@@ -1093,36 +1241,106 @@ class ObjectAppendStream final : public io::OutputStream {
new_block_id.insert(0, required_padding_digits, '0');
// There is a small risk when appending to a blob created by another client that
// `new_block_id` may overlap with an existing block id. Adding the `-arrow`
// suffix significantly reduces the risk, but does not 100% eliminate it. For
// example if the blob was previously created with one block, with id `00001-arrow`
// then the next block we append will conflict with that, and cause corruption.
new_block_id += "-arrow";
new_block_id = Core::Convert::Base64Encode(
std::vector<uint8_t>(new_block_id.begin(), new_block_id.end()));

upload_state_->block_ids.push_back(new_block_id);

// We only use the future if we have background writes enabled. Without background
// writes the future is initialized as finished and not mutated any more.
if (background_writes_ && upload_state_->blocks_in_progress++ == 0) {
upload_state_->pending_blocks_completed = Future<>::Make();
}

return new_block_id;
}

Status AppendBlock(const void* data, int64_t nbytes,
std::shared_ptr<Buffer> owned_buffer = nullptr) {
RETURN_NOT_OK(CheckClosed("append"));

if (nbytes == 0) {
return Status::OK();
}

const auto block_id = CreateBlock();

if (background_writes_) {
if (owned_buffer == nullptr) {
ARROW_ASSIGN_OR_RAISE(owned_buffer, AllocateBuffer(nbytes, io_context_.pool()));
memcpy(owned_buffer->mutable_data(), data, nbytes);
} else {
DCHECK_EQ(data, owned_buffer->data());
DCHECK_EQ(nbytes, owned_buffer->size());
}

// The closure keeps the buffer and the upload state alive
auto deferred = [owned_buffer, block_id, block_blob_client = block_blob_client_,
state = upload_state_]() mutable -> Status {
Core::IO::MemoryBodyStream block_content(owned_buffer->data(),
owned_buffer->size());

auto status = StageBlock(block_blob_client.get(), block_id, block_content);
HandleUploadOutcome(state, status);
return Status::OK();
};
RETURN_NOT_OK(io::internal::SubmitIO(io_context_, std::move(deferred)));
} else {
auto append_data = reinterpret_cast<const uint8_t*>(data);
Core::IO::MemoryBodyStream block_content(append_data, nbytes);

RETURN_NOT_OK(StageBlock(block_blob_client_.get(), block_id, block_content));
}

return Status::OK();
}

Status AppendBlock(std::shared_ptr<Buffer> buffer) {
return AppendBlock(buffer->data(), buffer->size(), buffer);
}

static void HandleUploadOutcome(const std::shared_ptr<UploadState>& state,
const Status& status) {
std::unique_lock<std::mutex> lock(state->mutex);
if (!status.ok()) {
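// Keep only the first error encountered; `Status::operator&=` does not overwrite an existing error.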
state->status &= status;
}
// Notify completion
if (--state->blocks_in_progress == 0) {
auto fut = state->pending_blocks_completed;
lock.unlock();
fut.MarkFinished(state->status);
}
}

std::shared_ptr<Blobs::BlockBlobClient> block_blob_client_;
const io::IOContext io_context_;
const AzureLocation location_;
const bool background_writes_;
int64_t content_length_ = kNoSize;

std::shared_ptr<io::BufferOutputStream> current_block_;
int64_t current_block_size_ = 0;

bool closed_ = false;
bool initialised_ = false;
int64_t pos_ = 0;

// This struct is kept alive through background writes to avoid problems
// in the completion handler.
struct UploadState {
std::mutex mutex;
std::vector<std::string> block_ids;
int64_t blocks_in_progress = 0;
Status status;
Future<> pending_blocks_completed = Future<>::MakeFinished(Status::OK());
};
std::shared_ptr<UploadState> upload_state_;

Blobs::CommitBlockListOptions commit_block_list_options_;
};

3 changes: 3 additions & 0 deletions cpp/src/arrow/filesystem/azurefs.h
@@ -112,6 +112,9 @@ struct ARROW_EXPORT AzureOptions {
/// This will be ignored if non-empty metadata is passed to OpenOutputStream.
std::shared_ptr<const KeyValueMetadata> default_metadata;

/// Whether OutputStream writes will be issued in the background, without blocking.
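/// Defaults to true. When enabled, blocks are staged by background I/O tasks;
/// Flush() and Close() wait for pending uploads before committing the block list.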
bool background_writes = true;

private:
enum class CredentialKind {
kDefault,