Skip to content

Commit

Permalink
Set traversal limit in enumeration serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
ypatia committed Nov 6, 2024
1 parent 493ab82 commit 9f68072
Show file tree
Hide file tree
Showing 10 changed files with 51 additions and 13 deletions.
2 changes: 1 addition & 1 deletion test/src/unit-enumerations.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2940,7 +2940,7 @@ shared_ptr<ArraySchemaEvolution> EnumerationFx::ser_des_array_schema_evolution(

ArraySchemaEvolution* ret;
throw_if_not_ok(serialization::array_schema_evolution_deserialize(
&ret, stype, buf, memory_tracker_));
&ret, cfg_, stype, buf, memory_tracker_));

return shared_ptr<ArraySchemaEvolution>(ret);
}
Expand Down
2 changes: 1 addition & 1 deletion test/src/unit-request-handlers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ HandleLoadArraySchemaRequestFx::call_handler(
REQUIRE(rval == TILEDB_OK);

return serialization::deserialize_load_array_schema_response(
uri_, stype, resp_buf->buffer(), memory_tracker_);
uri_, cfg_, stype, resp_buf->buffer(), memory_tracker_);
}

shared_ptr<ArraySchema> HandleQueryPlanRequestFx::create_schema() {
Expand Down
1 change: 1 addition & 0 deletions tiledb/sm/c_api/tiledb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1572,6 +1572,7 @@ int32_t tiledb_deserialize_array_schema_evolution(
ctx,
tiledb::sm::serialization::array_schema_evolution_deserialize(
&((*array_schema_evolution)->array_schema_evolution_),
ctx->config(),
(tiledb::sm::SerializationType)serialize_type,
buffer->buffer(),
memory_tracker))) {
Expand Down
4 changes: 2 additions & 2 deletions tiledb/sm/rest/rest_client_remote.cc
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ RestClientRemote::post_array_schema_from_rest(
// Ensure data has a null delimiter for cap'n proto if using JSON
throw_if_not_ok(ensure_json_null_delimited_string(&returned_data));
return serialization::deserialize_load_array_schema_response(
uri, serialization_type_, returned_data, memory_tracker_);
uri, config, serialization_type_, returned_data, memory_tracker_);
}

Status RestClientRemote::post_array_schema_to_rest(
Expand Down Expand Up @@ -616,7 +616,7 @@ RestClientRemote::post_enumerations_from_rest(
// Ensure data has a null delimiter for cap'n proto if using JSON
throw_if_not_ok(ensure_json_null_delimited_string(&returned_data));
return serialization::deserialize_load_enumerations_response(
array_schema, serialization_type_, returned_data, memory_tracker);
array_schema, config, serialization_type_, returned_data, memory_tracker);
}

