diff --git a/src/DescriptorsCommand.cc b/src/DescriptorsCommand.cc index df60d6de..1276333c 100644 --- a/src/DescriptorsCommand.cc +++ b/src/DescriptorsCommand.cc @@ -84,8 +84,10 @@ std::string DescriptorsCommand::get_set_path(PMGDQuery &query_tx, list_arr.append(VDMS_DESC_SET_DIM_PROP); list_arr.append(VDMS_DESC_SET_ENGIN_PROP); + results["list"] = list_arr; + bool unique = true; // Query set node @@ -94,7 +96,6 @@ std::string DescriptorsCommand::get_set_path(PMGDQuery &query_tx, true); Json::Value &query_responses = query.run(); - if (query_responses.size() != 1 && query_responses[0].size() != 1) { throw ExceptionCommand(DescriptorSetError, "PMGD Transaction Error"); } @@ -113,7 +114,6 @@ std::string DescriptorsCommand::get_set_path(PMGDQuery &query_tx, _desc_set_locator[set_name] = set_path; return set_path; } - return ""; } @@ -341,8 +341,9 @@ AddDescriptor::AddDescriptor() : DescriptorsCommand("AddDescriptor") { //_use_aws_storage = VDMSConfig::instance()->get_aws_flag(); } +//update to handle multiple descriptors at a go long AddDescriptor::insert_descriptor(const std::string &blob, - const std::string &set_path, int dim, + const std::string &set_path, int nr_desc, const std::string &label, Json::Value &error) { long id_first; @@ -351,21 +352,13 @@ long AddDescriptor::insert_descriptor(const std::string &blob, VCL::DescriptorSet *desc_set = _dm->get_descriptors_handler(set_path); - if (blob.length() / 4 != dim) { - std::cerr << "AddDescriptor::insert_descriptor: "; - std::cerr << "Dimensions mismatch: "; - std::cerr << blob.length() / 4 << " " << dim << std::endl; - error["info"] = "Blob Dimensions Mismatch"; - return -1; - } - if (!label.empty()) { long label_id = desc_set->get_label_id(label); long *label_ptr = &label_id; - id_first = desc_set->add((float *)blob.data(), 1, label_ptr); + id_first = desc_set->add((float *)blob.data(), nr_desc, label_ptr); } else { - id_first = desc_set->add((float *)blob.data(), 1); + id_first = desc_set->add((float 
*)blob.data(), nr_desc); } if (output_vcl_timing) { @@ -409,94 +402,257 @@ void AddDescriptor::retrieve_aws_descriptorSet(const std::string &set_path) { } } -int AddDescriptor::construct_protobuf(PMGDQuery &query, - const Json::Value &jsoncmd, - const std::string &blob, int grp_id, - Json::Value &error) { - const Json::Value &cmd = jsoncmd[_cmd_name]; +int AddDescriptor::add_single_descriptor(PMGDQuery &query, + const Json::Value &jsoncmd, + const std::string &blob, int grp_id, + Json::Value &error){ - const std::string set_name = cmd["set"].asString(); + const Json::Value &cmd = jsoncmd[_cmd_name]; + const std::string set_name = cmd["set"].asString(); - Json::Value props = get_value(cmd, "properties"); + Json::Value props = get_value(cmd, "properties"); - std::string label = get_value(cmd, "label", "None"); - props[VDMS_DESC_LABEL_PROP] = label; + std::string label = get_value(cmd, "label", "None"); + props[VDMS_DESC_LABEL_PROP] = label; - int dimensions; - const std::string set_path = get_set_path(query, set_name, dimensions); + int dim; + const std::string set_path = get_set_path(query, set_name, dim); - if (set_path.empty()) { - error["info"] = "Set " + set_name + " not found"; - error["status"] = RSCommand::Error; - return -1; - } + if (set_path.empty()) { + error["info"] = "Set " + set_name + " not found"; + error["status"] = RSCommand::Error; + return -1; + } - // retrieve the descriptor set from AWS here - // operations are currently done in memory with no subsequent write to disk - // so there's no need to re-upload to AWS - if (_use_aws_storage) { - retrieve_aws_descriptorSet(set_path); - } + if (blob.length() / 4 != dim) { + std::cerr << "AddDescriptor::insert_descriptor: "; + std::cerr << "Dimensions mismatch: "; + std::cerr << blob.length() / 4 << " " << dim << std::endl; + error["info"] = "Blob Dimensions Mismatch"; + return -1; + } - long id = insert_descriptor(blob, set_path, dimensions, label, error); + // retrieve the descriptor set from AWS here 
+ // operations are currently done in memory with no subsequent write to disk + // so there's no need to re-upload to AWS + if (_use_aws_storage) { + retrieve_aws_descriptorSet(set_path); + } - if (id < 0) { - error["status"] = RSCommand::Error; + long id = insert_descriptor(blob, set_path, 1, label, error); + + if (id < 0) { + error["status"] = RSCommand::Error; + + if (_use_aws_storage) { + // delete files in set_path + std::uintmax_t n = fs::remove_all(set_path); + std::cout << "Deleted " << n << " files or directories\n"; + } + + return -1; + } + + props[VDMS_DESC_ID_PROP] = Json::Int64(id); + + int node_ref = get_value(cmd, "_ref", query.get_available_reference()); + + query.AddNode(node_ref, VDMS_DESC_TAG, props, Json::nullValue); + + // It passed the checker, so it exists. + int set_ref = query.get_available_reference(); + + Json::Value link; + Json::Value results; + Json::Value list_arr; + list_arr.append(VDMS_DESC_SET_PATH_PROP); + list_arr.append(VDMS_DESC_SET_DIM_PROP); + results["list"] = list_arr; + + Json::Value constraints; + Json::Value name_arr; + name_arr.append("=="); + name_arr.append(set_name); + constraints[VDMS_DESC_SET_NAME_PROP] = name_arr; + + bool unique = true; + + // Query set node + query.QueryNode(set_ref, VDMS_DESC_SET_TAG, link, constraints, results, + unique); + + if (cmd.isMember("link")) { + add_link(query, cmd["link"], node_ref, VDMS_DESC_EDGE_TAG); + } + + Json::Value props_edge; + query.AddEdge(-1, set_ref, node_ref, VDMS_DESC_SET_EDGE_TAG, props_edge); + + // TODO: deleting files here causes problems with concurrency (TestRetail.py) + // keeping local copies as a temporary solution + // if(_use_aws_storage) + // { + // //delete files in set_path + // std::uintmax_t n = fs::remove_all(set_path); + // std::cout << "Deleted " << n << " files or directories\n"; + // } + return 0; + +} + +int AddDescriptor::add_descriptor_batch(PMGDQuery &query, + const Json::Value &jsoncmd, + const std::string &blob, int grp_id, + Json::Value 
&error){ + + const int FOUR_BYTE_INT = 4; + int expected_blb_size; + int nr_expected_descs; + int dimensions; + + //Extract set name + const Json::Value &cmd = jsoncmd[_cmd_name]; + const std::string set_name = cmd["set"].asString(); + + //Json::Value props = get_value(cmd, "properties"); + + //extract properties list and get filepath/object location of set + Json::Value prop_list = get_value(cmd, "batch_properties"); + const std::string set_path = get_set_path(query, set_name, dimensions); + + if (set_path.empty()) { + error["info"] = "Set " + set_name + " not found"; + error["status"] = RSCommand::Error; + return -1; + } + + std::string label = get_value(cmd, "label", "None"); + //props[VDMS_DESC_LABEL_PROP] = label; + + // retrieve the descriptor set from AWS here + // operations are currently done in memory with no subsequent write to disk + // so there's no need to re-upload to AWS if (_use_aws_storage) { - // delete files in set_path - std::uintmax_t n = fs::remove_all(set_path); - std::cout << "Deleted " << n << " files or directories\n"; + retrieve_aws_descriptorSet(set_path); } - return -1; - } + // Note dimensionse are based on a 32 bit integer, hence the /4 math on size + // as the string blob is sized in 8 bit ints. 
+ nr_expected_descs = prop_list.size(); + expected_blb_size = nr_expected_descs * dimensions * FOUR_BYTE_INT; + + //Verify length of input is matching expectations + if (blob.length() != expected_blb_size) { + std::cerr << "AddDescriptor::insert_descriptor: "; + std::cerr << "Expected Blob Length Does Not Match Input "; + std::cerr << "Input Length: " << blob.length() << " Expected Length: " << expected_blb_size << std::endl; + error["info"] = "FV Input Length Mismatch"; + error["status"] = RSCommand::Error; + return -1; + } + + long id = insert_descriptor(blob, set_path, nr_expected_descs, label, error); - props[VDMS_DESC_ID_PROP] = Json::Int64(id); - int node_ref = get_value<int>(cmd, "_ref", query.get_available_reference()); + if (id < 0) { + error["status"] = RSCommand::Error; - query.AddNode(node_ref, VDMS_DESC_TAG, props, Json::nullValue); + if (_use_aws_storage) { + // delete files in set_path + std::uintmax_t n = fs::remove_all(set_path); + std::cout << "Deleted " << n << " files or directories\n"; + } + error["info"] = "FV Index Insert Failed"; + return -1; + } - // It passed the checker, so it exists. - int set_ref = query.get_available_reference(); + // It passed the checker, so it exists. + int set_ref = query.get_available_reference(); + + Json::Value link; + Json::Value results; + Json::Value list_arr; + list_arr.append(VDMS_DESC_SET_PATH_PROP); + list_arr.append(VDMS_DESC_SET_DIM_PROP); + results["list"] = list_arr; + + //constraints for getting set node to link to. + Json::Value constraints; + Json::Value name_arr; + name_arr.append("=="); + name_arr.append(set_name); + constraints[VDMS_DESC_SET_NAME_PROP] = name_arr; + bool unique = true; + + // Query set node-We only need to do this once, outside of the loop + query.QueryNode(set_ref, VDMS_DESC_SET_TAG, link, constraints, results, + unique); + + for(int i=0; i < nr_expected_descs; i++) { + int node_ref = query.get_available_reference(); + Json::Value cur_props; + cur_props = prop_list[i]; + cur_props[VDMS_DESC_ID_PROP] = Json::Int64(id+i); + cur_props[VDMS_DESC_LABEL_PROP] = label; + + query.AddNode(node_ref, VDMS_DESC_TAG, cur_props, Json::nullValue); + + // It passed the checker, so it exists. 
+ //int set_ref = query.get_available_reference(); + //Json::Value link; + //Json::Value results; + //Json::Value list_arr; + //list_arr.append(VDMS_DESC_SET_PATH_PROP); + //list_arr.append(VDMS_DESC_SET_DIM_PROP); + //results["list"] = list_arr; + + //constraints for getting set node to link to. + //Json::Value constraints; + //Json::Value name_arr; + //name_arr.append("=="); + //name_arr.append(set_name); + //constraints[VDMS_DESC_SET_NAME_PROP] = name_arr; + + //bool unique = true; + + // Query set node-We only need to do this once, outside of the loop TODO MOVE + //query.QueryNode(set_ref, VDMS_DESC_SET_TAG, link, constraints, results, + // unique); + + //note this implicitly means that every node of a batch uses the same link + if (cmd.isMember("link")) { + add_link(query, cmd["link"], node_ref, VDMS_DESC_EDGE_TAG); + } - Json::Value link; - Json::Value results; - Json::Value list_arr; - list_arr.append(VDMS_DESC_SET_PATH_PROP); - list_arr.append(VDMS_DESC_SET_DIM_PROP); - results["list"] = list_arr; + Json::Value props_edge; + query.AddEdge(-1, set_ref, node_ref, VDMS_DESC_SET_EDGE_TAG, props_edge); + } - Json::Value constraints; - Json::Value name_arr; - name_arr.append("=="); - name_arr.append(set_name); - constraints[VDMS_DESC_SET_NAME_PROP] = name_arr; + return 0; +} - bool unique = true; +int AddDescriptor::construct_protobuf(PMGDQuery &query, + const Json::Value &jsoncmd, + const std::string &blob, int grp_id, + Json::Value &error) { - // Query set node - query.QueryNode(set_ref, VDMS_DESC_SET_TAG, link, constraints, results, - unique); + bool batch_mode; + int rc; + const Json::Value &cmd = jsoncmd[_cmd_name]; + const std::string set_name = cmd["set"].asString(); - if (cmd.isMember("link")) { - add_link(query, cmd["link"], node_ref, VDMS_DESC_EDGE_TAG); + + Json::Value prop_list = get_value(cmd, "batch_properties"); + if(prop_list.size() == 0){ + rc = add_single_descriptor(query, jsoncmd, blob, grp_id, error); + } else { + rc = 
add_descriptor_batch(query, jsoncmd, blob, grp_id, error); } - Json::Value props_edge; - query.AddEdge(-1, set_ref, node_ref, VDMS_DESC_SET_EDGE_TAG, props_edge); + if(rc < 0) error["status"] = RSCommand::Error; - // TODO: deleting files here causes problems with concurrency (TestRetail.py) - // keeping local copies as a temporary solution - // if(_use_aws_storage) - // { - // //delete files in set_path - // std::uintmax_t n = fs::remove_all(set_path); - // std::cout << "Deleted " << n << " files or directories\n"; - // } - return 0; + return rc; + } Json::Value AddDescriptor::construct_responses( @@ -692,7 +848,6 @@ int FindDescriptor::construct_protobuf(PMGDQuery &query, // Case (1) if (cmd.isMember("link")) { - // Query for the Descriptors related to user-defined link // that match the user-defined constraints // We will need to do the AND operation @@ -712,7 +867,6 @@ int FindDescriptor::construct_protobuf(PMGDQuery &query, } // Case (2) else if (!cmd.isMember("k_neighbors")) { - // In this case, we either need properties of the descriptor // ("list") on the results block, or we need the descriptor nodes // because the user defined a reference. 
diff --git a/src/DescriptorsCommand.h b/src/DescriptorsCommand.h index 9ad6e6ab..0199dd99 100644 --- a/src/DescriptorsCommand.h +++ b/src/DescriptorsCommand.h @@ -59,7 +59,7 @@ class DescriptorsCommand : public RSCommand { tbb::concurrent_unordered_map _cache_map; static tbb::concurrent_unordered_map - _desc_set_locator; + _desc_set_locator; static tbb::concurrent_unordered_map _desc_set_dims; // Will return the path to the set and the dimensions @@ -132,6 +132,14 @@ class AddDescriptor : public DescriptorsCommand { void retrieve_aws_descriptorSet(const std::string &set_path); + int add_single_descriptor(PMGDQuery &tx, const Json::Value &root, + const std::string &blob, int grp_id, + Json::Value &error); + + int add_descriptor_batch(PMGDQuery &tx, const Json::Value &root, + const std::string &blob, int grp_id, + Json::Value &error); + public: AddDescriptor(); diff --git a/src/QueryHandlerPMGD.cc b/src/QueryHandlerPMGD.cc index 66a1cfb6..2c17895d 100644 --- a/src/QueryHandlerPMGD.cc +++ b/src/QueryHandlerPMGD.cc @@ -63,8 +63,7 @@ std::unordered_map QueryHandlerPMGD::_rs_cmds; // DescriptorCommand.h tbb::concurrent_unordered_map DescriptorsCommand::_desc_set_locator; -tbb::concurrent_unordered_map - DescriptorsCommand::_desc_set_dims; +tbb::concurrent_unordered_map DescriptorsCommand::_desc_set_dims; void QueryHandlerPMGD::init() { DescriptorsManager::init(); @@ -199,7 +198,7 @@ bool QueryHandlerPMGD::syntax_checker(const Json::Value &root, int QueryHandlerPMGD::parse_commands(const protobufs::queryMessage &proto_query, Json::Value &root) { - Json::Reader reader; + Json::Reader reader; const std::string commands = proto_query.json(); try { @@ -244,12 +243,11 @@ int QueryHandlerPMGD::parse_commands(const protobufs::queryMessage &proto_query, root["status"] = RSCommand::Error; return -1; } - return 0; } void QueryHandlerPMGD::process_query(protobufs::queryMessage &proto_query, - protobufs::queryMessage &proto_res) { + protobufs::queryMessage &proto_res) { //TODO 
Investigate why/where json throwing Json::FastWriter fastWriter; Json::Value root; @@ -309,6 +307,7 @@ void QueryHandlerPMGD::process_query(protobufs::queryMessage &proto_query, // iterate over the list of the queries for (int j = 0; j < root.size(); j++) { + const Json::Value &query = root[j]; std::string cmd = query.getMemberNames()[0]; @@ -342,10 +341,12 @@ void QueryHandlerPMGD::process_query(protobufs::queryMessage &proto_query, construct_results.push_back(cmd_result); } + timers.add_timestamp("pmgd_query_time"); Json::Value &tx_responses = pmgd_query.run(_autodelete_init); timers.add_timestamp("pmgd_query_time"); + if (!tx_responses.isArray() || tx_responses.size() != root.size()) { Json::StyledWriter writer; std::cerr << "PMGD Response:" << std::endl; @@ -400,9 +401,11 @@ void QueryHandlerPMGD::process_query(protobufs::queryMessage &proto_query, if (output_query_level_timing) { timers.print_map_runtimes(); } + proto_res.set_json(fastWriter.write(json_responses)); _pmgd_qh.cleanup_files(); + } catch (VCL::Exception &e) { print_exception(e); error_msg << "Internal Server Error: VCL Exception at QH" << std::endl; diff --git a/tests/python/TestDescriptors.py b/tests/python/TestDescriptors.py index 0d6d4be2..0488950a 100644 --- a/tests/python/TestDescriptors.py +++ b/tests/python/TestDescriptors.py @@ -206,6 +206,203 @@ def test_addSetAndDescriptorsDimMismatch(self): self.disconnect(db) + def test_AddSetAndWrongBatchSize(self): + + + db = self.create_connection() + + # Create and verify descriptor set + trans_list = [] + trans_dict = {} + desc_set = {} + desc_set["engine"] = "FaissFlat" + desc_set["metric"] = "L2" + desc_set["name"] = "wrongbatchsize" + desc_set["dimensions"] = 128 + trans_dict["AddDescriptorSet"] = desc_set + + trans_list.append(trans_dict) + + response, img_array = db.query(trans_list) + self.assertEqual(response[0]["AddDescriptorSet"]["status"],0) + + # Create and add a batch of feature vectors + trans = [] + blobs = [] + nr_dims = 128 + 
batch_size = 10 + desc_blob = [] + x = np.zeros(nr_dims * batch_size) + x = x.astype("float32") + desc_blob.append(x.tobytes()) + + properties_list=[] + for x in range(batch_size + 3): + props = {"batchprop": x} + properties_list.append(props) + + descriptor = {} + descriptor["set"] = "wrongbatchsize" + descriptor["batch_properties"] = properties_list + query = {} + query["AddDescriptor"] = descriptor + trans.append(query) + blobs.append(desc_blob) + + response, img_array = db.query(trans, blobs) + self.assertEqual(response[0]["info"], "FV Input Length Mismatch") + self.assertEqual(response[0]["status"], -1) + + self.disconnect(db) + + def test_AddSetAndInsertBatch(self): + + db = self.create_connection() + + # Create and verify descriptor set + trans_list = [] + trans_dict = {} + desc_set = {} + desc_set["engine"] = "FaissFlat" + desc_set["metric"] = "L2" + desc_set["name"] = "rightbatchsize" + desc_set["dimensions"] = 128 + trans_dict["AddDescriptorSet"] = desc_set + + trans_list.append(trans_dict) + + response, img_array = db.query(trans_list) + self.assertEqual(response[0]["AddDescriptorSet"]["status"],0) + + # Create and add a batch of feature vectors + trans = [] + blobs = [] + nr_dims = 128 + batch_size = 10 + desc_blob = [] + x = np.zeros(nr_dims * batch_size) + x = x.astype("float32") + desc_blob.append(x.tobytes()) + + properties_list=[] + for x in range(batch_size): + props = {"batchprop": x} + properties_list.append(props) + + descriptor = {} + descriptor["set"] = "rightbatchsize" + descriptor["batch_properties"] = properties_list + query = {} + query["AddDescriptor"] = descriptor + trans.append(query) + blobs.append(desc_blob) + + response, img_array = db.query(trans, blobs) + self.assertEqual(response[0]["AddDescriptor"]["status"], 0) + + # now try to get those same descriptors back + desc_find = {} + desc_find["set"] = "rightbatchsize" + desc_find["results"] = {"list":["batchprop"]} + + query = {} + query["FindDescriptor"] = desc_find + + trans = [] 
+ blobs = [] + trans.append(query) + response, img_array = db.query(trans, blobs) + self.assertEqual(response[0]["FindDescriptor"]["returned"],10) + + self.disconnect(db) + + def test_AddBatchAndFindKNN(self): + + db = self.create_connection() + + # Create and verify descriptor set + trans_list = [] + trans_dict = {} + desc_set = {} + desc_set["engine"] = "FaissFlat" + desc_set["metric"] = "L2" + desc_set["name"] = "knn_batch_set" + desc_set["dimensions"] = 128 + trans_dict["AddDescriptorSet"] = desc_set + + trans_list.append(trans_dict) + + response, img_array = db.query(trans_list) + self.assertEqual(response[0]["AddDescriptorSet"]["status"],0) + + # Descriptor Set Created, now lets create a batch to insert + # first lets make a big combined blob representing the inserted descriptor + trans = [] + blobs = [] + nr_dims = 128 + batch_size = 5 + desc_blob = [] + x = np.ones(nr_dims * batch_size) + for i in range(batch_size): + x[2 + (i*nr_dims)] = 2.34 + i * 20 + + x = x.astype("float32") + desc_blob.append(x.tobytes()) + + properties_list = [] + for x in range(batch_size): + props = {"myid": x + 200} + properties_list.append(props) + + descriptor = {} + descriptor["set"] = "knn_batch_set" + descriptor["batch_properties"] = properties_list + + query = {} + query["AddDescriptor"] = descriptor + trans.append(query) + blobs.append(desc_blob) + + response, img_array = db.query(trans, blobs) + self.assertEqual(response[0]["AddDescriptor"]["status"], 0) + + ### Now try to find a KNN + kn = 3 + finddescriptor = {} + finddescriptor["set"] = "knn_batch_set" + + results = {} + results["list"] = ["myid", "_id", "_distance"] + results["blob"] = True + finddescriptor["results"] = results + finddescriptor["k_neighbors"] = kn + + query = {} + query["FindDescriptor"] = finddescriptor + + all_queries = [] + all_queries.append(query) + + descriptor_blob = [] + x = np.ones(128) + x[2] = 2.34 + 1 * 20 # 2.34 + 1*20 + x = x.astype("float32") + descriptor_blob.append(x.tobytes()) + + 
response, blob_array = db.query(all_queries, [descriptor_blob]) + + self.assertEqual(len(blob_array), kn) + self.assertEqual(descriptor_blob[0], blob_array[0]) + + # Check success + self.assertEqual(response[0]["FindDescriptor"]["status"], 0) + self.assertEqual(response[0]["FindDescriptor"]["returned"], kn) + self.assertEqual(response[0]["FindDescriptor"]["entities"][0]["_distance"], 0) + self.assertEqual(response[0]["FindDescriptor"]["entities"][1]["_distance"], 400) + self.assertEqual(response[0]["FindDescriptor"]["entities"][2]["_distance"], 400) + self.disconnect(db) + + + def test_classifyDescriptor(self): db = self.create_connection() set_name = "features_128d_4_classify" diff --git a/utils/src/api_schema/api_schema.json b/utils/src/api_schema/api_schema.json index 9725c663..5b337e78 100644 --- a/utils/src/api_schema/api_schema.json +++ b/utils/src/api_schema/api_schema.json @@ -75,6 +75,11 @@ "definitions": { // misc + "propertyArray": { + "type": "array", + "items": {"type" : "object"}, + "minItems" : 1 + }, "positiveInt": { "type": "integer", @@ -744,7 +749,8 @@ "label": { "type": "string" }, "_ref": { "$ref": "#/definitions/refInt" }, "link": { "$ref": "#/definitions/blockLink" }, - "properties": { "type": "object" } + "properties": { "type": "object" }, + "batch_properties": {"$ref": "#/definitions/propertyArray"} }, "required": ["set"], "additionalProperties": false