diff --git a/test/src/unit-capi-consolidation.cc b/test/src/unit-capi-consolidation.cc index 36e7d485050..f91c3ba17b6 100644 --- a/test/src/unit-capi-consolidation.cc +++ b/test/src/unit-capi-consolidation.cc @@ -7169,7 +7169,7 @@ TEST_CASE_METHOD( TEST_CASE_METHOD( ConsolidationFx, "C API: Test consolidation, dense split fragments", - "[capi][consolidation][dense][split-fragments][non-rest]") { + "[capi][consolidation][dense][split-fragments][rest]") { remove_dense_array(); create_dense_array(); write_dense_subarray(1, 2, 1, 2); @@ -7253,7 +7253,7 @@ TEST_CASE_METHOD( TEST_CASE_METHOD( ConsolidationFx, "C API: Test consolidation, sparse split fragments", - "[capi][consolidation][sparse][split-fragments][non-rest]") { + "[capi][consolidation][sparse][split-fragments][rest]") { remove_sparse_array(); create_sparse_array(); write_sparse_row(0); diff --git a/test/support/src/serialization_wrappers.cc b/test/support/src/serialization_wrappers.cc index 6590523138c..9a9cd8397ab 100644 --- a/test/support/src/serialization_wrappers.cc +++ b/test/support/src/serialization_wrappers.cc @@ -33,9 +33,12 @@ #include "test/support/src/helpers.h" #include "tiledb/api/c_api/array_schema/array_schema_api_internal.h" +#include "tiledb/api/c_api/buffer/buffer_api_internal.h" +#include "tiledb/api/c_api/context/context_api_internal.h" #include "tiledb/sm/c_api/tiledb.h" #include "tiledb/sm/c_api/tiledb_serialization.h" #include "tiledb/sm/c_api/tiledb_struct_def.h" +#include "tiledb/sm/serialization/consolidation.h" #include "tiledb/sm/serialization/query.h" #ifdef TILEDB_SERIALIZATION @@ -219,3 +222,30 @@ void tiledb_subarray_serialize( *subarray = deserialized_subarray; #endif } + +void tiledb_array_consolidation_request_wrapper( + tiledb_ctx_t* ctx, + tiledb_serialization_type_t serialize_type, + const std::string& array_uri, + const std::vector* fragment_uris_in, + std::vector* fragment_uris_out) { + // Serialize and Deserialize + auto buffer = tiledb_buffer_handle_t::make_handle(); + serialization::array_consolidation_request_serialize( + ctx->config(), + fragment_uris_in, + static_cast(serialize_type), + &(buffer->buffer())); + + auto [config, fragment_uris_deser] = + serialization::array_consolidation_request_deserialize( + array_uri, + static_cast(serialize_type), + buffer->buffer()); + + tiledb_buffer_handle_t::break_handle(buffer); + + if (fragment_uris_deser.has_value()) { + *fragment_uris_out = fragment_uris_deser.value(); + } +} diff --git a/test/support/src/serialization_wrappers.h b/test/support/src/serialization_wrappers.h index 55317de32d4..eaf1227c2ab 100644 --- a/test/support/src/serialization_wrappers.h +++ b/test/support/src/serialization_wrappers.h @@ -35,6 +35,7 @@ #define TILEDB_TEST_SERIALIZATION_WRAPPERS_H #include +#include #include "tiledb/sm/c_api/tiledb.h" #include "tiledb/sm/c_api/tiledb_serialization.h" @@ -141,4 +142,12 @@ int tiledb_fragment_info_serialize( */ void tiledb_subarray_serialize( tiledb_ctx_t* ctx, tiledb_array_t* array, tiledb_subarray_t** subarray); + +void tiledb_array_consolidation_request_wrapper( + tiledb_ctx_t* ctx, + tiledb_serialization_type_t serialize_type, + const std::string& array_uri, + const std::vector* fragment_uris_in, + std::vector* fragment_uris_out); + #endif // TILEDB_TEST_SERIALIZATION_WRAPPERS_H diff --git a/tiledb/sm/c_api/tiledb.cc b/tiledb/sm/c_api/tiledb.cc index 71ac3033e7d..f17e753b7ac 100644 --- a/tiledb/sm/c_api/tiledb.cc +++ b/tiledb/sm/c_api/tiledb.cc @@ -1709,14 +1709,32 @@ int32_t tiledb_array_create( int32_t tiledb_array_consolidate( tiledb_ctx_t* ctx, const char* array_uri, tiledb_config_t* config) { + // Validate input arguments + api::ensure_context_is_valid(ctx); api::ensure_config_is_valid_if_present(config); + + auto uri = tiledb::sm::URI(array_uri); + if (uri.is_invalid()) { + throw api::CAPIStatusException( + "Failed to consolidate fragments; Invalid input array uri"); + } + + auto input_config = (config == nullptr) ? ctx->config() : config->config(); + if (uri.is_tiledb() && + tiledb::sm::Consolidator::mode_from_config(input_config) == + tiledb::sm::ConsolidationMode::FRAGMENT) { + throw api::CAPIStatusException( + "Please use tiledb_array_consolidate_fragments API for consolidating " + "fragments on remote arrays."); + } + tiledb::sm::Consolidator::array_consolidate( ctx->resources(), array_uri, tiledb::sm::EncryptionType::NO_ENCRYPTION, nullptr, 0, - (config == nullptr) ? ctx->config() : config->config(), + input_config, ctx->storage_manager()); return TILEDB_OK; } @@ -1727,7 +1745,33 @@ int32_t tiledb_array_consolidate_fragments( const char** fragment_uris, const uint64_t num_fragments, tiledb_config_t* config) { - // Sanity checks + // Validate input arguments + api::ensure_context_is_valid(ctx); + api::ensure_config_is_valid_if_present(config); + + if (fragment_uris == nullptr) { + throw api::CAPIStatusException( + "Failed to consolidate fragments; Invalid input fragment list"); + } + + auto uri = tiledb::sm::URI(array_uri); + if (uri.is_invalid()) { + throw api::CAPIStatusException( + "Failed to consolidate fragments; Invalid input array uri"); + } + + if (num_fragments < 1) { + throw api::CAPIStatusException( + "Failed to consolidate fragments; Invalid input number of fragments"); + } + + for (size_t i = 0; i < num_fragments; i++) { + if (tiledb::sm::URI(fragment_uris[i]).is_invalid()) { + throw api::CAPIStatusException( + "Failed to consolidate fragments; Invalid uri(s) in input fragment " + "list"); + } + } // Convert the list of fragments to a vector std::vector uris; diff --git a/tiledb/sm/consolidator/consolidator.cc b/tiledb/sm/consolidator/consolidator.cc index a003a20fc5b..16ec190a873 100644 --- a/tiledb/sm/consolidator/consolidator.cc +++ b/tiledb/sm/consolidator/consolidator.cc @@ -151,16 +151,16 @@ void Consolidator::array_consolidate( throw ConsolidatorException("Cannot consolidate array; Invalid URI"); } - // Check if array exists - if (object_type(resources, array_uri) != ObjectType::ARRAY) { - throw ConsolidatorException( - "Cannot consolidate array; Array does not exist"); - } - if (array_uri.is_tiledb()) { throw_if_not_ok( resources.rest_client()->post_consolidation_to_rest(array_uri, config)); } else { + // Check if array exists + if (object_type(resources, array_uri) != ObjectType::ARRAY) { + throw ConsolidatorException( + "Cannot consolidate array; Array does not exist"); + } + // Get encryption key from config std::string encryption_key_from_cfg; if (!encryption_key) { @@ -212,46 +212,55 @@ void Consolidator::fragments_consolidate( throw ConsolidatorException("Cannot consolidate array; Invalid URI"); } - // Check if array exists - if (object_type(resources, array_uri) != ObjectType::ARRAY) { - throw ConsolidatorException( - "Cannot consolidate array; Array does not exist"); - } + if (array_uri.is_tiledb()) { + throw_if_not_ok(resources.rest_client()->post_consolidation_to_rest( + array_uri, config, &fragment_uris)); + } else { + // Check if array exists + if (object_type(resources, array_uri) != ObjectType::ARRAY) { + throw ConsolidatorException( + "Cannot consolidate array; Array does not exist"); + } - // Get encryption key from config - std::string encryption_key_from_cfg; - if (!encryption_key) { - bool found = false; - encryption_key_from_cfg = config.get("sm.encryption_key", &found); - assert(found); - } + // Get encryption key from config + std::string encryption_key_from_cfg; + if (!encryption_key) { + bool found = false; + encryption_key_from_cfg = config.get("sm.encryption_key", &found); + assert(found); + } + + if (!encryption_key_from_cfg.empty()) { + encryption_key = encryption_key_from_cfg.c_str(); + key_length = static_cast(encryption_key_from_cfg.size()); + std::string encryption_type_from_cfg; + bool found = false; + encryption_type_from_cfg = config.get("sm.encryption_type", &found); + assert(found); + auto [st, et] = encryption_type_enum(encryption_type_from_cfg); + throw_if_not_ok(st); + encryption_type = et.value(); - if (!encryption_key_from_cfg.empty()) { - encryption_key = encryption_key_from_cfg.c_str(); - key_length = static_cast(encryption_key_from_cfg.size()); - std::string encryption_type_from_cfg; - bool found = false; - encryption_type_from_cfg = config.get("sm.encryption_type", &found); - assert(found); - auto [st, et] = encryption_type_enum(encryption_type_from_cfg); - throw_if_not_ok(st); - encryption_type = et.value(); - - if (!EncryptionKey::is_valid_key_length( - encryption_type, - static_cast(encryption_key_from_cfg.size()))) { - encryption_key = nullptr; - key_length = 0; + if (!EncryptionKey::is_valid_key_length( + encryption_type, + static_cast(encryption_key_from_cfg.size()))) { + encryption_key = nullptr; + key_length = 0; + } } - } - // Consolidate - auto consolidator = Consolidator::create( - resources, ConsolidationMode::FRAGMENT, config, storage_manager); - auto fragment_consolidator = - dynamic_cast(consolidator.get()); - throw_if_not_ok(fragment_consolidator->consolidate_fragments( - array_name, encryption_type, encryption_key, key_length, fragment_uris)); + // Consolidate + auto consolidator = Consolidator::create( + resources, ConsolidationMode::FRAGMENT, config, storage_manager); + auto fragment_consolidator = + dynamic_cast(consolidator.get()); + throw_if_not_ok(fragment_consolidator->consolidate_fragments( + array_name, + encryption_type, + encryption_key, + key_length, + fragment_uris)); + } } void Consolidator::write_consolidated_commits_file( diff --git a/tiledb/sm/rest/rest_client.h b/tiledb/sm/rest/rest_client.h index 80d33d8a841..f551834fe3e 100644 --- a/tiledb/sm/rest/rest_client.h +++ b/tiledb/sm/rest/rest_client.h @@ -466,7 +466,8 @@ class RestClient { } /// Operation disabled in base class. - inline virtual Status post_consolidation_to_rest(const URI&, const Config&) { + inline virtual Status post_consolidation_to_rest( + const URI&, const Config&, const std::vector* = nullptr) { throw RestClientDisabledException(); } diff --git a/tiledb/sm/rest/rest_client_remote.cc b/tiledb/sm/rest/rest_client_remote.cc index afe69ea87a3..22e87a5e7e8 100644 --- a/tiledb/sm/rest/rest_client_remote.cc +++ b/tiledb/sm/rest/rest_client_remote.cc @@ -1532,10 +1532,13 @@ Status RestClientRemote::ensure_json_null_delimited_string(Buffer* buffer) { } Status RestClientRemote::post_consolidation_to_rest( - const URI& uri, const Config& config) { + const URI& uri, + const Config& config, + const std::vector* fragment_uris) { Buffer buff; - RETURN_NOT_OK(serialization::array_consolidation_request_serialize( - config, serialization_type_, &buff)); + serialization::array_consolidation_request_serialize( + config, fragment_uris, serialization_type_, &buff); + // Wrap in a list BufferList serialized; RETURN_NOT_OK(serialized.add_buffer(std::move(buff))); diff --git a/tiledb/sm/rest/rest_client_remote.h b/tiledb/sm/rest/rest_client_remote.h index d40cdda2121..7555d41f68a 100644 --- a/tiledb/sm/rest/rest_client_remote.h +++ b/tiledb/sm/rest/rest_client_remote.h @@ -432,10 +432,14 @@ class RestClientRemote : public RestClient { * * @param uri Array URI * @param config config + * @param fragment_uris The uris of the fragments to be consolidated if this + * is a request for fragment list consolidation * @return */ Status post_consolidation_to_rest( - const URI& uri, const Config& config) override; + const URI& uri, + const Config& config, + const std::vector* fragment_uris = nullptr) override; /** * Post array vacuum request to the REST server. diff --git a/tiledb/sm/serialization/consolidation.cc b/tiledb/sm/serialization/consolidation.cc index e6c06c1ecaf..75b6cabb7ac 100644 --- a/tiledb/sm/serialization/consolidation.cc +++ b/tiledb/sm/serialization/consolidation.cc @@ -43,6 +43,7 @@ // clang-format on #include "tiledb/common/logger_public.h" +#include "tiledb/sm/consolidator/consolidator.h" #include "tiledb/sm/enums/serialization_type.h" #include "tiledb/sm/serialization/config.h" #include "tiledb/sm/serialization/consolidation.h" @@ -71,34 +72,73 @@ class ConsolidationSerializationDisabledException #ifdef TILEDB_SERIALIZATION -Status array_consolidation_request_to_capnp( +void array_consolidation_request_to_capnp( const Config& config, + const std::vector* fragment_uris, capnp::ArrayConsolidationRequest::Builder* array_consolidation_request_builder) { + // Validate input arguments to be sure that what we are serializing make sense + auto mode = Consolidator::mode_from_config(config); + if (mode != ConsolidationMode::FRAGMENT && + (fragment_uris != nullptr && !fragment_uris->empty())) { + throw ConsolidationSerializationException( + "[array_consolidation_request_to_capnp] Error serializing " + "consolidation request. A non-empty fragment list should only be " + "provided for fragment consolidation."); + } + auto config_builder = array_consolidation_request_builder->initConfig(); - RETURN_NOT_OK(config_to_capnp(config, &config_builder)); - return Status::Ok(); + throw_if_not_ok(config_to_capnp(config, &config_builder)); + + if (fragment_uris != nullptr && !fragment_uris->empty()) { + auto fragment_list_builder = + array_consolidation_request_builder->initFragments( + fragment_uris->size()); + size_t i = 0; + for (auto& fragment_uri : *fragment_uris) { + const auto& relative_fragment_uri = + serialize_array_uri_to_relative(URI(fragment_uri)); + fragment_list_builder.set(i, relative_fragment_uri); + i++; + } + } } -Status array_consolidation_request_from_capnp( +std::pair>> +array_consolidation_request_from_capnp( + const std::string& array_uri, const capnp::ArrayConsolidationRequest::Reader& - array_consolidation_request_reader, - tdb_unique_ptr* config) { + array_consolidation_request_reader) { auto config_reader = array_consolidation_request_reader.getConfig(); - RETURN_NOT_OK(config_from_capnp(config_reader, config)); - return Status::Ok(); + tdb_unique_ptr decoded_config = nullptr; + throw_if_not_ok(config_from_capnp(config_reader, &decoded_config)); + + std::vector fragment_uris; + if (array_consolidation_request_reader.hasFragments()) { + auto fragment_reader = array_consolidation_request_reader.getFragments(); + fragment_uris.reserve(fragment_reader.size()); + for (const auto& fragment_uri : fragment_reader) { + fragment_uris.emplace_back(deserialize_array_uri_to_absolute( + fragment_uri.cStr(), URI(array_uri))); + } + + return {*decoded_config, fragment_uris}; + } else { + return {*decoded_config, std::nullopt}; + } } -Status array_consolidation_request_serialize( +void array_consolidation_request_serialize( const Config& config, + const std::vector* fragment_uris, SerializationType serialize_type, Buffer* serialized_buffer) { try { ::capnp::MallocMessageBuilder message; capnp::ArrayConsolidationRequest::Builder ArrayConsolidationRequestBuilder = message.initRoot(); - RETURN_NOT_OK(array_consolidation_request_to_capnp( - config, &ArrayConsolidationRequestBuilder)); + array_consolidation_request_to_capnp( + config, fragment_uris, &ArrayConsolidationRequestBuilder); serialized_buffer->reset_size(); serialized_buffer->reset_offset(); @@ -110,45 +150,46 @@ Status array_consolidation_request_serialize( const auto json_len = capnp_json.size(); const char nul = '\0'; // size does not include needed null terminator, so add +1 - RETURN_NOT_OK(serialized_buffer->realloc(json_len + 1)); - RETURN_NOT_OK(serialized_buffer->write(capnp_json.cStr(), json_len)); - RETURN_NOT_OK(serialized_buffer->write(&nul, 1)); + throw_if_not_ok(serialized_buffer->realloc(json_len + 1)); + throw_if_not_ok(serialized_buffer->write(capnp_json.cStr(), json_len)); + throw_if_not_ok(serialized_buffer->write(&nul, 1)); break; } case SerializationType::CAPNP: { kj::Array<::capnp::word> protomessage = messageToFlatArray(message); kj::ArrayPtr message_chars = protomessage.asChars(); const auto nbytes = message_chars.size(); - RETURN_NOT_OK(serialized_buffer->realloc(nbytes)); - RETURN_NOT_OK(serialized_buffer->write(message_chars.begin(), nbytes)); + throw_if_not_ok(serialized_buffer->realloc(nbytes)); + throw_if_not_ok( + serialized_buffer->write(message_chars.begin(), nbytes)); break; } default: { - return LOG_STATUS(Status_SerializationError( - "Error serializing config; Unknown serialization type " - "passed")); + throw ConsolidationSerializationException( + "[array_consolidation_request_serialize] Error serializing config; " + "Unknown serialization type passed"); } } } catch (kj::Exception& e) { - return LOG_STATUS(Status_SerializationError( - "Error serializing config; kj::Exception: " + - std::string(e.getDescription().cStr()))); + throw ConsolidationSerializationException( + "[array_consolidation_request_serialize] Error serializing config; " + "kj::Exception: " + + std::string(e.getDescription().cStr())); } catch (std::exception& e) { - return LOG_STATUS(Status_SerializationError( - "Error serializing config; exception " + std::string(e.what()))); + throw ConsolidationSerializationException( + "[array_consolidation_request_serialize] Error serializing config; " + "exception " + + std::string(e.what())); } - - return Status::Ok(); } -Status array_consolidation_request_deserialize( - Config** config, +std::pair>> +array_consolidation_request_deserialize( + const std::string& array_uri, SerializationType serialize_type, const Buffer& serialized_buffer) { try { - tdb_unique_ptr decoded_config = nullptr; - switch (serialize_type) { case SerializationType::JSON: { ::capnp::JsonCodec json; @@ -162,8 +203,8 @@ Status array_consolidation_request_deserialize( capnp::ArrayConsolidationRequest::Reader array_consolidation_request_reader = array_consolidation_request_builder.asReader(); - RETURN_NOT_OK(array_consolidation_request_from_capnp( - array_consolidation_request_reader, &decoded_config)); + return array_consolidation_request_from_capnp( + array_uri, array_consolidation_request_reader); break; } case SerializationType::CAPNP: { @@ -175,32 +216,27 @@ Status array_consolidation_request_deserialize( capnp::ArrayConsolidationRequest::Reader array_consolidation_request_reader = reader.getRoot(); - RETURN_NOT_OK(array_consolidation_request_from_capnp( - array_consolidation_request_reader, &decoded_config)); + return array_consolidation_request_from_capnp( + array_uri, array_consolidation_request_reader); break; } default: { - return LOG_STATUS(Status_SerializationError( - "Error deserializing config; Unknown serialization type " - "passed")); + throw ConsolidationSerializationException( + "[array_consolidation_request_deserialize] Error deserializing " + "config; Unknown serialization type passed"); } } - - if (decoded_config == nullptr) - return LOG_STATUS(Status_SerializationError( - "Error serializing config; deserialized config is null")); - - *config = decoded_config.release(); } catch (kj::Exception& e) { - return LOG_STATUS(Status_SerializationError( - "Error deserializing config; kj::Exception: " + - std::string(e.getDescription().cStr()))); + throw ConsolidationSerializationException( + "[array_consolidation_request_deserialize] Error deserializing config; " + "kj::Exception: " + + std::string(e.getDescription().cStr())); } catch (std::exception& e) { - return LOG_STATUS(Status_SerializationError( - "Error deserializing config; exception " + std::string(e.what()))); + throw ConsolidationSerializationException( + "[array_consolidation_request_deserialize] Error deserializing config; " + "exception " + + std::string(e.what())); } - - return Status::Ok(); } void consolidation_plan_request_to_capnp( @@ -435,16 +471,18 @@ std::vector> deserialize_consolidation_plan_response( #else -Status array_consolidation_request_serialize( - const Config&, SerializationType, Buffer*) { - return LOG_STATUS(Status_SerializationError( - "Cannot serialize; serialization not enabled.")); +void array_consolidation_request_serialize( + const Config&, + const std::vector*, + SerializationType, + Buffer*) { + throw ConsolidationSerializationDisabledException(); } -Status array_consolidation_request_deserialize( - Config**, SerializationType, const Buffer&) { - return LOG_STATUS(Status_SerializationError( - "Cannot deserialize; serialization not enabled.")); +std::pair>> +array_consolidation_request_deserialize( + const std::string&, SerializationType, const Buffer&) { + throw ConsolidationSerializationDisabledException(); } void serialize_consolidation_plan_request( diff --git a/tiledb/sm/serialization/consolidation.h b/tiledb/sm/serialization/consolidation.h index d27c6c6e346..e57e7b7f1ba 100644 --- a/tiledb/sm/serialization/consolidation.h +++ b/tiledb/sm/serialization/consolidation.h @@ -57,23 +57,27 @@ namespace serialization { /** * Convert Cap'n Proto message to Consolidation request * + * @param array_uri URI of the array * @param consolidation_req_reader cap'n proto class. - * @param config config object to deserialize into. - * @return Status + * @return {config, fragment_uris} config object to deserialize into, and the + * uris of the fragments to be consolidated if any */ -Status array_consolidation_request_from_capnp( - const capnp::ArrayConsolidationRequest::Reader& consolidation_req_reader, - Config* Config); +std::pair>> +array_consolidation_request_from_capnp( + const std::string& array_uri, + const capnp::ArrayConsolidationRequest::Reader& consolidation_req_reader); /** * Convert Consolidation Request to Cap'n Proto message. * * @param config config to serialize info from + * @param fragment_uris The uris of the fragments to be consolidated if this is + * a request for fragment list consolidation * @param consolidation_req_builder cap'n proto class. - * @return Status */ -Status array_consolidation_request_to_capnp( +void array_consolidation_request_to_capnp( const Config& config, + const std::vector* fragment_uris, capnp::ArrayConsolidationRequest::Builder* consolidation_req_builder); #endif @@ -81,25 +85,29 @@ Status array_consolidation_request_to_capnp( * Serialize a consolidation request via Cap'n Proto. * * @param config config object to get info to serialize. + * @param fragment_uris The uris of the fragments to be consolidated if this is + * a request for fragment list consolidation * @param serialize_type format to serialize into Cap'n Proto or JSON. * @param serialized_buffer buffer to store serialized bytes in. - * @return Status */ -Status array_consolidation_request_serialize( +void array_consolidation_request_serialize( const Config& config, + const std::vector* fragment_uris, SerializationType serialize_type, Buffer* serialized_buffer); /** * Deserialize consolidation request via Cap'n Proto * - * @param config config object to store the deserialized info in. + * @param array_uri URI of the array * @param serialize_type format the data is serialized in: Cap'n Proto of JSON. * @param serialized_buffer buffer to read serialized bytes from. - * @return Status + * @return {config, fragment_uris} config object to deserialize into, and the + * uris of the fragments to be consolidated if any */ -Status array_consolidation_request_deserialize( - Config** config, +std::pair>> +array_consolidation_request_deserialize( + const std::string& array_uri, SerializationType serialize_type, const Buffer& serialized_buffer);