void RestClientRemote::post_query_plan_from_rest(
Expand Down
17 changes: 14 additions & 3 deletions tiledb/sm/serialization/array_schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1824,6 +1824,7 @@ std::tuple<
std::unordered_map<std::string, shared_ptr<ArraySchema>>>
deserialize_load_array_schema_response(
const URI& uri,
const Config& config,
SerializationType serialization_type,
span<const char> data,
shared_ptr<MemoryTracker> memory_tracker) {
Expand All @@ -1840,10 +1841,19 @@ deserialize_load_array_schema_response(
uri, reader, memory_tracker);
}
case SerializationType::CAPNP: {
// Set traversal limit from config
uint64_t limit =
config.get<uint64_t>("rest.capnp_traversal_limit").value();
::capnp::ReaderOptions readerOptions;
// capnp uses the limit in words
readerOptions.traversalLimitInWords = limit / sizeof(::capnp::word);

const auto mBytes = reinterpret_cast<const kj::byte*>(data.data());
::capnp::FlatArrayMessageReader array_reader(kj::arrayPtr(
reinterpret_cast<const ::capnp::word*>(mBytes),
data.size() / sizeof(::capnp::word)));
::capnp::FlatArrayMessageReader array_reader(
kj::arrayPtr(
reinterpret_cast<const ::capnp::word*>(mBytes),
data.size() / sizeof(::capnp::word)),
readerOptions);
auto reader = array_reader.getRoot<capnp::LoadArraySchemaResponse>();
return load_array_schema_response_from_capnp(
uri, reader, memory_tracker);
Expand Down Expand Up @@ -1926,6 +1936,7 @@ std::tuple<
std::unordered_map<std::string, shared_ptr<ArraySchema>>>
deserialize_load_array_schema_response(
const URI&,
const Config&,
SerializationType,
span<const char>,
shared_ptr<MemoryTracker>) {
Expand Down
1 change: 1 addition & 0 deletions tiledb/sm/serialization/array_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ std::tuple<
std::unordered_map<std::string, shared_ptr<ArraySchema>>>
deserialize_load_array_schema_response(
const URI& uri,
const Config& config,
SerializationType serialization_type,
span<const char> data,
shared_ptr<MemoryTracker> memory_tracker);
Expand Down
17 changes: 14 additions & 3 deletions tiledb/sm/serialization/array_schema_evolution.cc
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ Status array_schema_evolution_serialize(

Status array_schema_evolution_deserialize(
ArraySchemaEvolution** array_schema_evolution,
const Config& config,
SerializationType serialize_type,
span<const char> serialized_buffer,
shared_ptr<MemoryTracker> memory_tracker) {
Expand All @@ -312,11 +313,20 @@ Status array_schema_evolution_deserialize(
break;
}
case SerializationType::CAPNP: {
// Set traversal limit from config
uint64_t limit =
config.get<uint64_t>("rest.capnp_traversal_limit").value();
::capnp::ReaderOptions readerOptions;
// capnp uses the limit in words
readerOptions.traversalLimitInWords = limit / sizeof(::capnp::word);

const auto mBytes =
reinterpret_cast<const kj::byte*>(serialized_buffer.data());
::capnp::FlatArrayMessageReader reader(kj::arrayPtr(
reinterpret_cast<const ::capnp::word*>(mBytes),
serialized_buffer.size() / sizeof(::capnp::word)));
::capnp::FlatArrayMessageReader reader(
kj::arrayPtr(
reinterpret_cast<const ::capnp::word*>(mBytes),
serialized_buffer.size() / sizeof(::capnp::word)),
readerOptions);
capnp::ArraySchemaEvolution::Reader array_schema_evolution_reader =
reader.getRoot<capnp::ArraySchemaEvolution>();
decoded_array_schema_evolution = array_schema_evolution_from_capnp(
Expand Down Expand Up @@ -363,6 +373,7 @@ Status array_schema_evolution_serialize(

Status array_schema_evolution_deserialize(
ArraySchemaEvolution**,
const Config&,
SerializationType,
span<const char>,
shared_ptr<MemoryTracker>) {
Expand Down
2 changes: 2 additions & 0 deletions tiledb/sm/serialization/array_schema_evolution.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,15 @@ Status array_schema_evolution_serialize(
/**
* Deserialize an array schema evolution via Cap'n Proto
* @param array_schema_evolution pointer to store evolution object in
* @param config associated config object
* @param serialize_type format to serialize into Cap'n Proto or JSON
* @param serialized_buffer buffer where serialized bytes are stored
* @param memory_tracker memory tracker associated with the evolution object
* @return
*/
Status array_schema_evolution_deserialize(
ArraySchemaEvolution** array_schema_evolution,
const Config& config,
SerializationType serialize_type,
span<const char> serialized_buffer,
shared_ptr<MemoryTracker> memory_tracker);
Expand Down
17 changes: 14 additions & 3 deletions tiledb/sm/serialization/enumeration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ void serialize_load_enumerations_response(
std::unordered_map<std::string, std::vector<shared_ptr<const Enumeration>>>
deserialize_load_enumerations_response(
const ArraySchema& array_schema,
const Config& config,
SerializationType serialize_type,
span<const char> response,
shared_ptr<MemoryTracker> memory_tracker) {
Expand All @@ -362,10 +363,19 @@ deserialize_load_enumerations_response(
reader, array_schema, memory_tracker);
}
case SerializationType::CAPNP: {
// Set traversal limit from config
uint64_t limit =
config.get<uint64_t>("rest.capnp_traversal_limit").value();
::capnp::ReaderOptions readerOptions;
// capnp uses the limit in words
readerOptions.traversalLimitInWords = limit / sizeof(::capnp::word);

const auto mBytes = reinterpret_cast<const kj::byte*>(response.data());
::capnp::FlatArrayMessageReader array_reader(kj::arrayPtr(
reinterpret_cast<const ::capnp::word*>(mBytes),
response.size() / sizeof(::capnp::word)));
::capnp::FlatArrayMessageReader array_reader(
kj::arrayPtr(
reinterpret_cast<const ::capnp::word*>(mBytes),
response.size() / sizeof(::capnp::word)),
readerOptions);
capnp::LoadEnumerationsResponse::Reader reader =
array_reader.getRoot<capnp::LoadEnumerationsResponse>();
return load_enumerations_response_from_capnp(
Expand Down Expand Up @@ -414,6 +424,7 @@ void serialize_load_enumerations_response(
std::unordered_map<std::string, std::vector<shared_ptr<const Enumeration>>>
deserialize_load_enumerations_response(
const Array&,
const Config&,
SerializationType,
span<const char>,
shared_ptr<MemoryTracker>) {
Expand Down
1 change: 1 addition & 0 deletions tiledb/sm/serialization/enumeration.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ void serialize_load_enumerations_response(
std::unordered_map<std::string, std::vector<shared_ptr<const Enumeration>>>
deserialize_load_enumerations_response(
const ArraySchema& array_schema,
const Config& config,
SerializationType serialization_type,
span<const char> response,
shared_ptr<MemoryTracker> memory_tracker);
Expand Down

0 comments on commit 9f68072

Please sign in to comment.