Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

175 descriptor set optimizations batch inserts #209

Merged
merged 10 commits into from
Aug 29, 2024
310 changes: 232 additions & 78 deletions src/DescriptorsCommand.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,10 @@ std::string DescriptorsCommand::get_set_path(PMGDQuery &query_tx,
list_arr.append(VDMS_DESC_SET_DIM_PROP);
list_arr.append(VDMS_DESC_SET_ENGIN_PROP);


results["list"] = list_arr;


bool unique = true;

// Query set node
Expand All @@ -94,7 +96,6 @@ std::string DescriptorsCommand::get_set_path(PMGDQuery &query_tx,
true);

Json::Value &query_responses = query.run();

if (query_responses.size() != 1 && query_responses[0].size() != 1) {
throw ExceptionCommand(DescriptorSetError, "PMGD Transaction Error");
}
Expand All @@ -113,7 +114,6 @@ std::string DescriptorsCommand::get_set_path(PMGDQuery &query_tx,
_desc_set_locator[set_name] = set_path;
return set_path;
}

return "";
}

Expand Down Expand Up @@ -341,8 +341,9 @@ AddDescriptor::AddDescriptor() : DescriptorsCommand("AddDescriptor") {
//_use_aws_storage = VDMSConfig::instance()->get_aws_flag();
}

