Skip to content

Commit

Permalink
Merge pull request #209 from IntelLabs/175-descriptor-set-optimizatio…
Browse files Browse the repository at this point in the history
…ns-batch-inserts

175 descriptor set optimizations batch inserts
  • Loading branch information
s-gobriel authored Aug 29, 2024
2 parents 0ea1717 + fc936f5 commit a1874e6
Show file tree
Hide file tree
Showing 5 changed files with 453 additions and 85 deletions.
310 changes: 232 additions & 78 deletions src/DescriptorsCommand.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,10 @@ std::string DescriptorsCommand::get_set_path(PMGDQuery &query_tx,
list_arr.append(VDMS_DESC_SET_DIM_PROP);
list_arr.append(VDMS_DESC_SET_ENGIN_PROP);


results["list"] = list_arr;


bool unique = true;

// Query set node
Expand All @@ -94,7 +96,6 @@ std::string DescriptorsCommand::get_set_path(PMGDQuery &query_tx,
true);

Json::Value &query_responses = query.run();

if (query_responses.size() != 1 && query_responses[0].size() != 1) {
throw ExceptionCommand(DescriptorSetError, "PMGD Transaction Error");
}
Expand All @@ -113,7 +114,6 @@ std::string DescriptorsCommand::get_set_path(PMGDQuery &query_tx,
_desc_set_locator[set_name] = set_path;
return set_path;
}

return "";
}

Expand Down Expand Up @@ -341,8 +341,9 @@ AddDescriptor::AddDescriptor() : DescriptorsCommand("AddDescriptor") {
//_use_aws_storage = VDMSConfig::instance()->get_aws_flag();
}

