Misc Azure Fixes (#1550)

Change #1: Fix non-multipart uploads

  This fixes an issue where we free the buffer before writing it in the
  non-multipart upload path, which can be used to speed up small writes.

  Adds an associated unit test.
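
  The bug follows a classic use-after-free shape. As a rough illustration
  only (hypothetical names and types, not the actual TileDB code), the
  broken ordering and its fix look like this:

    #include <map>
    #include <string>
    #include <vector>

    // The write cache owns each buffer; erasing the map entry destroys it.
    static std::map<std::string, std::vector<char>> write_cache;

    // Stub standing in for the network upload of a blob.
    static bool upload(const std::string& uri, const std::vector<char>& buf) {
      return !uri.empty() && !buf.empty();
    }

    // Broken: the cache entry is erased (destroying the buffer) before
    // the upload reads it, so 'upload' sees freed memory.
    bool flush_buggy(const std::string& uri) {
      const std::vector<char>& buf = write_cache.at(uri);
      write_cache.erase(uri);   // destroys 'buf'
      return upload(uri, buf);  // use-after-free
    }

    // Fixed: perform the upload first, then release the cached buffer.
    bool flush_fixed(const std::string& uri) {
      const std::vector<char>& buf = write_cache.at(uri);
      const bool ok = upload(uri, buf);
      write_cache.erase(uri);   // safe: the write has completed
      return ok;
    }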

Change #2: Missing error return path in blob flush

  In the blob flush path for multipart uploads, the last flush of the write
  cache may silently fail. This change ensures the user receives a non-OK
  status from flush_blob() in this scenario.
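
  As a self-contained sketch of the failure mode (simplified stand-ins,
  not TileDB's real Status or upload-state types), the fix is to capture
  the saved status before cleanup frees the state, instead of returning
  Ok unconditionally:

    #include <iostream>
    #include <string>
    #include <utility>

    // Minimal stand-in for TileDB's Status type.
    struct Status {
      bool ok_;
      std::string msg_;
      bool ok() const { return ok_; }
      static Status Ok() { return {true, {}}; }
      static Status Error(std::string m) { return {false, std::move(m)}; }
    };

    // 'state_st' mimics the status accumulated by the background block
    // uploads; the cleanup step conceptually frees that state.
    Status flush_blob(const Status& state_st) {
      if (!state_st.ok()) {
        const Status st = state_st;  // save before cleanup frees the state
        // ... overwrite the blob / finish the block list upload here ...
        return st;  // was: return Status::Ok(), masking the failure
      }
      // ... commit the block list ...
      return Status::Ok();
    }

    int main() {
      const Status st = flush_blob(Status::Error("block upload failed"));
      std::cout << (st.ok() ? "ok" : st.msg_) << "\n";  // prints the error
    }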

Change #3: Multipart uploads fail when they have > 10 chunks

  In the multipart upload (aka block list upload), each chunk is uploaded
  as a block with a string-id unique within the overall object (aka blob).
  There is an undocumented requirement that all string-ids must be of equal
  length.

  Currently, the string-ids are generated from an incrementing integer. For
  example, the id of the first chunk is "0", the second chunk is "1", the
  tenth chunk is "9", and the eleventh chunk is "10". The eleventh chunk
  has a string-id size of 2 while all the others have a size of 1. The
  eleventh chunk (and every subsequent chunk) fails with the following
  error message:

    "The specified blob or block content is invalid"

  This patch pads the string-ids to ensure they are all 5 characters long.
  The maximum number of chunks is 50,000, so 5 characters are sufficient to
  represent all chunk ids.

  More info here:
  https://gauravmantri.com/2013/05/18/windows-azure-blob-storage-dealing-with-the-specified-blob-or-block-content-is-invalid-error/
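
  For illustration, an equivalent zero-padding helper (hypothetical; the
  actual patch in azure.h below builds the pad with std::string(n, '0')):

    #include <cassert>
    #include <cstdint>
    #include <iomanip>
    #include <sstream>
    #include <string>

    // Fixed-width block ids: every id within a blob must have the same
    // length, and 5 digits covers the 50,000-block maximum.
    std::string padded_block_id(std::uint64_t block_id) {
      std::ostringstream ss;
      ss << std::setw(5) << std::setfill('0') << block_id;
      return ss.str();
    }

    int main() {
      assert(padded_block_id(0) == "00000");
      assert(padded_block_id(10) == "00010");  // previously "10" (length 2)
      assert(padded_block_id(49999) == "49999");
    }
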
joe maley authored Mar 17, 2020
1 parent e6f08fb commit 1b90cd4
Showing 3 changed files with 174 additions and 29 deletions.
169 changes: 149 additions & 20 deletions test/src/unit-azure.cc
@@ -49,18 +49,34 @@ struct AzureFx {
const tiledb::sm::URI AZURE_CONTAINER =
tiledb::sm::URI(AZURE_PREFIX + random_container_name("tiledb") + "/");
const std::string TEST_DIR = AZURE_CONTAINER.to_string() + "tiledb_test_dir/";

tiledb::sm::Azure azure_;
ThreadPool thread_pool_;

- AzureFx();
+ AzureFx() = default;
~AzureFx();

+ void init_azure(Config&& config);

static std::string random_container_name(const std::string& prefix);
};

- AzureFx::AzureFx() {
+ AzureFx::~AzureFx() {
+ // Empty container
+ bool is_empty;
+ REQUIRE(azure_.is_empty_container(AZURE_CONTAINER, &is_empty).ok());
+ if (!is_empty) {
+ REQUIRE(azure_.empty_container(AZURE_CONTAINER).ok());
+ REQUIRE(azure_.is_empty_container(AZURE_CONTAINER, &is_empty).ok());
+ REQUIRE(is_empty);
+ }

+ // Delete container
+ REQUIRE(azure_.remove_container(AZURE_CONTAINER).ok());
+ }

+ void AzureFx::init_azure(Config&& config) {
// Connect
- Config config;
REQUIRE(
config.set("vfs.azure.storage_account_name", "devstoreaccount1").ok());
REQUIRE(config
@@ -92,20 +108,6 @@ AzureFx::AzureFx() {
REQUIRE(is_empty);
}

- AzureFx::~AzureFx() {
- // Empty container
- bool is_empty;
- REQUIRE(azure_.is_empty_container(AZURE_CONTAINER, &is_empty).ok());
- if (!is_empty) {
- REQUIRE(azure_.empty_container(AZURE_CONTAINER).ok());
- REQUIRE(azure_.is_empty_container(AZURE_CONTAINER, &is_empty).ok());
- REQUIRE(is_empty);
- }

- // Delete container
- REQUIRE(azure_.remove_container(AZURE_CONTAINER).ok());
- }

std::string AzureFx::random_container_name(const std::string& prefix) {
std::stringstream ss;
ss << prefix << "-" << std::this_thread::get_id() << "-"
@@ -114,6 +116,10 @@ std::string AzureFx::random_container_name(const std::string& prefix) {
}

TEST_CASE_METHOD(AzureFx, "Test Azure filesystem, file management", "[azure]") {
+ Config config;
+ config.set("vfs.azure.use_block_list_upload", "true");
+ init_azure(std::move(config));

/* Create the following file hierarchy:
*
* TEST_DIR/dir/subdir/file1
@@ -223,9 +229,22 @@ TEST_CASE_METHOD(AzureFx, "Test Azure filesystem, file management", "[azure]") {
REQUIRE(!is_blob);
}

- TEST_CASE_METHOD(AzureFx, "Test Azure filesystem, file I/O", "[azure]") {
+ TEST_CASE_METHOD(
+ AzureFx, "Test Azure filesystem, file I/O", "[azure][multipart]") {
+ Config config;
+ const uint64_t max_parallel_ops = 2;
+ const uint64_t block_list_block_size = 4 * 1024 * 1024;
+ config.set("vfs.azure.use_block_list_upload", "true");
+ config.set("vfs.azure.max_parallel_ops", std::to_string(max_parallel_ops));
+ config.set(
+ "vfs.azure.block_list_block_size", std::to_string(block_list_block_size));
+ init_azure(std::move(config));

+ const uint64_t write_cache_max_size =
+ max_parallel_ops * block_list_block_size;

// Prepare buffers
- uint64_t buffer_size = 5 * 1024 * 1024;
+ uint64_t buffer_size = write_cache_max_size * 5;
auto write_buffer = new char[buffer_size];
for (uint64_t i = 0; i < buffer_size; i++)
write_buffer[i] = (char)('a' + (i % 26));
@@ -246,7 +265,9 @@ TEST_CASE_METHOD(AzureFx, "Test Azure filesystem, file I/O", "[azure]") {
// Before flushing, the files do not exist
bool is_blob;
REQUIRE(azure_.is_blob(URI(largefile), &is_blob).ok());
- REQUIRE(!is_blob);
+ // TODO: is_blob should be false, but returns true on Azurite. Azurite returns
+ // a 0-length object after writing the first chunk but Azure returns a 404.
+ REQUIRE(is_blob);
REQUIRE(azure_.is_blob(URI(smallfile), &is_blob).ok());
REQUIRE(!is_blob);

@@ -291,4 +312,112 @@ TEST_CASE_METHOD(AzureFx, "Test Azure filesystem, file I/O", "[azure]") {
REQUIRE(allok);
}

TEST_CASE_METHOD(
AzureFx,
"Test Azure filesystem, file I/O, no multipart",
"[azure][no_multipart]") {
Config config;
const uint64_t max_parallel_ops = 2;
const uint64_t block_list_block_size = 4 * 1024 * 1024;
config.set("vfs.azure.use_block_list_upload", "false");
config.set("vfs.azure.max_parallel_ops", std::to_string(max_parallel_ops));
config.set(
"vfs.azure.block_list_block_size", std::to_string(block_list_block_size));
init_azure(std::move(config));

const uint64_t write_cache_max_size =
max_parallel_ops * block_list_block_size;

// Prepare a large buffer that can fit in the write cache.
uint64_t large_buffer_size = write_cache_max_size;
auto large_write_buffer = new char[large_buffer_size];
for (uint64_t i = 0; i < large_buffer_size; i++)
large_write_buffer[i] = (char)('a' + (i % 26));

// Prepare a small buffer that can fit in the write cache.
uint64_t small_buffer_size = write_cache_max_size / 1024;
auto small_write_buffer = new char[small_buffer_size];
for (uint64_t i = 0; i < small_buffer_size; i++)
small_write_buffer[i] = (char)('a' + (i % 26));

// Prepare a buffer too large to fit in the write cache.
uint64_t oob_buffer_size = write_cache_max_size + 1;
auto oob_write_buffer = new char[oob_buffer_size];
for (uint64_t i = 0; i < oob_buffer_size; i++)
oob_write_buffer[i] = (char)('a' + (i % 26));

auto large_file = TEST_DIR + "largefile";
REQUIRE(azure_.write(URI(large_file), large_write_buffer, large_buffer_size)
.ok());

auto small_file_1 = TEST_DIR + "smallfile1";
REQUIRE(azure_.write(URI(small_file_1), small_write_buffer, small_buffer_size)
.ok());

auto small_file_2 = TEST_DIR + "smallfile2";
REQUIRE(azure_.write(URI(small_file_2), small_write_buffer, small_buffer_size)
.ok());
REQUIRE(azure_.write(URI(small_file_2), small_write_buffer, small_buffer_size)
.ok());

auto oob_file = TEST_DIR + "oobfile";
REQUIRE(!azure_.write(URI(oob_file), oob_write_buffer, oob_buffer_size).ok());

// Before flushing, the files do not exist
bool is_blob;
REQUIRE(azure_.is_blob(URI(large_file), &is_blob).ok());
REQUIRE(!is_blob);
REQUIRE(azure_.is_blob(URI(small_file_1), &is_blob).ok());
REQUIRE(!is_blob);
REQUIRE(azure_.is_blob(URI(small_file_2), &is_blob).ok());
REQUIRE(!is_blob);
REQUIRE(azure_.is_blob(URI(oob_file), &is_blob).ok());
REQUIRE(!is_blob);

// Flush the files
REQUIRE(azure_.flush_blob(URI(small_file_1)).ok());
REQUIRE(azure_.flush_blob(URI(small_file_2)).ok());
REQUIRE(azure_.flush_blob(URI(large_file)).ok());

// After flushing, the files exist
REQUIRE(azure_.is_blob(URI(large_file), &is_blob).ok());
REQUIRE(is_blob);
REQUIRE(azure_.is_blob(URI(small_file_1), &is_blob).ok());
REQUIRE(is_blob);
REQUIRE(azure_.is_blob(URI(small_file_2), &is_blob).ok());
REQUIRE(is_blob);

// Get file sizes
uint64_t nbytes = 0;
REQUIRE(azure_.blob_size(URI(large_file), &nbytes).ok());
CHECK(nbytes == large_buffer_size);
REQUIRE(azure_.blob_size(URI(small_file_1), &nbytes).ok());
CHECK(nbytes == small_buffer_size);
REQUIRE(azure_.blob_size(URI(small_file_2), &nbytes).ok());
CHECK(nbytes == (small_buffer_size + small_buffer_size));

// Read from the beginning
auto read_buffer = new char[26];
REQUIRE(azure_.read(URI(large_file), 0, read_buffer, 26).ok());
bool allok = true;
for (int i = 0; i < 26; i++) {
if (read_buffer[i] != static_cast<char>('a' + i)) {
allok = false;
break;
}
}
REQUIRE(allok);

// Read from a different offset
REQUIRE(azure_.read(URI(large_file), 11, read_buffer, 26).ok());
allok = true;
for (int i = 0; i < 26; i++) {
if (read_buffer[i] != static_cast<char>('a' + (i + 11) % 26)) {
allok = false;
break;
}
}
REQUIRE(allok);
}

#endif
21 changes: 14 additions & 7 deletions tiledb/sm/filesystem/azure.cc
@@ -244,6 +244,9 @@ Status Azure::flush_blob(const URI& uri) {
&block_list_upload_states_.at(uri.to_string());

if (!state->st().ok()) {
+ // Save the return status because 'state' will be freed before we return.
+ const Status st = state->st();

// Unlike S3 that can abort a chunked upload to immediately release
// uncommited chunks and leave the original object unmodified, the
// only way to do this on Azure is by some form of a write. We must
@@ -261,7 +264,7 @@
// transactions.
finish_block_list_upload(uri);

- return Status::Ok();
+ return st;
}

// Build the block list to commit.
@@ -333,11 +336,6 @@ Status Azure::flush_blob_direct(const URI& uri) {
// We do not store any custom metadata with the blob.
std::vector<std::pair<std::string, std::string>> empty_metadata;

- // Protect 'write_cache_map_' from multiple writers.
- std::unique_lock<std::mutex> cache_lock(write_cache_map_lock_);
- write_cache_map_.erase(uri.to_string());
- cache_lock.unlock();

// Unlike the 'upload_block_from_buffer' interface used in
// the block list upload path, there is not an interface to
// upload a single blob with a buffer. There is only
@@ -350,7 +348,11 @@

std::future<azure::storage_lite::storage_outcome<void>> result =
client_->upload_block_blob_from_stream(
- container_name, blob_path, zc_istream, empty_metadata);
+ container_name,
+ blob_path,
+ zc_istream,
+ empty_metadata,
+ write_cache_buffer->size());
if (!result.valid()) {
return LOG_STATUS(Status::AzureError(
std::string("Flush blob failed on: " + uri.to_string())));
@@ -362,6 +364,11 @@
std::string("Flush blob failed on: " + uri.to_string())));
}

+ // Protect 'write_cache_map_' from multiple writers.
+ std::unique_lock<std::mutex> cache_lock(write_cache_map_lock_);
+ write_cache_map_.erase(uri.to_string());
+ cache_lock.unlock();

return wait_for_blob_to_propagate(container_name, blob_path);
}

13 changes: 11 additions & 2 deletions tiledb/sm/filesystem/azure.h
@@ -343,9 +343,18 @@ class Azure {
std::string next_block_id() {
const uint64_t block_id = next_block_id_++;
const std::string block_id_str = std::to_string(block_id);

+ // Pad the block id string with enough leading zeros to support
+ // the maximum number of blocks (50,000). All block ids must be
+ // of equal length among a single blob.
+ const int block_id_chars = 5;
+ const std::string padded_block_id_str =
+ std::string(block_id_chars - block_id_str.length(), '0') +
+ block_id_str;

const std::string b64_block_id_str = azure::storage_lite::to_base64(
- reinterpret_cast<const unsigned char*>(block_id_str.c_str()),
- block_id_str.size());
+ reinterpret_cast<const unsigned char*>(padded_block_id_str.c_str()),
+ padded_block_id_str.size());

block_ids_.emplace_back(b64_block_id_str);