// Updated to handle multiple descriptors in a single call (nr_desc descriptors packed in blob)
long AddDescriptor::insert_descriptor(const std::string &blob,
const std::string &set_path, int dim,
const std::string &set_path, int nr_desc,
const std::string &label,
Json::Value &error) {
long id_first;
Expand All @@ -351,21 +352,13 @@ long AddDescriptor::insert_descriptor(const std::string &blob,

VCL::DescriptorSet *desc_set = _dm->get_descriptors_handler(set_path);

if (blob.length() / 4 != dim) {
std::cerr << "AddDescriptor::insert_descriptor: ";
std::cerr << "Dimensions mismatch: ";
std::cerr << blob.length() / 4 << " " << dim << std::endl;
error["info"] = "Blob Dimensions Mismatch";
return -1;
}

if (!label.empty()) {
long label_id = desc_set->get_label_id(label);
long *label_ptr = &label_id;
id_first = desc_set->add((float *)blob.data(), 1, label_ptr);
id_first = desc_set->add((float *)blob.data(), nr_desc, label_ptr);

} else {
id_first = desc_set->add((float *)blob.data(), 1);
id_first = desc_set->add((float *)blob.data(), nr_desc);
}

if (output_vcl_timing) {
Expand Down Expand Up @@ -409,94 +402,257 @@ void AddDescriptor::retrieve_aws_descriptorSet(const std::string &set_path) {
}
}

int AddDescriptor::construct_protobuf(PMGDQuery &query,
const Json::Value &jsoncmd,
const std::string &blob, int grp_id,
Json::Value &error) {
const Json::Value &cmd = jsoncmd[_cmd_name];
int AddDescriptor::add_single_descriptor(PMGDQuery &query,
const Json::Value &jsoncmd,
const std::string &blob, int grp_id,
Json::Value &error){

const std::string set_name = cmd["set"].asString();
const Json::Value &cmd = jsoncmd[_cmd_name];
const std::string set_name = cmd["set"].asString();

Json::Value props = get_value<Json::Value>(cmd, "properties");
Json::Value props = get_value<Json::Value>(cmd, "properties");

std::string label = get_value<std::string>(cmd, "label", "None");
props[VDMS_DESC_LABEL_PROP] = label;
std::string label = get_value<std::string>(cmd, "label", "None");
props[VDMS_DESC_LABEL_PROP] = label;

int dimensions;
const std::string set_path = get_set_path(query, set_name, dimensions);
int dim;
const std::string set_path = get_set_path(query, set_name, dim);

if (set_path.empty()) {
error["info"] = "Set " + set_name + " not found";
error["status"] = RSCommand::Error;
return -1;
}
if (set_path.empty()) {
error["info"] = "Set " + set_name + " not found";
error["status"] = RSCommand::Error;
return -1;
}

// retrieve the descriptor set from AWS here
// operations are currently done in memory with no subsequent write to disk
// so there's no need to re-upload to AWS
if (_use_aws_storage) {
retrieve_aws_descriptorSet(set_path);
}
if (blob.length() / 4 != dim) {
std::cerr << "AddDescriptor::insert_descriptor: ";
std::cerr << "Dimensions mismatch: ";
std::cerr << blob.length() / 4 << " " << dim << std::endl;
error["info"] = "Blob Dimensions Mismatch";
return -1;
}

long id = insert_descriptor(blob, set_path, dimensions, label, error);
// retrieve the descriptor set from AWS here
// operations are currently done in memory with no subsequent write to disk
// so there's no need to re-upload to AWS
if (_use_aws_storage) {
retrieve_aws_descriptorSet(set_path);
}

if (id < 0) {
error["status"] = RSCommand::Error;
long id = insert_descriptor(blob, set_path, 1, label, error);

if (id < 0) {
error["status"] = RSCommand::Error;

if (_use_aws_storage) {
// delete files in set_path
std::uintmax_t n = fs::remove_all(set_path);
std::cout << "Deleted " << n << " files or directories\n";
}

return -1;
}

props[VDMS_DESC_ID_PROP] = Json::Int64(id);

int node_ref = get_value<int>(cmd, "_ref", query.get_available_reference());

query.AddNode(node_ref, VDMS_DESC_TAG, props, Json::nullValue);

// It passed the checker, so it exists.
int set_ref = query.get_available_reference();

Json::Value link;
Json::Value results;
Json::Value list_arr;
list_arr.append(VDMS_DESC_SET_PATH_PROP);
list_arr.append(VDMS_DESC_SET_DIM_PROP);
results["list"] = list_arr;

Json::Value constraints;
Json::Value name_arr;
name_arr.append("==");
name_arr.append(set_name);
constraints[VDMS_DESC_SET_NAME_PROP] = name_arr;

bool unique = true;

// Query set node
query.QueryNode(set_ref, VDMS_DESC_SET_TAG, link, constraints, results,
unique);

if (cmd.isMember("link")) {
add_link(query, cmd["link"], node_ref, VDMS_DESC_EDGE_TAG);
}

Json::Value props_edge;
query.AddEdge(-1, set_ref, node_ref, VDMS_DESC_SET_EDGE_TAG, props_edge);

// TODO: deleting files here causes problems with concurrency (TestRetail.py)
// keeping local copies as a temporary solution
// if(_use_aws_storage)
// {
// //delete files in set_path
// std::uintmax_t n = fs::remove_all(set_path);
// std::cout << "Deleted " << n << " files or directories\n";
// }

return 0;

}

int AddDescriptor::add_descriptor_batch(PMGDQuery &query,
const Json::Value &jsoncmd,
const std::string &blob, int grp_id,
Json::Value &error){

const int FOUR_BYTE_INT = 4;
int expected_blb_size;
int nr_expected_descs;
int dimensions;

//Extract set name
const Json::Value &cmd = jsoncmd[_cmd_name];
const std::string set_name = cmd["set"].asString();

//Json::Value props = get_value<Json::Value>(cmd, "properties");

//extract properties list and get filepath/object location of set
Json::Value prop_list = get_value<Json::Value>(cmd, "batch_properties");
const std::string set_path = get_set_path(query, set_name, dimensions);

if (set_path.empty()) {
error["info"] = "Set " + set_name + " not found";
error["status"] = RSCommand::Error;
return -1;
}

std::string label = get_value<std::string>(cmd, "label", "None");
//props[VDMS_DESC_LABEL_PROP] = label;

// retrieve the descriptor set from AWS here
// operations are currently done in memory with no subsequent write to disk
// so there's no need to re-upload to AWS
if (_use_aws_storage) {
// delete files in set_path
std::uintmax_t n = fs::remove_all(set_path);
std::cout << "Deleted " << n << " files or directories\n";
retrieve_aws_descriptorSet(set_path);
}

return -1;
}
// Note: dimensions are counted in 32-bit (4-byte) elements, while the string
// blob is sized in bytes, hence the multiplication by FOUR_BYTE_INT below.
nr_expected_descs = prop_list.size();
expected_blb_size = nr_expected_descs * dimensions * FOUR_BYTE_INT;

//Verify length of input is matching expectations
if (blob.length() != expected_blb_size) {
std::cerr << "AddDescriptor::insert_descriptor: ";
std::cerr << "Expected Blob Length Does Not Match Input ";
std::cerr << "Input Length: " <<blob.length() << " != " << "Expected Length: " << expected_blb_size << std::endl;
error["info"] = "FV Input Length Mismatch";
return -1;
}

props[VDMS_DESC_ID_PROP] = Json::Int64(id);
long id = insert_descriptor(blob, set_path, nr_expected_descs, label, error);

int node_ref = get_value<int>(cmd, "_ref", query.get_available_reference());
if (id < 0) {
error["status"] = RSCommand::Error;

query.AddNode(node_ref, VDMS_DESC_TAG, props, Json::nullValue);
if (_use_aws_storage) {
// delete files in set_path
std::uintmax_t n = fs::remove_all(set_path);
std::cout << "Deleted " << n << " files or directories\n";
ifadams marked this conversation as resolved.
Show resolved Hide resolved
}
error["info"] = "FV Index Insert Failed";
return -1;
}

// It passed the checker, so it exists.
int set_ref = query.get_available_reference();
// It passed the checker, so it exists.
int set_ref = query.get_available_reference();

Json::Value link;
Json::Value results;
Json::Value list_arr;
list_arr.append(VDMS_DESC_SET_PATH_PROP);
list_arr.append(VDMS_DESC_SET_DIM_PROP);
results["list"] = list_arr;

//constraints for getting set node to link to.
Json::Value constraints;
Json::Value name_arr;
name_arr.append("==");
name_arr.append(set_name);
constraints[VDMS_DESC_SET_NAME_PROP] = name_arr;
bool unique = true;

// Query set node-We only need to do this once, outside of the loop
query.QueryNode(set_ref, VDMS_DESC_SET_TAG, link, constraints, results,
unique);

for(int i=0; i < nr_expected_descs; i++) {
int node_ref = query.get_available_reference();
Json::Value cur_props;
cur_props = prop_list[i];
cur_props[VDMS_DESC_ID_PROP] = Json::Int64(id+i);
cur_props[VDMS_DESC_LABEL_PROP] = label;

query.AddNode(node_ref, VDMS_DESC_TAG, cur_props, Json::nullValue);

// It passed the checker, so it exists.
//int set_ref = query.get_available_reference();
//Json::Value link;
//Json::Value results;
//Json::Value list_arr;
//list_arr.append(VDMS_DESC_SET_PATH_PROP);
//list_arr.append(VDMS_DESC_SET_DIM_PROP);
//results["list"] = list_arr;

//constraints for getting set node to link to.
//Json::Value constraints;
//Json::Value name_arr;
//name_arr.append("==");
//name_arr.append(set_name);
//constraints[VDMS_DESC_SET_NAME_PROP] = name_arr;

//bool unique = true;

// Query set node-We only need to do this once, outside of the loop TODO MOVE
ifadams marked this conversation as resolved.
Show resolved Hide resolved
//query.QueryNode(set_ref, VDMS_DESC_SET_TAG, link, constraints, results,
// unique);

//note this implicitly means that every node of a batch uses the same link
ifadams marked this conversation as resolved.
Show resolved Hide resolved
if (cmd.isMember("link")) {
add_link(query, cmd["link"], node_ref, VDMS_DESC_EDGE_TAG);
}

Json::Value link;
Json::Value results;
Json::Value list_arr;
list_arr.append(VDMS_DESC_SET_PATH_PROP);
list_arr.append(VDMS_DESC_SET_DIM_PROP);
results["list"] = list_arr;
Json::Value props_edge;
query.AddEdge(-1, set_ref, node_ref, VDMS_DESC_SET_EDGE_TAG, props_edge);
}

Json::Value constraints;
Json::Value name_arr;
name_arr.append("==");
name_arr.append(set_name);
constraints[VDMS_DESC_SET_NAME_PROP] = name_arr;
return 0;
}

bool unique = true;
int AddDescriptor::construct_protobuf(PMGDQuery &query,
const Json::Value &jsoncmd,
const std::string &blob, int grp_id,
Json::Value &error) {

// Query set node
query.QueryNode(set_ref, VDMS_DESC_SET_TAG, link, constraints, results,
unique);
bool batch_mode;
int rc;
const Json::Value &cmd = jsoncmd[_cmd_name];
const std::string set_name = cmd["set"].asString();

if (cmd.isMember("link")) {
add_link(query, cmd["link"], node_ref, VDMS_DESC_EDGE_TAG);

Json::Value prop_list = get_value<Json::Value>(cmd, "batch_properties");
if(prop_list.size() == 0){
rc = add_single_descriptor(query, jsoncmd, blob, grp_id, error);
} else {
rc = add_descriptor_batch(query, jsoncmd, blob, grp_id, error);
}

Json::Value props_edge;
query.AddEdge(-1, set_ref, node_ref, VDMS_DESC_SET_EDGE_TAG, props_edge);
if(rc < 0) error["status"] = RSCommand::Error;
ifadams marked this conversation as resolved.
Show resolved Hide resolved

// TODO: deleting files here causes problems with concurrency (TestRetail.py)
// keeping local copies as a temporary solution
// if(_use_aws_storage)
// {
// //delete files in set_path
// std::uintmax_t n = fs::remove_all(set_path);
// std::cout << "Deleted " << n << " files or directories\n";
// }

return 0;
return rc;

}

Json::Value AddDescriptor::construct_responses(
Expand Down Expand Up @@ -692,7 +848,6 @@ int FindDescriptor::construct_protobuf(PMGDQuery &query,

// Case (1)
if (cmd.isMember("link")) {

// Query for the Descriptors related to user-defined link
// that match the user-defined constraints
// We will need to do the AND operation
Expand All @@ -712,7 +867,6 @@ int FindDescriptor::construct_protobuf(PMGDQuery &query,
}
// Case (2)
else if (!cmd.isMember("k_neighbors")) {

// In this case, we either need properties of the descriptor
// ("list") on the results block, or we need the descriptor nodes
// because the user defined a reference.
Expand Down
Loading
Loading