//update to handle multiple descriptors at a go
long AddDescriptor::insert_descriptor(const std::string &blob,
const std::string &set_path, int dim,
const std::string &set_path, int nr_desc,
const std::string &label,
Json::Value &error) {
long id_first;
Expand All @@ -351,21 +352,13 @@ long AddDescriptor::insert_descriptor(const std::string &blob,

VCL::DescriptorSet *desc_set = _dm->get_descriptors_handler(set_path);

if (blob.length() / 4 != dim) {
std::cerr << "AddDescriptor::insert_descriptor: ";
std::cerr << "Dimensions mismatch: ";
std::cerr << blob.length() / 4 << " " << dim << std::endl;
error["info"] = "Blob Dimensions Mismatch";
return -1;
}

if (!label.empty()) {
long label_id = desc_set->get_label_id(label);
long *label_ptr = &label_id;
id_first = desc_set->add((float *)blob.data(), 1, label_ptr);
id_first = desc_set->add((float *)blob.data(), nr_desc, label_ptr);

} else {
id_first = desc_set->add((float *)blob.data(), 1);
id_first = desc_set->add((float *)blob.data(), nr_desc);
}

if (output_vcl_timing) {
Expand Down Expand Up @@ -409,94 +402,257 @@ void AddDescriptor::retrieve_aws_descriptorSet(const std::string &set_path) {
}
}

int AddDescriptor::construct_protobuf(PMGDQuery &query,
const Json::Value &jsoncmd,
const std::string &blob, int grp_id,
Json::Value &error) {
const Json::Value &cmd = jsoncmd[_cmd_name];
int AddDescriptor::add_single_descriptor(PMGDQuery &query,
const Json::Value &jsoncmd,
const std::string &blob, int grp_id,
Json::Value &error){

const std::string set_name = cmd["set"].asString();
const Json::Value &cmd = jsoncmd[_cmd_name];
const std::string set_name = cmd["set"].asString();

Json::Value props = get_value<Json::Value>(cmd, "properties");
Json::Value props = get_value<Json::Value>(cmd, "properties");

std::string label = get_value<std::string>(cmd, "label", "None");
props[VDMS_DESC_LABEL_PROP] = label;
std::string label = get_value<std::string>(cmd, "label", "None");
props[VDMS_DESC_LABEL_PROP] = label;

int dimensions;
const std::string set_path = get_set_path(query, set_name, dimensions);
int dim;
const std::string set_path = get_set_path(query, set_name, dim);

if (set_path.empty()) {
error["info"] = "Set " + set_name + " not found";
error["status"] = RSCommand::Error;
return -1;
}
if (set_path.empty()) {
error["info"] = "Set " + set_name + " not found";
error["status"] = RSCommand::Error;
return -1;
}

// retrieve the descriptor set from AWS here
// operations are currently done in memory with no subsequent write to disk
// so there's no need to re-upload to AWS
if (_use_aws_storage) {
retrieve_aws_descriptorSet(set_path);
}
if (blob.length() / 4 != dim) {
std::cerr << "AddDescriptor::insert_descriptor: ";
std::cerr << "Dimensions mismatch: ";
std::cerr << blob.length() / 4 << " " << dim << std::endl;
error["info"] = "Blob Dimensions Mismatch";
return -1;
}

long id = insert_descriptor(blob, set_path, dimensions, label, error);
// retrieve the descriptor set from AWS here
// operations are currently done in memory with no subsequent write to disk
// so there's no need to re-upload to AWS
if (_use_aws_storage) {
retrieve_aws_descriptorSet(set_path);
}

if (id < 0) {
error["status"] = RSCommand::Error;
long id = insert_descriptor(blob, set_path, 1, label, error);

if (id < 0) {
error["status"] = RSCommand::Error;

if (_use_aws_storage) {
// delete files in set_path
std::uintmax_t n = fs::remove_all(set_path);
std::cout << "Deleted " << n << " files or directories\n";
}

return -1;
}

props[VDMS_DESC_ID_PROP] = Json::Int64(id);

int node_ref = get_value<int>(cmd, "_ref", query.get_available_reference());

query.AddNode(node_ref, VDMS_DESC_TAG, props, Json::nullValue);

// It passed the checker, so it exists.
int set_ref = query.get_available_reference();

Json::Value link;
Json::Value results;
Json::Value list_arr;
list_arr.append(VDMS_DESC_SET_PATH_PROP);
list_arr.append(VDMS_DESC_SET_DIM_PROP);
results["list"] = list_arr;

Json::Value constraints;
Json::Value name_arr;
name_arr.append("==");
name_arr.append(set_name);
constraints[VDMS_DESC_SET_NAME_PROP] = name_arr;

bool unique = true;

// Query set node
query.QueryNode(set_ref, VDMS_DESC_SET_TAG, link, constraints, results,
unique);

if (cmd.isMember("link")) {
add_link(query, cmd["link"], node_ref, VDMS_DESC_EDGE_TAG);
}

Json::Value props_edge;
query.AddEdge(-1, set_ref, node_ref, VDMS_DESC_SET_EDGE_TAG, props_edge);

// TODO: deleting files here causes problems with concurrency (TestRetail.py)
// keeping local copies as a temporary solution
// if(_use_aws_storage)
// {
// //delete files in set_path
// std::uintmax_t n = fs::remove_all(set_path);
// std::cout << "Deleted " << n << " files or directories\n";
// }

return 0;

}

int AddDescriptor::add_descriptor_batch(PMGDQuery &query,
const Json::Value &jsoncmd,
const std::string &blob, int grp_id,
Json::Value &error){

const int FOUR_BYTE_INT = 4;
int expected_blb_size;
int nr_expected_descs;
int dimensions;

//Extract set name
const Json::Value &cmd = jsoncmd[_cmd_name];
const std::string set_name = cmd["set"].asString();

//Json::Value props = get_value<Json::Value>(cmd, "properties");

//extract properties list and get filepath/object location of set
Json::Value prop_list = get_value<Json::Value>(cmd, "batch_properties");
const std::string set_path = get_set_path(query, set_name, dimensions);

if (set_path.empty()) {
error["info"] = "Set " + set_name + " not found";
error["status"] = RSCommand::Error;
return -1;
}

std::string label = get_value<std::string>(cmd, "label", "None");
//props[VDMS_DESC_LABEL_PROP] = label;

// retrieve the descriptor set from AWS here
// operations are currently done in memory with no subsequent write to disk
// so there's no need to re-upload to AWS
if (_use_aws_storage) {
// delete files in set_path
std::uintmax_t n = fs::remove_all(set_path);
std::cout << "Deleted " << n << " files or directories\n";
retrieve_aws_descriptorSet(set_path);
}

return -1;
}
// Note dimensionse are based on a 32 bit integer, hence the /4 math on size
// as the string blob is sized in 8 bit ints.
nr_expected_descs = prop_list.size();
expected_blb_size = nr_expected_descs * dimensions * FOUR_BYTE_INT;

//Verify length of input is matching expectations
if (blob.length() != expected_blb_size) {
std::cerr << "AddDescriptor::insert_descriptor: ";
std::cerr << "Expected Blob Length Does Not Match Input ";
std::cerr << "Input Length: " <<blob.length() << " != " << "Expected Length: " << expected_blb_size << std::endl;
error["info"] = "FV Input Length Mismatch";
return -1;
}

props[VDMS_DESC_ID_PROP] = Json::Int64(id);
long id = insert_descriptor(blob, set_path, nr_expected_descs, label, error);

int node_ref = get_value<int>(cmd, "_ref", query.get_available_reference());
if (id < 0) {
error["status"] = RSCommand::Error;

query.AddNode(node_ref, VDMS_DESC_TAG, props, Json::nullValue);
if (_use_aws_storage) {
// delete files in set_path
std::uintmax_t n = fs::remove_all(set_path);
std::cout << "Deleted " << n << " files or directories\n";
}
error["info"] = "FV Index Insert Failed";
return -1;
}

// It passed the checker, so it exists.
int set_ref = query.get_available_reference();
// It passed the checker, so it exists.
int set_ref = query.get_available_reference();

Json::Value link;
Json::Value results;
Json::Value list_arr;
list_arr.append(VDMS_DESC_SET_PATH_PROP);
list_arr.append(VDMS_DESC_SET_DIM_PROP);
results["list"] = list_arr;

//constraints for getting set node to link to.
Json::Value constraints;
Json::Value name_arr;
name_arr.append("==");
name_arr.append(set_name);
constraints[VDMS_DESC_SET_NAME_PROP] = name_arr;
bool unique = true;

// Query set node-We only need to do this once, outside of the loop
query.QueryNode(set_ref, VDMS_DESC_SET_TAG, link, constraints, results,
unique);

for(int i=0; i < nr_expected_descs; i++) {
int node_ref = query.get_available_reference();
Json::Value cur_props;
cur_props = prop_list[i];
cur_props[VDMS_DESC_ID_PROP] = Json::Int64(id+i);
cur_props[VDMS_DESC_LABEL_PROP] = label;

query.AddNode(node_ref, VDMS_DESC_TAG, cur_props, Json::nullValue);

// It passed the checker, so it exists.
//int set_ref = query.get_available_reference();
//Json::Value link;
//Json::Value results;
//Json::Value list_arr;
//list_arr.append(VDMS_DESC_SET_PATH_PROP);
//list_arr.append(VDMS_DESC_SET_DIM_PROP);
//results["list"] = list_arr;

//constraints for getting set node to link to.
//Json::Value constraints;
//Json::Value name_arr;
//name_arr.append("==");
//name_arr.append(set_name);
//constraints[VDMS_DESC_SET_NAME_PROP] = name_arr;

//bool unique = true;

// Query set node-We only need to do this once, outside of the loop TODO MOVE
//query.QueryNode(set_ref, VDMS_DESC_SET_TAG, link, constraints, results,
// unique);

//note this implicitly means that every node of a batch uses the same link
if (cmd.isMember("link")) {
add_link(query, cmd["link"], node_ref, VDMS_DESC_EDGE_TAG);
}

Json::Value link;
Json::Value results;
Json::Value list_arr;
list_arr.append(VDMS_DESC_SET_PATH_PROP);
list_arr.append(VDMS_DESC_SET_DIM_PROP);
results["list"] = list_arr;
Json::Value props_edge;
query.AddEdge(-1, set_ref, node_ref, VDMS_DESC_SET_EDGE_TAG, props_edge);
}

Json::Value constraints;
Json::Value name_arr;
name_arr.append("==");
name_arr.append(set_name);
constraints[VDMS_DESC_SET_NAME_PROP] = name_arr;
return 0;
}

bool unique = true;
int AddDescriptor::construct_protobuf(PMGDQuery &query,
const Json::Value &jsoncmd,
const std::string &blob, int grp_id,
Json::Value &error) {

// Query set node
query.QueryNode(set_ref, VDMS_DESC_SET_TAG, link, constraints, results,
unique);
bool batch_mode;
int rc;
const Json::Value &cmd = jsoncmd[_cmd_name];
const std::string set_name = cmd["set"].asString();

if (cmd.isMember("link")) {
add_link(query, cmd["link"], node_ref, VDMS_DESC_EDGE_TAG);

Json::Value prop_list = get_value<Json::Value>(cmd, "batch_properties");
if(prop_list.size() == 0){
rc = add_single_descriptor(query, jsoncmd, blob, grp_id, error);
} else {
rc = add_descriptor_batch(query, jsoncmd, blob, grp_id, error);
}

Json::Value props_edge;
query.AddEdge(-1, set_ref, node_ref, VDMS_DESC_SET_EDGE_TAG, props_edge);
if(rc < 0) error["status"] = RSCommand::Error;

// TODO: deleting files here causes problems with concurrency (TestRetail.py)
// keeping local copies as a temporary solution
// if(_use_aws_storage)
// {
// //delete files in set_path
// std::uintmax_t n = fs::remove_all(set_path);
// std::cout << "Deleted " << n << " files or directories\n";
// }

return 0;
return rc;

}

Json::Value AddDescriptor::construct_responses(
Expand Down Expand Up @@ -692,7 +848,6 @@ int FindDescriptor::construct_protobuf(PMGDQuery &query,

// Case (1)
if (cmd.isMember("link")) {

// Query for the Descriptors related to user-defined link
// that match the user-defined constraints
// We will need to do the AND operation
Expand All @@ -712,7 +867,6 @@ int FindDescriptor::construct_protobuf(PMGDQuery &query,
}
// Case (2)
else if (!cmd.isMember("k_neighbors")) {

// In this case, we either need properties of the descriptor
// ("list") on the results block, or we need the descriptor nodes
// because the user defined a reference.
Expand Down
Loading

0 comments on commit a1874e6

Please sign in to comment.