Skip to content

Commit

Permalink
apacheGH-38333: [C++][FS][Azure] Implement file writes (apache#38780)
Browse files Browse the repository at this point in the history
### Rationale for this change
Writing files is an important part of the filesystem

### What changes are included in this PR?
Implements `OpenOutputStream` and `OpenAppendStream` for Azure.

- Initially I started with the implementation from apache#12914 but I made quite a few changes:
  - Removed the different code path for hierarchical namespace accounts. There should not be any performance advantage to using special APIs only available on hierachical namespace accounts. 
  - Only implement `ObjectAppendStream`, not `ObjectOutputStream`. `OpenOutputStream` is implemented by truncating the existing file then returning a `ObjectAppendStream`.
  - More precise use of `try` `catch`. Every call to Azure is wrapped in a `try` `catch` and should return a descriptive error status. 
  - Avoid unnecessary calls to Azure. For example we now maintain the block list in memory and commit it only once on flush. apache#12914 committed the block list after each block that was staged and on flush queried Azure to get the list of uncommitted blocks. The new approach is consistent with the Azure fsspec implementation https://github.com/fsspec/adlfs/blob/092685f102c5cd215550d10e8347e5bce0e2b93d/adlfs/spec.py#L2009
  - Adjust the block_ids slightly to minimise the risk of them conflicting with blocks written by other blob storage clients. 
  - Implement metadata writes. Includes adding default metadata to `AzureOptions`.
- Tests are based on the `gscfs_test.cc` but I added a couple of extra. 
- Handle the TODO(apacheGH-38780) comments for using the Azure fs to write data in tests

### Are these changes tested?
Yes. Everything should be covered by azurite tests

### Are there any user-facing changes?
Yes. The Azure filesystem now supports file writes. 

* Closes: apache#38333

Lead-authored-by: Thomas Newton <[email protected]>
Co-authored-by: Sutou Kouhei <[email protected]>
Signed-off-by: Sutou Kouhei <[email protected]>
  • Loading branch information
Tom-Newton and kou authored Nov 21, 2023
1 parent 5a0e8b6 commit c1b12ca
Show file tree
Hide file tree
Showing 3 changed files with 429 additions and 23 deletions.
255 changes: 251 additions & 4 deletions cpp/src/arrow/filesystem/azurefs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "arrow/buffer.h"
#include "arrow/filesystem/path_util.h"
#include "arrow/filesystem/util_internal.h"
#include "arrow/io/util_internal.h"
#include "arrow/result.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/formatting.h"
Expand All @@ -43,7 +44,8 @@ AzureOptions::AzureOptions() {}
bool AzureOptions::Equals(const AzureOptions& other) const {
return (account_dfs_url == other.account_dfs_url &&
account_blob_url == other.account_blob_url &&
credentials_kind == other.credentials_kind);
credentials_kind == other.credentials_kind &&
default_metadata == other.default_metadata);
}

