From 8cfb5cb655056811933780a0f0f8889603477fe3 Mon Sep 17 00:00:00 2001 From: Ypatia Tsavliri Date: Fri, 7 Jun 2024 16:15:52 +0300 Subject: [PATCH 1/6] Support remote arrays for fragment_list consolidation --- test/src/unit-capi-consolidation.cc | 4 +- test/support/src/serialization_wrappers.cc | 28 ++++ test/support/src/serialization_wrappers.h | 8 ++ tiledb/sm/c_api/tiledb.cc | 48 ++++++- tiledb/sm/consolidator/consolidator.cc | 92 +++++++------ tiledb/sm/rest/rest_client.h | 2 +- tiledb/sm/rest/rest_client_remote.cc | 11 +- tiledb/sm/rest/rest_client_remote.h | 6 +- tiledb/sm/serialization/consolidation.cc | 149 ++++++++++++--------- tiledb/sm/serialization/consolidation.h | 33 ++--- 10 files changed, 254 insertions(+), 127 deletions(-) diff --git a/test/src/unit-capi-consolidation.cc b/test/src/unit-capi-consolidation.cc index 36e7d485050..f91c3ba17b6 100644 --- a/test/src/unit-capi-consolidation.cc +++ b/test/src/unit-capi-consolidation.cc @@ -7169,7 +7169,7 @@ TEST_CASE_METHOD( TEST_CASE_METHOD( ConsolidationFx, "C API: Test consolidation, dense split fragments", - "[capi][consolidation][dense][split-fragments][non-rest]") { + "[capi][consolidation][dense][split-fragments][rest]") { remove_dense_array(); create_dense_array(); write_dense_subarray(1, 2, 1, 2); @@ -7253,7 +7253,7 @@ TEST_CASE_METHOD( TEST_CASE_METHOD( ConsolidationFx, "C API: Test consolidation, sparse split fragments", - "[capi][consolidation][sparse][split-fragments][non-rest]") { + "[capi][consolidation][sparse][split-fragments][rest]") { remove_sparse_array(); create_sparse_array(); write_sparse_row(0); diff --git a/test/support/src/serialization_wrappers.cc b/test/support/src/serialization_wrappers.cc index 6590523138c..eb3b6786d7e 100644 --- a/test/support/src/serialization_wrappers.cc +++ b/test/support/src/serialization_wrappers.cc @@ -33,9 +33,12 @@ #include "test/support/src/helpers.h" #include "tiledb/api/c_api/array_schema/array_schema_api_internal.h" +#include "tiledb/api/c_api/buffer/buffer_api_internal.h" +#include "tiledb/api/c_api/context/context_api_internal.h" #include "tiledb/sm/c_api/tiledb.h" #include "tiledb/sm/c_api/tiledb_serialization.h" #include "tiledb/sm/c_api/tiledb_struct_def.h" +#include "tiledb/sm/serialization/consolidation.h" #include "tiledb/sm/serialization/query.h" #ifdef TILEDB_SERIALIZATION @@ -219,3 +222,28 @@ void tiledb_subarray_serialize( *subarray = deserialized_subarray; #endif } + +void tiledb_array_consolidation_request_wrapper( + tiledb_ctx_t* ctx, + tiledb_serialization_type_t serialize_type, + const std::vector* fragment_uris_in, + std::vector* fragment_uris_out) { + // Serialize and Deserialize + auto buffer = tiledb_buffer_handle_t::make_handle(); + serialization::array_consolidation_request_serialize( + ctx->config(), + fragment_uris_in, + static_cast(serialize_type), + &(buffer->buffer())); + + auto [config, fragment_uris_deser] = + serialization::array_consolidation_request_deserialize( + static_cast(serialize_type), + buffer->buffer()); + + tiledb_buffer_handle_t::break_handle(buffer); + + if (fragment_uris_deser.has_value()) { + *fragment_uris_out = fragment_uris_deser.value(); + } +} diff --git a/test/support/src/serialization_wrappers.h b/test/support/src/serialization_wrappers.h index 55317de32d4..907ce2e7e7d 100644 --- a/test/support/src/serialization_wrappers.h +++ b/test/support/src/serialization_wrappers.h @@ -35,6 +35,7 @@ #define TILEDB_TEST_SERIALIZATION_WRAPPERS_H #include +#include #include "tiledb/sm/c_api/tiledb.h" #include "tiledb/sm/c_api/tiledb_serialization.h" @@ -141,4 +142,11 @@ int tiledb_fragment_info_serialize( */ void tiledb_subarray_serialize( tiledb_ctx_t* ctx, tiledb_array_t* array, tiledb_subarray_t** subarray); + +void tiledb_array_consolidation_request_wrapper( + tiledb_ctx_t* ctx, + tiledb_serialization_type_t serialize_type, + const std::vector* fragment_uris_in, + std::vector* fragment_uris_out); + #endif // TILEDB_TEST_SERIALIZATION_WRAPPERS_H diff --git a/tiledb/sm/c_api/tiledb.cc b/tiledb/sm/c_api/tiledb.cc index 71ac3033e7d..f17e753b7ac 100644 --- a/tiledb/sm/c_api/tiledb.cc +++ b/tiledb/sm/c_api/tiledb.cc @@ -1709,14 +1709,32 @@ int32_t tiledb_array_create( int32_t tiledb_array_consolidate( tiledb_ctx_t* ctx, const char* array_uri, tiledb_config_t* config) { + // Validate input arguments + api::ensure_context_is_valid(ctx); api::ensure_config_is_valid_if_present(config); + + auto uri = tiledb::sm::URI(array_uri); + if (uri.is_invalid()) { + throw api::CAPIStatusException( + "Failed to consolidate fragments; Invalid input array uri"); + } + + auto input_config = (config == nullptr) ? ctx->config() : config->config(); + if (uri.is_tiledb() && + tiledb::sm::Consolidator::mode_from_config(input_config) == + tiledb::sm::ConsolidationMode::FRAGMENT) { + throw api::CAPIStatusException( + "Please use tiledb_array_consolidate_fragments API for consolidating " + "fragments on remote arrays."); + } + tiledb::sm::Consolidator::array_consolidate( ctx->resources(), array_uri, tiledb::sm::EncryptionType::NO_ENCRYPTION, nullptr, 0, - (config == nullptr) ? ctx->config() : config->config(), + input_config, ctx->storage_manager()); return TILEDB_OK; } @@ -1727,7 +1745,33 @@ int32_t tiledb_array_consolidate_fragments( const char** fragment_uris, const uint64_t num_fragments, tiledb_config_t* config) { - // Sanity checks + // Validate input arguments + api::ensure_context_is_valid(ctx); + api::ensure_config_is_valid_if_present(config); + + if (fragment_uris == nullptr) { + throw api::CAPIStatusException( + "Failed to consolidate fragments; Invalid input fragment list"); + } + + auto uri = tiledb::sm::URI(array_uri); + if (uri.is_invalid()) { + throw api::CAPIStatusException( + "Failed to consolidate fragments; Invalid input array uri"); + } + + if (num_fragments < 1) { + throw api::CAPIStatusException( + "Failed to consolidate fragments; Invalid input number of fragments"); + } + + for (size_t i = 0; i < num_fragments; i++) { + if (tiledb::sm::URI(fragment_uris[i]).is_invalid()) { + throw api::CAPIStatusException( + "Failed to consolidate fragments; Invalid uri(s) in input fragment " + "list"); + } + } // Convert the list of fragments to a vector std::vector uris; diff --git a/tiledb/sm/consolidator/consolidator.cc b/tiledb/sm/consolidator/consolidator.cc index a003a20fc5b..2eeb5b2720a 100644 --- a/tiledb/sm/consolidator/consolidator.cc +++ b/tiledb/sm/consolidator/consolidator.cc @@ -151,16 +151,16 @@ void Consolidator::array_consolidate( throw ConsolidatorException("Cannot consolidate array; Invalid URI"); } - // Check if array exists - if (object_type(resources, array_uri) != ObjectType::ARRAY) { - throw ConsolidatorException( - "Cannot consolidate array; Array does not exist"); - } - if (array_uri.is_tiledb()) { throw_if_not_ok( resources.rest_client()->post_consolidation_to_rest(array_uri, config)); } else { + // Check if array exists + if (object_type(resources, array_uri) != ObjectType::ARRAY) { + throw ConsolidatorException( + "Cannot consolidate array; Array does not exist"); + } + // Get encryption key from config std::string encryption_key_from_cfg; if (!encryption_key) { @@ -212,46 +212,54 @@ void Consolidator::fragments_consolidate( throw ConsolidatorException("Cannot consolidate array; Invalid URI"); } - // Check if array exists - if (object_type(resources, array_uri) != ObjectType::ARRAY) { - throw ConsolidatorException( - "Cannot consolidate array; Array does not exist"); - } + if (array_uri.is_tiledb()) { + throw_if_not_ok( + storage_manager->resources().rest_client()->post_consolidation_to_rest( + array_uri, config, &fragment_uris)); + } else { + // Check if array exists + if (object_type(resources, array_uri) != ObjectType::ARRAY) { + throw ConsolidatorException( + "Cannot consolidate array; Array does not exist"); + } - // Get encryption key from config - std::string encryption_key_from_cfg; - if (!encryption_key) { - bool found = false; - encryption_key_from_cfg = config.get("sm.encryption_key", &found); - assert(found); - } + // Get encryption key from config + std::string encryption_key_from_cfg; + if (!encryption_key) { + bool found = false; + encryption_key_from_cfg = config.get("sm.encryption_key", &found); + assert(found); + } - if (!encryption_key_from_cfg.empty()) { - encryption_key = encryption_key_from_cfg.c_str(); - key_length = static_cast(encryption_key_from_cfg.size()); - std::string encryption_type_from_cfg; - bool found = false; - encryption_type_from_cfg = config.get("sm.encryption_type", &found); - assert(found); - auto [st, et] = encryption_type_enum(encryption_type_from_cfg); - throw_if_not_ok(st); - encryption_type = et.value(); - - if (!EncryptionKey::is_valid_key_length( - encryption_type, - static_cast(encryption_key_from_cfg.size()))) { - encryption_key = nullptr; - key_length = 0; + if (!encryption_key_from_cfg.empty()) { + encryption_key = encryption_key_from_cfg.c_str(); + key_length = static_cast(encryption_key_from_cfg.size()); + std::string encryption_type_from_cfg; + bool found = false; + encryption_type_from_cfg = config.get("sm.encryption_type", &found); + assert(found); + auto [st, et] = encryption_type_enum(encryption_type_from_cfg); + throw_if_not_ok(st); + encryption_type = et.value(); + + if (!EncryptionKey::is_valid_key_length( + encryption_type, + static_cast(encryption_key_from_cfg.size()))) { + encryption_key = nullptr; + key_length = 0; + } } - } - // Consolidate - auto consolidator = Consolidator::create( - resources, ConsolidationMode::FRAGMENT, config, storage_manager); - auto fragment_consolidator = - dynamic_cast(consolidator.get()); - throw_if_not_ok(fragment_consolidator->consolidate_fragments( - array_name, encryption_type, encryption_key, key_length, fragment_uris)); + // Consolidate + auto fragment_consolidator = + make_shared(HERE(), config, storage_manager); + throw_if_not_ok(fragment_consolidator->consolidate_fragments( + array_name, + encryption_type, + encryption_key, + key_length, + fragment_uris)); + } } void Consolidator::write_consolidated_commits_file( diff --git a/tiledb/sm/rest/rest_client.h b/tiledb/sm/rest/rest_client.h index 80d33d8a841..c4e6a2dc106 100644 --- a/tiledb/sm/rest/rest_client.h +++ b/tiledb/sm/rest/rest_client.h @@ -466,7 +466,7 @@ class RestClient { } /// Operation disabled in base class. - inline virtual Status post_consolidation_to_rest(const URI&, const Config&) { + inline virtual Status post_consolidation_to_rest(const URI&, const Config&, const std::vector*) { throw RestClientDisabledException(); } diff --git a/tiledb/sm/rest/rest_client_remote.cc b/tiledb/sm/rest/rest_client_remote.cc index afe69ea87a3..fce2060770a 100644 --- a/tiledb/sm/rest/rest_client_remote.cc +++ b/tiledb/sm/rest/rest_client_remote.cc @@ -1531,11 +1531,14 @@ Status RestClientRemote::ensure_json_null_delimited_string(Buffer* buffer) { return Status::Ok(); } -Status RestClientRemote::post_consolidation_to_rest( - const URI& uri, const Config& config) { +Status RestClient::post_consolidation_to_rest( + const URI& uri, + const Config& config, + const std::vector* fragment_uris) { Buffer buff; - RETURN_NOT_OK(serialization::array_consolidation_request_serialize( - config, serialization_type_, &buff)); + serialization::array_consolidation_request_serialize( + config, fragment_uris, serialization_type_, &buff); + // Wrap in a list BufferList serialized; RETURN_NOT_OK(serialized.add_buffer(std::move(buff))); diff --git a/tiledb/sm/rest/rest_client_remote.h b/tiledb/sm/rest/rest_client_remote.h index d40cdda2121..7555d41f68a 100644 --- a/tiledb/sm/rest/rest_client_remote.h +++ b/tiledb/sm/rest/rest_client_remote.h @@ -432,10 +432,14 @@ class RestClientRemote : public RestClient { * * @param uri Array URI * @param config config + * @param fragment_uris The uris of the fragments to be consolidated if this + * is a request for fragment list consolidation * @return */ Status post_consolidation_to_rest( - const URI& uri, const Config& config) override; + const URI& uri, + const Config& config, + const std::vector* fragment_uris = nullptr) override; /** * Post array vacuum request to the REST server. diff --git a/tiledb/sm/serialization/consolidation.cc b/tiledb/sm/serialization/consolidation.cc index e6c06c1ecaf..69b368a909e 100644 --- a/tiledb/sm/serialization/consolidation.cc +++ b/tiledb/sm/serialization/consolidation.cc @@ -43,6 +43,7 @@ // clang-format on #include "tiledb/common/logger_public.h" +#include "tiledb/sm/consolidator/consolidator.h" #include "tiledb/sm/enums/serialization_type.h" #include "tiledb/sm/serialization/config.h" #include "tiledb/sm/serialization/consolidation.h" @@ -71,34 +72,67 @@ class ConsolidationSerializationDisabledException #ifdef TILEDB_SERIALIZATION -Status array_consolidation_request_to_capnp( +void array_consolidation_request_to_capnp( const Config& config, + const std::vector* fragment_uris, capnp::ArrayConsolidationRequest::Builder* array_consolidation_request_builder) { + // Validate input arguments to be sure that what we are serializing make sense + auto mode = Consolidator::mode_from_config(config); + if (mode != ConsolidationMode::FRAGMENT && + (fragment_uris != nullptr || !fragment_uris->empty())) { + throw ConsolidationSerializationException( + "[array_consolidation_request_to_capnp] Error serializing " + "consolidation request. A non-empty fragment list should only be " + "provided for fragment consolidation."); + } + auto config_builder = array_consolidation_request_builder->initConfig(); - RETURN_NOT_OK(config_to_capnp(config, &config_builder)); - return Status::Ok(); + throw_if_not_ok(config_to_capnp(config, &config_builder)); + + if (fragment_uris != nullptr && !fragment_uris->empty()) { + auto fragment_list_builder = + array_consolidation_request_builder->initFragments( + fragment_uris->size()); + for (size_t i = 0; i < fragment_uris->size(); i++) { + fragment_list_builder.set(i, (*fragment_uris)[i]); + } + } } -Status array_consolidation_request_from_capnp( +std::pair>> +array_consolidation_request_from_capnp( const capnp::ArrayConsolidationRequest::Reader& - array_consolidation_request_reader, - tdb_unique_ptr* config) { + array_consolidation_request_reader) { auto config_reader = array_consolidation_request_reader.getConfig(); - RETURN_NOT_OK(config_from_capnp(config_reader, config)); - return Status::Ok(); + tdb_unique_ptr decoded_config = nullptr; + throw_if_not_ok(config_from_capnp(config_reader, &decoded_config)); + + std::vector fragment_uris; + if (array_consolidation_request_reader.hasFragments()) { + auto fragment_reader = array_consolidation_request_reader.getFragments(); + fragment_uris.reserve(fragment_reader.size()); + for (const auto& fragment_uri : fragment_reader) { + fragment_uris.emplace_back(fragment_uri); + } + + return {*decoded_config, fragment_uris}; + } else { + return {*decoded_config, std::nullopt}; + } } -Status array_consolidation_request_serialize( +void array_consolidation_request_serialize( const Config& config, + const std::vector* fragment_uris, SerializationType serialize_type, Buffer* serialized_buffer) { try { ::capnp::MallocMessageBuilder message; capnp::ArrayConsolidationRequest::Builder ArrayConsolidationRequestBuilder = message.initRoot(); - RETURN_NOT_OK(array_consolidation_request_to_capnp( - config, &ArrayConsolidationRequestBuilder)); + array_consolidation_request_to_capnp( + config, fragment_uris, &ArrayConsolidationRequestBuilder); serialized_buffer->reset_size(); serialized_buffer->reset_offset(); @@ -110,45 +144,44 @@ Status array_consolidation_request_serialize( const auto json_len = capnp_json.size(); const char nul = '\0'; // size does not include needed null terminator, so add +1 - RETURN_NOT_OK(serialized_buffer->realloc(json_len + 1)); - RETURN_NOT_OK(serialized_buffer->write(capnp_json.cStr(), json_len)); - RETURN_NOT_OK(serialized_buffer->write(&nul, 1)); + throw_if_not_ok(serialized_buffer->realloc(json_len + 1)); + throw_if_not_ok(serialized_buffer->write(capnp_json.cStr(), json_len)); + throw_if_not_ok(serialized_buffer->write(&nul, 1)); break; } case SerializationType::CAPNP: { kj::Array<::capnp::word> protomessage = messageToFlatArray(message); kj::ArrayPtr message_chars = protomessage.asChars(); const auto nbytes = message_chars.size(); - RETURN_NOT_OK(serialized_buffer->realloc(nbytes)); - RETURN_NOT_OK(serialized_buffer->write(message_chars.begin(), nbytes)); + throw_if_not_ok(serialized_buffer->realloc(nbytes)); + throw_if_not_ok( + serialized_buffer->write(message_chars.begin(), nbytes)); break; } default: { - return LOG_STATUS(Status_SerializationError( - "Error serializing config; Unknown serialization type " - "passed")); + throw ConsolidationSerializationException( + "[array_consolidation_request_serialize] Error serializing config; " + "Unknown serialization type passed"); } } } catch (kj::Exception& e) { - return LOG_STATUS(Status_SerializationError( - "Error serializing config; kj::Exception: " + - std::string(e.getDescription().cStr()))); + throw ConsolidationSerializationException( + "[array_consolidation_request_serialize] Error serializing config; " + "kj::Exception: " + + std::string(e.getDescription().cStr())); } catch (std::exception& e) { - return LOG_STATUS(Status_SerializationError( - "Error serializing config; exception " + std::string(e.what()))); + throw ConsolidationSerializationException( + "[array_consolidation_request_serialize] Error serializing config; " + "exception " + + std::string(e.what())); } - - return Status::Ok(); } -Status array_consolidation_request_deserialize( - Config** config, - SerializationType serialize_type, - const Buffer& serialized_buffer) { +std::pair>> +array_consolidation_request_deserialize( + SerializationType serialize_type, const Buffer& serialized_buffer) { try { - tdb_unique_ptr decoded_config = nullptr; - switch (serialize_type) { case SerializationType::JSON: { ::capnp::JsonCodec json; @@ -162,8 +195,8 @@ Status array_consolidation_request_deserialize( capnp::ArrayConsolidationRequest::Reader array_consolidation_request_reader = array_consolidation_request_builder.asReader(); - RETURN_NOT_OK(array_consolidation_request_from_capnp( - array_consolidation_request_reader, &decoded_config)); + return array_consolidation_request_from_capnp( + array_consolidation_request_reader); break; } case SerializationType::CAPNP: { @@ -175,32 +208,27 @@ Status array_consolidation_request_deserialize( capnp::ArrayConsolidationRequest::Reader array_consolidation_request_reader = reader.getRoot(); - RETURN_NOT_OK(array_consolidation_request_from_capnp( - array_consolidation_request_reader, &decoded_config)); + return array_consolidation_request_from_capnp( + array_consolidation_request_reader); break; } default: { - return LOG_STATUS(Status_SerializationError( - "Error deserializing config; Unknown serialization type " - "passed")); + throw ConsolidationSerializationException( + "[array_consolidation_request_deserialize] Error deserializing " + "config; Unknown serialization type passed"); } } - - if (decoded_config == nullptr) - return LOG_STATUS(Status_SerializationError( - "Error serializing config; deserialized config is null")); - - *config = decoded_config.release(); } catch (kj::Exception& e) { - return LOG_STATUS(Status_SerializationError( - "Error deserializing config; kj::Exception: " + - std::string(e.getDescription().cStr()))); + throw ConsolidationSerializationException( + "[array_consolidation_request_deserialize] Error deserializing config; " + "kj::Exception: " + + std::string(e.getDescription().cStr())); } catch (std::exception& e) { - return LOG_STATUS(Status_SerializationError( - "Error deserializing config; exception " + std::string(e.what()))); + throw ConsolidationSerializationException( + "[array_consolidation_request_deserialize] Error deserializing config; " + "exception " + + std::string(e.what())); } - - return Status::Ok(); } void consolidation_plan_request_to_capnp( @@ -435,16 +463,17 @@ std::vector> deserialize_consolidation_plan_response( #else -Status array_consolidation_request_serialize( - const Config&, SerializationType, Buffer*) { - return LOG_STATUS(Status_SerializationError( - "Cannot serialize; serialization not enabled.")); +void array_consolidation_request_serialize( + const Config&, + const std::vector*, + SerializationType, + Buffer*) { + throw ConsolidationSerializationDisabledException(); } -Status array_consolidation_request_deserialize( - Config**, SerializationType, const Buffer&) { - return LOG_STATUS(Status_SerializationError( - "Cannot deserialize; serialization not enabled.")); +std::pair>> +array_consolidation_request_deserialize(SerializationType, const Buffer&) { + throw ConsolidationSerializationDisabledException(); } void serialize_consolidation_plan_request( diff --git a/tiledb/sm/serialization/consolidation.h b/tiledb/sm/serialization/consolidation.h index d27c6c6e346..3be366a4dc6 100644 --- a/tiledb/sm/serialization/consolidation.h +++ b/tiledb/sm/serialization/consolidation.h @@ -58,22 +58,24 @@ namespace serialization { * Convert Cap'n Proto message to Consolidation request * * @param consolidation_req_reader cap'n proto class. - * @param config config object to deserialize into. - * @return Status + * @return {config, fragment_uris} config object to deserialize into, and the + * uris of the fragments to be consolidated if any */ -Status array_consolidation_request_from_capnp( - const capnp::ArrayConsolidationRequest::Reader& consolidation_req_reader, - Config* Config); +std::pair>> +array_consolidation_request_from_capnp( + const capnp::ArrayConsolidationRequest::Reader& consolidation_req_reader); /** * Convert Consolidation Request to Cap'n Proto message. * * @param config config to serialize info from + * @param fragment_uris The uris of the fragments to be consolidated if this is + * a request for fragment list consolidation * @param consolidation_req_builder cap'n proto class. - * @return Status */ -Status array_consolidation_request_to_capnp( +void array_consolidation_request_to_capnp( const Config& config, + const std::vector* fragment_uris, capnp::ArrayConsolidationRequest::Builder* consolidation_req_builder); #endif @@ -81,27 +83,28 @@ Status array_consolidation_request_to_capnp( * Serialize a consolidation request via Cap'n Proto. * * @param config config object to get info to serialize. + * @param fragment_uris The uris of the fragments to be consolidated if this is + * a request for fragment list consolidation * @param serialize_type format to serialize into Cap'n Proto or JSON. * @param serialized_buffer buffer to store serialized bytes in. - * @return Status */ -Status array_consolidation_request_serialize( +void array_consolidation_request_serialize( const Config& config, + const std::vector* fragment_uris, SerializationType serialize_type, Buffer* serialized_buffer); /** * Deserialize consolidation request via Cap'n Proto * - * @param config config object to store the deserialized info in. * @param serialize_type format the data is serialized in: Cap'n Proto of JSON. * @param serialized_buffer buffer to read serialized bytes from. - * @return Status + * @return {config, fragment_uris} config object to deserialize into, and the + * uris of the fragments to be consolidated if any */ -Status array_consolidation_request_deserialize( - Config** config, - SerializationType serialize_type, - const Buffer& serialized_buffer); +std::pair>> +array_consolidation_request_deserialize( + SerializationType serialize_type, const Buffer& serialized_buffer); /** * Serialize a consolidation plan request via Cap'n Proto. From 361df734994f2bbb75f8f2cca284403643a23171 Mon Sep 17 00:00:00 2001 From: Ypatia Tsavliri Date: Fri, 21 Jun 2024 11:15:58 +0300 Subject: [PATCH 2/6] Fix boolean clause for empty fragment list serialization Co-authored-by: Shaun M Reed --- tiledb/sm/serialization/consolidation.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tiledb/sm/serialization/consolidation.cc b/tiledb/sm/serialization/consolidation.cc index 69b368a909e..ba041e5788c 100644 --- a/tiledb/sm/serialization/consolidation.cc +++ b/tiledb/sm/serialization/consolidation.cc @@ -80,7 +80,7 @@ void array_consolidation_request_to_capnp( // Validate input arguments to be sure that what we are serializing make sense auto mode = Consolidator::mode_from_config(config); if (mode != ConsolidationMode::FRAGMENT && - (fragment_uris != nullptr || !fragment_uris->empty())) { + (fragment_uris != nullptr && !fragment_uris->empty())) { throw ConsolidationSerializationException( "[array_consolidation_request_to_capnp] Error serializing " "consolidation request. A non-empty fragment list should only be " From 4b7d592d91704eb03d4bde5d44af90637f499f52 Mon Sep 17 00:00:00 2001 From: Ypatia Tsavliri Date: Fri, 21 Jun 2024 11:38:38 +0300 Subject: [PATCH 3/6] Fix rebase issue --- tiledb/sm/consolidator/consolidator.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tiledb/sm/consolidator/consolidator.cc b/tiledb/sm/consolidator/consolidator.cc index 2eeb5b2720a..8e5b30084c3 100644 --- a/tiledb/sm/consolidator/consolidator.cc +++ b/tiledb/sm/consolidator/consolidator.cc @@ -155,7 +155,7 @@ void Consolidator::array_consolidate( throw_if_not_ok( resources.rest_client()->post_consolidation_to_rest(array_uri, config)); } else { - // Check if array exists + // Check if array exists if (object_type(resources, array_uri) != ObjectType::ARRAY) { throw ConsolidatorException( "Cannot consolidate array; Array does not exist"); @@ -217,7 +217,7 @@ void Consolidator::fragments_consolidate( storage_manager->resources().rest_client()->post_consolidation_to_rest( array_uri, config, &fragment_uris)); } else { - // Check if array exists + // Check if array exists if (object_type(resources, array_uri) != ObjectType::ARRAY) { throw ConsolidatorException( "Cannot consolidate array; Array does not exist"); @@ -251,8 +251,8 @@ void Consolidator::fragments_consolidate( } // Consolidate - auto fragment_consolidator = - make_shared(HERE(), config, storage_manager); + auto fragment_consolidator = make_shared( + HERE(), resources, config, storage_manager); throw_if_not_ok(fragment_consolidator->consolidate_fragments( array_name, encryption_type, From 8ab80ffe5fb8e7d308b5707ed6a751d65c62cd72 Mon Sep 17 00:00:00 2001 From: Ypatia Tsavliri Date: Tue, 23 Jul 2024 12:54:19 +0300 Subject: [PATCH 4/6] Use relative paths in serialization --- test/support/src/serialization_wrappers.cc | 2 ++ test/support/src/serialization_wrappers.h | 1 + tiledb/sm/serialization/consolidation.cc | 23 +++++++++++++++------- tiledb/sm/serialization/consolidation.h | 7 ++++++- 4 files changed, 25 insertions(+), 8 deletions(-) diff --git a/test/support/src/serialization_wrappers.cc b/test/support/src/serialization_wrappers.cc index eb3b6786d7e..9a9cd8397ab 100644 --- a/test/support/src/serialization_wrappers.cc +++ b/test/support/src/serialization_wrappers.cc @@ -226,6 +226,7 @@ void tiledb_subarray_serialize( void tiledb_array_consolidation_request_wrapper( tiledb_ctx_t* ctx, tiledb_serialization_type_t serialize_type, + const std::string& array_uri, const std::vector* fragment_uris_in, std::vector* fragment_uris_out) { // Serialize and Deserialize @@ -238,6 +239,7 @@ void tiledb_array_consolidation_request_wrapper( auto [config, fragment_uris_deser] = serialization::array_consolidation_request_deserialize( + array_uri, static_cast(serialize_type), buffer->buffer()); diff --git a/test/support/src/serialization_wrappers.h b/test/support/src/serialization_wrappers.h index 907ce2e7e7d..eaf1227c2ab 100644 --- a/test/support/src/serialization_wrappers.h +++ b/test/support/src/serialization_wrappers.h @@ -146,6 +146,7 @@ void tiledb_subarray_serialize( void tiledb_array_consolidation_request_wrapper( tiledb_ctx_t* ctx, tiledb_serialization_type_t serialize_type, + const std::string& array_uri, const std::vector* fragment_uris_in, std::vector* fragment_uris_out); diff --git a/tiledb/sm/serialization/consolidation.cc b/tiledb/sm/serialization/consolidation.cc index ba041e5788c..75b6cabb7ac 100644 --- a/tiledb/sm/serialization/consolidation.cc +++ b/tiledb/sm/serialization/consolidation.cc @@ -94,14 +94,19 @@ void array_consolidation_request_to_capnp( auto fragment_list_builder = array_consolidation_request_builder->initFragments( fragment_uris->size()); - for (size_t i = 0; i < fragment_uris->size(); i++) { - fragment_list_builder.set(i, (*fragment_uris)[i]); + size_t i = 0; + for (auto& fragment_uri : *fragment_uris) { + const auto& relative_fragment_uri = + serialize_array_uri_to_relative(URI(fragment_uri)); + fragment_list_builder.set(i, relative_fragment_uri); + i++; } } } std::pair>> array_consolidation_request_from_capnp( + const std::string& array_uri, const capnp::ArrayConsolidationRequest::Reader& array_consolidation_request_reader) { auto config_reader = array_consolidation_request_reader.getConfig(); @@ -113,7 +118,8 @@ array_consolidation_request_from_capnp( auto fragment_reader = array_consolidation_request_reader.getFragments(); fragment_uris.reserve(fragment_reader.size()); for (const auto& fragment_uri : fragment_reader) { - fragment_uris.emplace_back(fragment_uri); + fragment_uris.emplace_back(deserialize_array_uri_to_absolute( + fragment_uri.cStr(), URI(array_uri))); } return {*decoded_config, fragment_uris}; @@ -180,7 +186,9 @@ void array_consolidation_request_serialize( std::pair>> array_consolidation_request_deserialize( - SerializationType serialize_type, const Buffer& serialized_buffer) { + const std::string& array_uri, + SerializationType serialize_type, + const Buffer& serialized_buffer) { try { switch (serialize_type) { case SerializationType::JSON: { @@ -196,7 +204,7 @@ array_consolidation_request_deserialize( array_consolidation_request_reader = array_consolidation_request_builder.asReader(); return array_consolidation_request_from_capnp( - array_consolidation_request_reader); + array_uri, array_consolidation_request_reader); break; } case SerializationType::CAPNP: { @@ -209,7 +217,7 @@ array_consolidation_request_deserialize( array_consolidation_request_reader = reader.getRoot(); return array_consolidation_request_from_capnp( - array_consolidation_request_reader); + array_uri, array_consolidation_request_reader); break; } default: { @@ -472,7 +480,8 @@ void array_consolidation_request_serialize( } std::pair>> -array_consolidation_request_deserialize(SerializationType, const Buffer&) { +array_consolidation_request_deserialize( + const std::string&, SerializationType, const Buffer&) { throw ConsolidationSerializationDisabledException(); } diff --git a/tiledb/sm/serialization/consolidation.h b/tiledb/sm/serialization/consolidation.h index 3be366a4dc6..e57e7b7f1ba 100644 --- a/tiledb/sm/serialization/consolidation.h +++ b/tiledb/sm/serialization/consolidation.h @@ -57,12 +57,14 @@ namespace serialization { /** * Convert Cap'n Proto message to Consolidation request * + * @param array_uri URI of the array * @param consolidation_req_reader cap'n proto class. * @return {config, fragment_uris} config object to deserialize into, and the * uris of the fragments to be consolidated if any */ std::pair>> array_consolidation_request_from_capnp( + const std::string& array_uri, const capnp::ArrayConsolidationRequest::Reader& consolidation_req_reader); /** @@ -97,6 +99,7 @@ void array_consolidation_request_serialize( /** * Deserialize consolidation request via Cap'n Proto * + * @param array_uri URI of the array * @param serialize_type format the data is serialized in: Cap'n Proto of JSON. * @param serialized_buffer buffer to read serialized bytes from. * @return {config, fragment_uris} config object to deserialize into, and the @@ -104,7 +107,9 @@ void array_consolidation_request_serialize( */ std::pair>> array_consolidation_request_deserialize( - SerializationType serialize_type, const Buffer& serialized_buffer); + const std::string& array_uri, + SerializationType serialize_type, + const Buffer& serialized_buffer); /** * Serialize a consolidation plan request via Cap'n Proto. From 5e455b1565db7495e47ca38a644abdfeaa182d5c Mon Sep 17 00:00:00 2001 From: Ypatia Tsavliri Date: Wed, 24 Jul 2024 10:58:01 +0300 Subject: [PATCH 5/6] Fix rebase issues --- tiledb/sm/consolidator/consolidator.cc | 5 ++--- tiledb/sm/rest/rest_client.h | 3 ++- tiledb/sm/rest/rest_client_remote.cc | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/tiledb/sm/consolidator/consolidator.cc b/tiledb/sm/consolidator/consolidator.cc index 8e5b30084c3..427db15ffa2 100644 --- a/tiledb/sm/consolidator/consolidator.cc +++ b/tiledb/sm/consolidator/consolidator.cc @@ -213,9 +213,8 @@ void Consolidator::fragments_consolidate( } if (array_uri.is_tiledb()) { - throw_if_not_ok( - storage_manager->resources().rest_client()->post_consolidation_to_rest( - array_uri, config, &fragment_uris)); + throw_if_not_ok(resources.rest_client()->post_consolidation_to_rest( + array_uri, config, &fragment_uris)); } else { // Check if array exists if (object_type(resources, array_uri) != ObjectType::ARRAY) { diff --git a/tiledb/sm/rest/rest_client.h b/tiledb/sm/rest/rest_client.h index c4e6a2dc106..f551834fe3e 100644 --- a/tiledb/sm/rest/rest_client.h +++ b/tiledb/sm/rest/rest_client.h @@ -466,7 +466,8 @@ class RestClient { } /// Operation disabled in base class. - inline virtual Status post_consolidation_to_rest(const URI&, const Config&, const std::vector*) { + inline virtual Status post_consolidation_to_rest( + const URI&, const Config&, const std::vector* = nullptr) { throw RestClientDisabledException(); } diff --git a/tiledb/sm/rest/rest_client_remote.cc b/tiledb/sm/rest/rest_client_remote.cc index fce2060770a..22e87a5e7e8 100644 --- a/tiledb/sm/rest/rest_client_remote.cc +++ b/tiledb/sm/rest/rest_client_remote.cc @@ -1531,7 +1531,7 @@ Status RestClientRemote::ensure_json_null_delimited_string(Buffer* buffer) { return Status::Ok(); } -Status RestClient::post_consolidation_to_rest( +Status RestClientRemote::post_consolidation_to_rest( const URI& uri, const Config& config, const std::vector* fragment_uris) { From 65c1812835bc3da6471fb0d64a6a74395c6ab8ba Mon Sep 17 00:00:00 2001 From: Ypatia Tsavliri Date: Wed, 4 Sep 2024 14:37:09 +0300 Subject: [PATCH 6/6] Revert removing dynamic_cast --- tiledb/sm/consolidator/consolidator.cc | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tiledb/sm/consolidator/consolidator.cc b/tiledb/sm/consolidator/consolidator.cc index 427db15ffa2..16ec190a873 100644 --- a/tiledb/sm/consolidator/consolidator.cc +++ b/tiledb/sm/consolidator/consolidator.cc @@ -250,8 +250,10 @@ void Consolidator::fragments_consolidate( } // Consolidate - auto fragment_consolidator = make_shared( - HERE(), resources, config, storage_manager); + auto consolidator = Consolidator::create( + resources, ConsolidationMode::FRAGMENT, config, storage_manager); + auto fragment_consolidator = + dynamic_cast(consolidator.get()); throw_if_not_ok(fragment_consolidator->consolidate_fragments( array_name, encryption_type,