Skip to content

Commit

Permalink
Support remote arrays for fragment_list consolidation
Browse files Browse the repository at this point in the history
  • Loading branch information
ypatia committed Jun 7, 2024
1 parent 1326ed4 commit f776d91
Show file tree
Hide file tree
Showing 9 changed files with 336 additions and 134 deletions.
60 changes: 58 additions & 2 deletions test/src/unit-capi-consolidation.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

#include <test/support/tdb_catch.h>
#include "test/support/src/helpers.h"
#include "test/support/src/serialization_wrappers.h"
#include "test/support/src/vfs_helpers.h"
#include "tiledb/common/stdx_string.h"
#include "tiledb/platform/platform.h"
Expand Down Expand Up @@ -72,6 +73,8 @@ struct ConsolidationFx {
tiledb_encryption_type_t encryption_type_ = TILEDB_NO_ENCRYPTION;
const char* encryption_key_ = nullptr;

bool serialize_ = false;

// Constructors/destructors
ConsolidationFx();

Expand Down Expand Up @@ -5081,6 +5084,10 @@ TEST_CASE_METHOD(
ConsolidationFx,
"C API: Test advanced consolidation #1",
"[capi][consolidation][adv][adv-1][non-rest]") {
#ifdef TILEDB_SERIALIZATION
serialize_ = true;
#endif

remove_dense_vector();
create_dense_vector();
write_dense_vector_4_fragments();
Expand Down Expand Up @@ -5113,6 +5120,13 @@ TEST_CASE_METHOD(
REQUIRE(rc == TILEDB_OK);
REQUIRE(error == nullptr);

if (serialize_) {
std::vector<std::string> frag_uris_deserialized;
tiledb_array_consolidation_request_wrapper(
ctx_, tiledb_serialization_type_t(0), nullptr, &frag_uris_deserialized);
REQUIRE(frag_uris_deserialized.empty());
}

// Consolidate
rc = tiledb_array_consolidate(ctx_, dense_vector_uri_.c_str(), config);
CHECK(rc == TILEDB_OK);
Expand Down Expand Up @@ -7158,7 +7172,11 @@ TEST_CASE_METHOD(
TEST_CASE_METHOD(
ConsolidationFx,
"C API: Test consolidation, dense split fragments",
"[capi][consolidation][dense][split-fragments][non-rest]") {
"[capi][consolidation][dense][split-fragments][rest]") {
#ifdef TILEDB_SERIALIZATION
serialize_ = true;
#endif

remove_dense_array();
create_dense_array();
write_dense_subarray(1, 2, 1, 2);
Expand Down Expand Up @@ -7198,6 +7216,23 @@ TEST_CASE_METHOD(

// Consolidate
const char* uris[2] = {strrchr(uri, '/') + 1, strrchr(uri2, '/') + 1};

if (serialize_) {
std::vector<std::string> frag_uris;
frag_uris.reserve(2);
for (uint64_t i = 0; i < 2; i++) {
frag_uris.emplace_back(uris[i]);
}

std::vector<std::string> frag_uris_deserialized;
tiledb_array_consolidation_request_wrapper(
ctx_,
tiledb_serialization_type_t(0),
&frag_uris,
&frag_uris_deserialized);
REQUIRE(frag_uris == frag_uris_deserialized);
}

rc = tiledb_array_consolidate_fragments(
ctx_, dense_array_uri_.c_str(), uris, 2, cfg);
CHECK(rc == TILEDB_OK);
Expand Down Expand Up @@ -7234,7 +7269,11 @@ TEST_CASE_METHOD(
TEST_CASE_METHOD(
ConsolidationFx,
"C API: Test consolidation, sparse split fragments",
"[capi][consolidation][sparse][split-fragments][non-rest]") {
"[capi][consolidation][sparse][split-fragments][rest]") {
#ifdef TILEDB_SERIALIZATION
serialize_ = true;
#endif

remove_sparse_array();
create_sparse_array();
write_sparse_row(0);
Expand Down Expand Up @@ -7274,6 +7313,23 @@ TEST_CASE_METHOD(

// Consolidate
const char* uris[2] = {strrchr(uri, '/') + 1, strrchr(uri2, '/') + 1};

if (serialize_) {
std::vector<std::string> frag_uris;
frag_uris.reserve(2);
for (uint64_t i = 0; i < 2; i++) {
frag_uris.emplace_back(uris[i]);
}

std::vector<std::string> frag_uris_deserialized;
tiledb_array_consolidation_request_wrapper(
ctx_,
tiledb_serialization_type_t(0),
&frag_uris,
&frag_uris_deserialized);
REQUIRE(frag_uris == frag_uris_deserialized);
}

rc = tiledb_array_consolidate_fragments(
ctx_, sparse_array_uri_.c_str(), uris, 2, cfg);
CHECK(rc == TILEDB_OK);
Expand Down
28 changes: 28 additions & 0 deletions test/support/src/serialization_wrappers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,12 @@
*/

#include "test/support/src/helpers.h"
#include "tiledb/api/c_api/buffer/buffer_api_internal.h"
#include "tiledb/api/c_api/context/context_api_internal.h"
#include "tiledb/sm/c_api/tiledb.h"
#include "tiledb/sm/c_api/tiledb_serialization.h"
#include "tiledb/sm/c_api/tiledb_struct_def.h"
#include "tiledb/sm/serialization/consolidation.h"
#include "tiledb/sm/serialization/query.h"

#ifdef TILEDB_SERIALIZATION
Expand Down Expand Up @@ -218,3 +221,28 @@ void tiledb_subarray_serialize(
*subarray = deserialized_subarray;
#endif
}

void tiledb_array_consolidation_request_wrapper(
tiledb_ctx_t* ctx,
tiledb_serialization_type_t serialize_type,
const std::vector<std::string>* fragment_uris_in,
std::vector<std::string>* fragment_uris_out) {
// Serialize and Deserialize
auto buffer = tiledb_buffer_handle_t::make_handle();
serialization::array_consolidation_request_serialize(
ctx->config(),
fragment_uris_in,
static_cast<tiledb::sm::SerializationType>(serialize_type),
&(buffer->buffer()));

auto [config, fragment_uris_deser] =
serialization::array_consolidation_request_deserialize(
static_cast<tiledb::sm::SerializationType>(serialize_type),
buffer->buffer());

tiledb_buffer_handle_t::break_handle(buffer);

if (fragment_uris_deser.has_value()) {
*fragment_uris_out = fragment_uris_deser.value();
}
}
7 changes: 7 additions & 0 deletions test/support/src/serialization_wrappers.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,11 @@ int tiledb_fragment_info_serialize(
*/
void tiledb_subarray_serialize(
tiledb_ctx_t* ctx, tiledb_array_t* array, tiledb_subarray_t** subarray);

void tiledb_array_consolidation_request_wrapper(
tiledb_ctx_t* ctx,
tiledb_serialization_type_t serialize_type,
const std::vector<std::string>* fragment_uris_in,
std::vector<std::string>* fragment_uris_out);

#endif // TILEDB_TEST_SERIALIZATION_WRAPPERS_H
58 changes: 55 additions & 3 deletions tiledb/sm/c_api/tiledb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2664,13 +2664,31 @@ int32_t tiledb_array_create_with_key(

int32_t tiledb_array_consolidate(
tiledb_ctx_t* ctx, const char* array_uri, tiledb_config_t* config) {
// Validate input arguments
api::ensure_context_is_valid(ctx);
api::ensure_config_is_valid_if_present(config);

auto uri = tiledb::sm::URI(array_uri);
if (uri.is_invalid()) {
throw api::CAPIStatusException(
"Failed to consolidate fragments; Invalid input array uri");
}

auto input_config = (config == nullptr) ? ctx->config() : config->config();
if (uri.is_tiledb() &&
tiledb::sm::Consolidator::mode_from_config(input_config) ==
tiledb::sm::ConsolidationMode::FRAGMENT) {
throw api::CAPIStatusException(
"Please use tiledb_array_consolidate_fragments API for consolidating "
"fragments on remote arrays.");
}

tiledb::sm::Consolidator::array_consolidate(
array_uri,
tiledb::sm::EncryptionType::NO_ENCRYPTION,
nullptr,
0,
(config == nullptr) ? ctx->config() : config->config(),
input_config,
ctx->storage_manager());
return TILEDB_OK;
}
Expand All @@ -2682,7 +2700,15 @@ int32_t tiledb_array_consolidate_with_key(
const void* encryption_key,
uint32_t key_length,
tiledb_config_t* config) {
// Sanity checks
// Validate input arguments
api::ensure_context_is_valid(ctx);
api::ensure_config_is_valid_if_present(config);

auto uri = tiledb::sm::URI(array_uri);
if (uri.is_invalid()) {
throw api::CAPIStatusException(
"Failed to consolidate fragments; Invalid input array uri");
}

tiledb::sm::Consolidator::array_consolidate(
array_uri,
Expand All @@ -2701,7 +2727,33 @@ int32_t tiledb_array_consolidate_fragments(
const char** fragment_uris,
const uint64_t num_fragments,
tiledb_config_t* config) {
// Sanity checks
// Validate input arguments
api::ensure_context_is_valid(ctx);
api::ensure_config_is_valid_if_present(config);

if (fragment_uris == nullptr) {
throw api::CAPIStatusException(
"Failed to consolidate fragments; Invalid input fragment list");
}

auto uri = tiledb::sm::URI(array_uri);
if (uri.is_invalid()) {
throw api::CAPIStatusException(
"Failed to consolidate fragments; Invalid input array uri");
}

if (num_fragments < 1) {
throw api::CAPIStatusException(
"Failed to consolidate fragments; Invalid input number of fragments");
}

for (size_t i = 0; i < num_fragments; i++) {
if (tiledb::sm::URI(fragment_uris[i]).is_invalid()) {
throw api::CAPIStatusException(
"Failed to consolidate fragments; Invalid uri(s) in input fragment "
"list");
}
}

// Convert the list of fragments to a vector
std::vector<std::string> uris;
Expand Down
106 changes: 57 additions & 49 deletions tiledb/sm/consolidator/consolidator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -146,21 +146,21 @@ void Consolidator::array_consolidate(
throw ConsolidatorException("Cannot consolidate array; Invalid URI");
}

// Check if array exists
ObjectType obj_type;
throw_if_not_ok(
object_type(storage_manager->resources(), array_uri, &obj_type));

if (obj_type != ObjectType::ARRAY) {
throw ConsolidatorException(
"Cannot consolidate array; Array does not exist");
}

if (array_uri.is_tiledb()) {
throw_if_not_ok(
storage_manager->resources().rest_client()->post_consolidation_to_rest(
array_uri, config));
} else {
// Check if array exists
ObjectType obj_type;
throw_if_not_ok(
object_type(storage_manager->resources(), array_uri, &obj_type));

if (obj_type != ObjectType::ARRAY) {
throw ConsolidatorException(
"Cannot consolidate array; Array does not exist");
}

// Get encryption key from config
std::string encryption_key_from_cfg;
if (!encryption_key) {
Expand Down Expand Up @@ -210,50 +210,58 @@ void Consolidator::fragments_consolidate(
throw ConsolidatorException("Cannot consolidate array; Invalid URI");
}

// Check if array exists
ObjectType obj_type;
throw_if_not_ok(
object_type(storage_manager->resources(), array_uri, &obj_type));
if (array_uri.is_tiledb()) {
throw_if_not_ok(
storage_manager->resources().rest_client()->post_consolidation_to_rest(
array_uri, config, &fragment_uris));
} else {
// Check if array exists
ObjectType obj_type;
throw_if_not_ok(
object_type(storage_manager->resources(), array_uri, &obj_type));

if (obj_type != ObjectType::ARRAY) {
throw ConsolidatorException(
"Cannot consolidate array; Array does not exist");
}
if (obj_type != ObjectType::ARRAY) {
throw ConsolidatorException(
"Cannot consolidate array; Array does not exist");
}

// Get encryption key from config
std::string encryption_key_from_cfg;
if (!encryption_key) {
bool found = false;
encryption_key_from_cfg = config.get("sm.encryption_key", &found);
assert(found);
}
// Get encryption key from config
std::string encryption_key_from_cfg;
if (!encryption_key) {
bool found = false;
encryption_key_from_cfg = config.get("sm.encryption_key", &found);
assert(found);
}

if (!encryption_key_from_cfg.empty()) {
encryption_key = encryption_key_from_cfg.c_str();
key_length = static_cast<uint32_t>(encryption_key_from_cfg.size());
std::string encryption_type_from_cfg;
bool found = false;
encryption_type_from_cfg = config.get("sm.encryption_type", &found);
assert(found);
auto [st, et] = encryption_type_enum(encryption_type_from_cfg);
throw_if_not_ok(st);
encryption_type = et.value();

if (!encryption_key_from_cfg.empty()) {
encryption_key = encryption_key_from_cfg.c_str();
key_length = static_cast<uint32_t>(encryption_key_from_cfg.size());
std::string encryption_type_from_cfg;
bool found = false;
encryption_type_from_cfg = config.get("sm.encryption_type", &found);
assert(found);
auto [st, et] = encryption_type_enum(encryption_type_from_cfg);
throw_if_not_ok(st);
encryption_type = et.value();

if (!EncryptionKey::is_valid_key_length(
encryption_type,
static_cast<uint32_t>(encryption_key_from_cfg.size()))) {
encryption_key = nullptr;
key_length = 0;
if (!EncryptionKey::is_valid_key_length(
encryption_type,
static_cast<uint32_t>(encryption_key_from_cfg.size()))) {
encryption_key = nullptr;
key_length = 0;
}
}
}

// Consolidate
auto consolidator = Consolidator::create(
ConsolidationMode::FRAGMENT, config, storage_manager);
auto fragment_consolidator =
dynamic_cast<FragmentConsolidator*>(consolidator.get());
throw_if_not_ok(fragment_consolidator->consolidate_fragments(
array_name, encryption_type, encryption_key, key_length, fragment_uris));
// Consolidate
auto fragment_consolidator =
make_shared<FragmentConsolidator>(HERE(), config, storage_manager);
throw_if_not_ok(fragment_consolidator->consolidate_fragments(
array_name,
encryption_type,
encryption_key,
key_length,
fragment_uris));
}
}

void Consolidator::write_consolidated_commits_file(
Expand Down
Loading

0 comments on commit f776d91

Please sign in to comment.