Status AzureOptions::ConfigureAccountKeyCredentials(const std::string& account_name,
Expand Down Expand Up @@ -461,6 +463,225 @@ class ObjectInputFile final : public io::RandomAccessFile {
int64_t content_length_ = kNoSize;
std::shared_ptr<const KeyValueMetadata> metadata_;
};

Status CreateEmptyBlockBlob(
std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client) {
try {
block_blob_client->UploadFrom(nullptr, 0);
} catch (const Azure::Storage::StorageException& exception) {
return internal::ExceptionToStatus(
"UploadFrom failed for '" + block_blob_client->GetUrl() +
"' with an unexpected Azure error. There is no existing blob at this "
"location or the existing blob must be replaced so ObjectAppendStream must "
"create a new empty block blob.",
exception);
}
return Status::OK();
}

Result<Azure::Storage::Blobs::Models::GetBlockListResult> GetBlockList(
std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client) {
try {
return block_blob_client->GetBlockList().Value;
} catch (Azure::Storage::StorageException& exception) {
return internal::ExceptionToStatus(
"GetBlockList failed for '" + block_blob_client->GetUrl() +
"' with an unexpected Azure error. Cannot write to a file without first "
"fetching the existing block list.",
exception);
}
}

Azure::Storage::Metadata ArrowMetadataToAzureMetadata(
const std::shared_ptr<const KeyValueMetadata>& arrow_metadata) {
Azure::Storage::Metadata azure_metadata;
for (auto key_value : arrow_metadata->sorted_pairs()) {
azure_metadata[key_value.first] = key_value.second;
}
return azure_metadata;
}

Status CommitBlockList(
std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client,
const std::vector<std::string>& block_ids, const Azure::Storage::Metadata& metadata) {
Azure::Storage::Blobs::CommitBlockListOptions options;
options.Metadata = metadata;
try {
// CommitBlockList puts all block_ids in the latest element. That means in the case of
// overlapping block_ids the newly staged block ids will always replace the
// previously committed blocks.
// https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?tabs=microsoft-entra-id#request-body
block_blob_client->CommitBlockList(block_ids, options);
} catch (const Azure::Storage::StorageException& exception) {
return internal::ExceptionToStatus(
"CommitBlockList failed for '" + block_blob_client->GetUrl() +
"' with an unexpected Azure error. Committing is required to flush an "
"output/append stream.",
exception);
}
return Status::OK();
}

class ObjectAppendStream final : public io::OutputStream {
public:
ObjectAppendStream(
std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client,
const io::IOContext& io_context, const AzureLocation& location,
const std::shared_ptr<const KeyValueMetadata>& metadata,
const AzureOptions& options, int64_t size = kNoSize)
: block_blob_client_(std::move(block_blob_client)),
io_context_(io_context),
location_(location),
content_length_(size) {
if (metadata && metadata->size() != 0) {
metadata_ = ArrowMetadataToAzureMetadata(metadata);
} else if (options.default_metadata && options.default_metadata->size() != 0) {
metadata_ = ArrowMetadataToAzureMetadata(options.default_metadata);
}
}

~ObjectAppendStream() override {
// For compliance with the rest of the IO stack, Close rather than Abort,
// even though it may be more expensive.
io::internal::CloseFromDestructor(this);
}

Status Init() {
if (content_length_ != kNoSize) {
DCHECK_GE(content_length_, 0);
pos_ = content_length_;
} else {
try {
auto properties = block_blob_client_->GetProperties();
content_length_ = properties.Value.BlobSize;
pos_ = content_length_;
} catch (const Azure::Storage::StorageException& exception) {
if (exception.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound) {
RETURN_NOT_OK(CreateEmptyBlockBlob(block_blob_client_));
} else {
return internal::ExceptionToStatus(
"GetProperties failed for '" + block_blob_client_->GetUrl() +
"' with an unexpected Azure error. Can not initialise an "
"ObjectAppendStream without knowing whether a file already exists at "
"this path, and if it exists, its size.",
exception);
}
content_length_ = 0;
}
}
if (content_length_ > 0) {
ARROW_ASSIGN_OR_RAISE(auto block_list, GetBlockList(block_blob_client_));
for (auto block : block_list.CommittedBlocks) {
block_ids_.push_back(block.Name);
}
}
return Status::OK();
}

Status Abort() override {
if (closed_) {
return Status::OK();
}
block_blob_client_ = nullptr;
closed_ = true;
return Status::OK();
}

Status Close() override {
if (closed_) {
return Status::OK();
}
RETURN_NOT_OK(Flush());
block_blob_client_ = nullptr;
closed_ = true;
return Status::OK();
}

bool closed() const override { return closed_; }

Status CheckClosed(const char* action) const {
if (closed_) {
return Status::Invalid("Cannot ", action, " on closed stream.");
}
return Status::OK();
}

Result<int64_t> Tell() const override {
RETURN_NOT_OK(CheckClosed("tell"));
return pos_;
}

Status Write(const std::shared_ptr<Buffer>& buffer) override {
return DoAppend(buffer->data(), buffer->size(), buffer);
}

Status Write(const void* data, int64_t nbytes) override {
return DoAppend(data, nbytes);
}

Status Flush() override {
RETURN_NOT_OK(CheckClosed("flush"));
return CommitBlockList(block_blob_client_, block_ids_, metadata_);
}

private:
Status DoAppend(const void* data, int64_t nbytes,
std::shared_ptr<Buffer> owned_buffer = nullptr) {
RETURN_NOT_OK(CheckClosed("append"));
auto append_data = reinterpret_cast<const uint8_t*>(data);
Azure::Core::IO::MemoryBodyStream block_content(append_data, nbytes);
if (block_content.Length() == 0) {
return Status::OK();
}

const auto n_block_ids = block_ids_.size();

// New block ID must always be distinct from the existing block IDs. Otherwise we
// will accidentally replace the content of existing blocks, causing corruption.
// We will use monotonically increasing integers.
auto new_block_id = std::to_string(n_block_ids);

// Pad to 5 digits, because Azure allows a maximum of 50,000 blocks.
const size_t target_number_of_digits = 5;
const auto required_padding_digits =
target_number_of_digits - std::min(target_number_of_digits, new_block_id.size());
new_block_id.insert(0, required_padding_digits, '0');
// There is a small risk when appending to a blob created by another client that
// `new_block_id` may overlapping with an existing block id. Adding the `-arrow`
// suffix significantly reduces the risk, but does not 100% eliminate it. For example
// if the blob was previously created with one block, with id `00001-arrow` then the
// next block we append will conflict with that, and cause corruption.
new_block_id += "-arrow";
new_block_id = Azure::Core::Convert::Base64Encode(
std::vector<uint8_t>(new_block_id.begin(), new_block_id.end()));

try {
block_blob_client_->StageBlock(new_block_id, block_content);
} catch (const Azure::Storage::StorageException& exception) {
return internal::ExceptionToStatus(
"StageBlock failed for '" + block_blob_client_->GetUrl() + "' new_block_id: '" +
new_block_id +
"' with an unexpected Azure error. Staging new blocks is fundamental to "
"streaming writes to blob storage.",
exception);
}
block_ids_.push_back(new_block_id);
pos_ += nbytes;
content_length_ += nbytes;
return Status::OK();
}

std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client_;
const io::IOContext io_context_;
const AzureLocation location_;

bool closed_ = false;
int64_t pos_ = 0;
int64_t content_length_ = kNoSize;
std::vector<std::string> block_ids_;
Azure::Storage::Metadata metadata_;
};

} // namespace

// -----------------------------------------------------------------------
Expand Down Expand Up @@ -724,6 +945,30 @@ class AzureFileSystem::Impl {

return Status::OK();
}

Result<std::shared_ptr<ObjectAppendStream>> OpenAppendStream(
const AzureLocation& location,
const std::shared_ptr<const KeyValueMetadata>& metadata, const bool truncate,
AzureFileSystem* fs) {
RETURN_NOT_OK(ValidateFileLocation(location));
ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(location.path));

auto block_blob_client = std::make_shared<Azure::Storage::Blobs::BlockBlobClient>(
blob_service_client_->GetBlobContainerClient(location.container)
.GetBlockBlobClient(location.path));

std::shared_ptr<ObjectAppendStream> stream;
if (truncate) {
RETURN_NOT_OK(CreateEmptyBlockBlob(block_blob_client));
stream = std::make_shared<ObjectAppendStream>(block_blob_client, fs->io_context(),
location, metadata, options_, 0);
} else {
stream = std::make_shared<ObjectAppendStream>(block_blob_client, fs->io_context(),
location, metadata, options_);
}
RETURN_NOT_OK(stream->Init());
return stream;
}
};

const AzureOptions& AzureFileSystem::options() const { return impl_->options(); }
Expand Down Expand Up @@ -805,12 +1050,14 @@ Result<std::shared_ptr<io::RandomAccessFile>> AzureFileSystem::OpenInputFile(

Result<std::shared_ptr<io::OutputStream>> AzureFileSystem::OpenOutputStream(
const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
return Status::NotImplemented("The Azure FileSystem is not fully implemented");
ARROW_ASSIGN_OR_RAISE(auto location, AzureLocation::FromString(path));
return impl_->OpenAppendStream(location, metadata, true, this);
}

Result<std::shared_ptr<io::OutputStream>> AzureFileSystem::OpenAppendStream(
const std::string&, const std::shared_ptr<const KeyValueMetadata>&) {
return Status::NotImplemented("The Azure FileSystem is not fully implemented");
const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
ARROW_ASSIGN_OR_RAISE(auto location, AzureLocation::FromString(path));
return impl_->OpenAppendStream(location, metadata, false, this);
}

Result<std::shared_ptr<AzureFileSystem>> AzureFileSystem::Make(
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/arrow/filesystem/azurefs.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ struct ARROW_EXPORT AzureOptions {
std::shared_ptr<Azure::Core::Credentials::TokenCredential>
service_principle_credentials_provider;

/// \brief Default metadata for OpenOutputStream.
///
/// This will be ignored if non-empty metadata is passed to OpenOutputStream.
std::shared_ptr<const KeyValueMetadata> default_metadata;

AzureOptions();

Status ConfigureAccountKeyCredentials(const std::string& account_name,
Expand Down
Loading

0 comments on commit c1b12ca

Please sign in to comment.