From 87265b846ec52bcfc394616f983aa62cbe4c90a8 Mon Sep 17 00:00:00 2001 From: Ian Adams Date: Thu, 1 Aug 2024 16:13:07 -0700 Subject: [PATCH 01/10] WiP commit --- src/DescriptorsCommand.cc | 175 +++++++++++++++++---------- src/DescriptorsCommand.h | 10 +- src/QueryHandlerPMGD.cc | 20 ++- utils/src/api_schema/api_schema.json | 8 +- 4 files changed, 139 insertions(+), 74 deletions(-) diff --git a/src/DescriptorsCommand.cc b/src/DescriptorsCommand.cc index df60d6de..bdf8cb8f 100644 --- a/src/DescriptorsCommand.cc +++ b/src/DescriptorsCommand.cc @@ -84,8 +84,10 @@ std::string DescriptorsCommand::get_set_path(PMGDQuery &query_tx, list_arr.append(VDMS_DESC_SET_DIM_PROP); list_arr.append(VDMS_DESC_SET_ENGIN_PROP); + results["list"] = list_arr; + bool unique = true; // Query set node @@ -94,7 +96,6 @@ std::string DescriptorsCommand::get_set_path(PMGDQuery &query_tx, true); Json::Value &query_responses = query.run(); - if (query_responses.size() != 1 && query_responses[0].size() != 1) { throw ExceptionCommand(DescriptorSetError, "PMGD Transaction Error"); } @@ -113,7 +114,6 @@ std::string DescriptorsCommand::get_set_path(PMGDQuery &query_tx, _desc_set_locator[set_name] = set_path; return set_path; } - return ""; } @@ -409,94 +409,137 @@ void AddDescriptor::retrieve_aws_descriptorSet(const std::string &set_path) { } } -int AddDescriptor::construct_protobuf(PMGDQuery &query, - const Json::Value &jsoncmd, - const std::string &blob, int grp_id, - Json::Value &error) { - const Json::Value &cmd = jsoncmd[_cmd_name]; +int AddDescriptor::add_single_descriptor(PMGDQuery &query, + const Json::Value &jsoncmd, + const std::string &blob, int grp_id, + Json::Value &error){ - const std::string set_name = cmd["set"].asString(); + const Json::Value &cmd = jsoncmd[_cmd_name]; + const std::string set_name = cmd["set"].asString(); - Json::Value props = get_value(cmd, "properties"); + Json::Value props = get_value(cmd, "properties"); - std::string label = get_value(cmd, "label", "None"); - props[VDMS_DESC_LABEL_PROP] = label; + std::string label = get_value(cmd, "label", "None"); + props[VDMS_DESC_LABEL_PROP] = label; - int dimensions; - const std::string set_path = get_set_path(query, set_name, dimensions); + int dimensions; + const std::string set_path = get_set_path(query, set_name, dimensions); - if (set_path.empty()) { - error["info"] = "Set " + set_name + " not found"; - error["status"] = RSCommand::Error; - return -1; - } + if (set_path.empty()) { + error["info"] = "Set " + set_name + " not found"; + error["status"] = RSCommand::Error; + return -1; + } - // retrieve the descriptor set from AWS here - // operations are currently done in memory with no subsequent write to disk - // so there's no need to re-upload to AWS - if (_use_aws_storage) { - retrieve_aws_descriptorSet(set_path); - } + // retrieve the descriptor set from AWS here + // operations are currently done in memory with no subsequent write to disk + // so there's no need to re-upload to AWS + if (_use_aws_storage) { + retrieve_aws_descriptorSet(set_path); + } - long id = insert_descriptor(blob, set_path, dimensions, label, error); + //TODO modify descriptor + long id = insert_descriptor(blob, set_path, dimensions, label, error); - if (id < 0) { - error["status"] = RSCommand::Error; + if (id < 0) { + error["status"] = RSCommand::Error; - if (_use_aws_storage) { - // delete files in set_path - std::uintmax_t n = fs::remove_all(set_path); - std::cout << "Deleted " << n << " files or directories\n"; + if (_use_aws_storage) { + // delete files in set_path + std::uintmax_t n = fs::remove_all(set_path); + std::cout << "Deleted " << n << " files or directories\n"; + } + + return -1; } - return -1; - } + props[VDMS_DESC_ID_PROP] = Json::Int64(id); - props[VDMS_DESC_ID_PROP] = Json::Int64(id); + int node_ref = get_value(cmd, "_ref", query.get_available_reference()); - int node_ref = get_value(cmd, "_ref", query.get_available_reference()); + query.AddNode(node_ref, VDMS_DESC_TAG, props, Json::nullValue); - query.AddNode(node_ref, VDMS_DESC_TAG, props, Json::nullValue); + // It passed the checker, so it exists. + int set_ref = query.get_available_reference(); - // It passed the checker, so it exists. - int set_ref = query.get_available_reference(); + Json::Value link; + Json::Value results; + Json::Value list_arr; + list_arr.append(VDMS_DESC_SET_PATH_PROP); + list_arr.append(VDMS_DESC_SET_DIM_PROP); + results["list"] = list_arr; - Json::Value link; - Json::Value results; - Json::Value list_arr; - list_arr.append(VDMS_DESC_SET_PATH_PROP); - list_arr.append(VDMS_DESC_SET_DIM_PROP); - results["list"] = list_arr; + Json::Value constraints; + Json::Value name_arr; + name_arr.append("=="); + name_arr.append(set_name); + constraints[VDMS_DESC_SET_NAME_PROP] = name_arr; - Json::Value constraints; - Json::Value name_arr; - name_arr.append("=="); - name_arr.append(set_name); - constraints[VDMS_DESC_SET_NAME_PROP] = name_arr; + bool unique = true; - bool unique = true; + // Query set node + query.QueryNode(set_ref, VDMS_DESC_SET_TAG, link, constraints, results, + unique); - // Query set node - query.QueryNode(set_ref, VDMS_DESC_SET_TAG, link, constraints, results, - unique); + if (cmd.isMember("link")) { + add_link(query, cmd["link"], node_ref, VDMS_DESC_EDGE_TAG); + } + + Json::Value props_edge; + query.AddEdge(-1, set_ref, node_ref, VDMS_DESC_SET_EDGE_TAG, props_edge); + + // TODO: deleting files here causes problems with concurrency (TestRetail.py) + // keeping local copies as a temporary solution + // if(_use_aws_storage) + // { + // //delete files in set_path + // std::uintmax_t n = fs::remove_all(set_path); + // std::cout << "Deleted " << n << " files or directories\n"; + // } + + return 0; + +} + +int AddDescriptor::add_descriptor_batch(PMGDQuery &query, + const Json::Value &jsoncmd, + const std::string &blob, int grp_id, + Json::Value &error){ + +} + +int AddDescriptor::construct_protobuf(PMGDQuery &query, + const Json::Value &jsoncmd, + const std::string &blob, int grp_id, + Json::Value &error) { + + bool batch_mode; + int rc; + const Json::Value &cmd = jsoncmd[_cmd_name]; + const std::string set_name = cmd["set"].asString(); + + + Json::Value prop_list = get_value(cmd, "propertieslist"); + if(prop_list.size() == 0){ + //todo check for _ref usage + batch_mode = false; + rc = add_single_descriptor(query, jsoncmd, blob, grp_id, error); + + } else { + printf("batch mode not implemented\n"); + exit(0); - if (cmd.isMember("link")) { - add_link(query, cmd["link"], node_ref, VDMS_DESC_EDGE_TAG); } - Json::Value props_edge; - query.AddEdge(-1, set_ref, node_ref, VDMS_DESC_SET_EDGE_TAG, props_edge); + /* + printf("property list size: %d\n", prop_list.size()); + for(Json::Value::ArrayIndex i = 0; i < prop_list.size(); i++){ + Json::Value prop_dict = prop_list[i]; + std::cout< _cache_map; static tbb::concurrent_unordered_map - _desc_set_locator; + _desc_set_locator; static tbb::concurrent_unordered_map _desc_set_dims; // Will return the path to the set and the dimensions @@ -132,6 +132,14 @@ class AddDescriptor : public DescriptorsCommand { void retrieve_aws_descriptorSet(const std::string &set_path); + int add_single_descriptor(PMGDQuery &tx, const Json::Value &root, + const std::string &blob, int grp_id, + Json::Value &error); + + int add_descriptor_batch(PMGDQuery &tx, const Json::Value &root, + const std::string &blob, int grp_id, + Json::Value &error); + public: AddDescriptor(); diff --git a/src/QueryHandlerPMGD.cc b/src/QueryHandlerPMGD.cc index 66a1cfb6..33f97e12 100644 --- a/src/QueryHandlerPMGD.cc +++ b/src/QueryHandlerPMGD.cc @@ -63,8 +63,7 @@ std::unordered_map QueryHandlerPMGD::_rs_cmds; // DescriptorCommand.h tbb::concurrent_unordered_map DescriptorsCommand::_desc_set_locator; -tbb::concurrent_unordered_map - DescriptorsCommand::_desc_set_dims; +tbb::concurrent_unordered_map DescriptorsCommand::_desc_set_dims; void QueryHandlerPMGD::init() { DescriptorsManager::init(); @@ -131,6 +130,7 @@ QueryHandlerPMGD::QueryHandlerPMGD() bool QueryHandlerPMGD::syntax_checker(const Json::Value &root, Json::Value &error) { + printf("Syntax Checker\n"); valijson::ValidationResults results; valijson::adapters::JsonCppAdapter user_query(root); if (!_validator.validate(*_schema, user_query, &results)) { @@ -199,7 +199,8 @@ bool QueryHandlerPMGD::syntax_checker(const Json::Value &root, int QueryHandlerPMGD::parse_commands(const protobufs::queryMessage &proto_query, Json::Value &root) { - Json::Reader reader; + printf("Parse commands Checker\n"); + Json::Reader reader; const std::string commands = proto_query.json(); try { @@ -244,12 +245,12 @@ int QueryHandlerPMGD::parse_commands(const protobufs::queryMessage &proto_query, root["status"] = RSCommand::Error; return -1; } - + printf("Parse COmmands complete\n"); return 0; } void QueryHandlerPMGD::process_query(protobufs::queryMessage &proto_query, - protobufs::queryMessage &proto_res) { + protobufs::queryMessage &proto_res) { //TODO Investigate why/where json throwing Json::FastWriter fastWriter; Json::Value root; @@ -297,18 +298,20 @@ void QueryHandlerPMGD::process_query(protobufs::queryMessage &proto_query, Json::StyledWriter w; std::cerr << w.write(json_responses); }; - + printf("Parse Command Pre-\n"); if (parse_commands(proto_query, root) != 0) { cmd_current = "Transaction"; error(root, cmd_current); return; } + printf("Parse Commands Completed, back in proc-query\n"); PMGDQuery pmgd_query(_pmgd_qh); int blob_count = 0; // iterate over the list of the queries for (int j = 0; j < root.size(); j++) { + printf("Iterating over incoming queries: %d...\n", j); const Json::Value &query = root[j]; std::string cmd = query.getMemberNames()[0]; @@ -342,9 +345,11 @@ void QueryHandlerPMGD::process_query(protobufs::queryMessage &proto_query, construct_results.push_back(cmd_result); } + printf("Running PMGD Query\n"); timers.add_timestamp("pmgd_query_time"); Json::Value &tx_responses = pmgd_query.run(_autodelete_init); timers.add_timestamp("pmgd_query_time"); + printf("PMGD query Complete\n"); if (!tx_responses.isArray() || tx_responses.size() != root.size()) { Json::StyledWriter writer; @@ -365,6 +370,7 @@ void QueryHandlerPMGD::process_query(protobufs::queryMessage &proto_query, } else { blob_count = 0; for (int j = 0; j < root.size(); j++) { + printf("Response iteration: %d\n", j); Json::Value &query = root[j]; std::string cmd = query.getMemberNames()[0]; @@ -400,8 +406,10 @@ void QueryHandlerPMGD::process_query(protobufs::queryMessage &proto_query, if (output_query_level_timing) { timers.print_map_runtimes(); } + printf("Writing JSON responses...\n"); proto_res.set_json(fastWriter.write(json_responses)); _pmgd_qh.cleanup_files(); + printf("Cleaning up!\n"); } catch (VCL::Exception &e) { print_exception(e); diff --git a/utils/src/api_schema/api_schema.json b/utils/src/api_schema/api_schema.json index 9725c663..0057afbf 100644 --- a/utils/src/api_schema/api_schema.json +++ b/utils/src/api_schema/api_schema.json @@ -75,6 +75,11 @@ "definitions": { // misc + "propertyArray": { + "type": "array", + "items": {"type" : "object"}, + "minimum" : 1 + }, "positiveInt": { "type": "integer", @@ -744,7 +749,8 @@ "label": { "type": "string" }, "_ref": { "$ref": "#/definitions/refInt" }, "link": { "$ref": "#/definitions/blockLink" }, - "properties": { "type": "object" } + "properties": { "type": "object" }, + "propertieslist": {"$ref": "#/definitions/propertyArray"} }, "required": ["set"], "additionalProperties": false From b1e141ae63fb738e77fcff5e03dc7f2cfa3b1c3c Mon Sep 17 00:00:00 2001 From: Ian Adams Date: Mon, 5 Aug 2024 13:47:48 -0700 Subject: [PATCH 02/10] Initial batch development compiles and superficially functioning, needs refinement and verification testing --- src/DescriptorsCommand.cc | 156 +++++++++++++++++++++++++++++++++----- 1 file changed, 135 insertions(+), 21 deletions(-) diff --git a/src/DescriptorsCommand.cc b/src/DescriptorsCommand.cc index bdf8cb8f..363ba151 100644 --- a/src/DescriptorsCommand.cc +++ b/src/DescriptorsCommand.cc @@ -341,8 +341,9 @@ AddDescriptor::AddDescriptor() : DescriptorsCommand("AddDescriptor") { //_use_aws_storage = VDMSConfig::instance()->get_aws_flag(); } +//update to handle multiple descriptors at a go long AddDescriptor::insert_descriptor(const std::string &blob, - const std::string &set_path, int dim, + const std::string &set_path, int nr_desc, const std::string &label, Json::Value &error) { long id_first; @@ -351,21 +352,23 @@ long AddDescriptor::insert_descriptor(const std::string &blob, VCL::DescriptorSet *desc_set = _dm->get_descriptors_handler(set_path); - if (blob.length() / 4 != dim) { + //TODO this check no longer applies, should move it elsewhere + /*if (blob.length() / 4 != dim) { std::cerr << "AddDescriptor::insert_descriptor: "; std::cerr << "Dimensions mismatch: "; std::cerr << blob.length() / 4 << " " << dim << std::endl; error["info"] = "Blob Dimensions Mismatch"; return -1; - } + }*/ + if (!label.empty()) { long label_id = desc_set->get_label_id(label); long *label_ptr = &label_id; - id_first = desc_set->add((float *)blob.data(), 1, label_ptr); + id_first = desc_set->add((float *)blob.data(), nr_desc, label_ptr); } else { - id_first = desc_set->add((float *)blob.data(), 1); + id_first = desc_set->add((float *)blob.data(), nr_desc); } if (output_vcl_timing) { @@ -422,8 +425,8 @@ int AddDescriptor::add_single_descriptor(PMGDQuery &query, std::string label = get_value(cmd, "label", "None"); props[VDMS_DESC_LABEL_PROP] = label; - int dimensions; - const std::string set_path = get_set_path(query, set_name, dimensions); + int dim; + const std::string set_path = get_set_path(query, set_name, dim); if (set_path.empty()) { error["info"] = "Set " + set_name + " not found"; @@ -431,6 +434,14 @@ int AddDescriptor::add_single_descriptor(PMGDQuery &query, return -1; } + if (blob.length() / 4 != dim) { + std::cerr << "AddDescriptor::insert_descriptor: "; + std::cerr << "Dimensions mismatch: "; + std::cerr << blob.length() / 4 << " " << dim << std::endl; + error["info"] = "Blob Dimensions Mismatch"; + return -1; + } + // retrieve the descriptor set from AWS here // operations are currently done in memory with no subsequent write to disk // so there's no need to re-upload to AWS @@ -438,8 +449,8 @@ int AddDescriptor::add_single_descriptor(PMGDQuery &query, retrieve_aws_descriptorSet(set_path); } - //TODO modify descriptor - long id = insert_descriptor(blob, set_path, dimensions, label, error); + //TODO modify insert descriptor to handle batches + long id = insert_descriptor(blob, set_path, 1, label, error); if (id < 0) { error["status"] = RSCommand::Error; @@ -506,6 +517,117 @@ int AddDescriptor::add_descriptor_batch(PMGDQuery &query, const std::string &blob, int grp_id, Json::Value &error){ + int expected_blb_size; + int nr_expected_descs; + int dimensions; + + //Extract set name + const Json::Value &cmd = jsoncmd[_cmd_name]; + const std::string set_name = cmd["set"].asString(); + + //extract properties list and get filepath/object location of set + Json::Value prop_list = get_value(cmd, "propertieslist"); + const std::string set_path = get_set_path(query, set_name, dimensions); + + if (set_path.empty()) { + error["info"] = "Set " + set_name + " not found"; + error["status"] = RSCommand::Error; + return -1; + } + + std::string label = get_value(cmd, "label", "None"); + + // retrieve the descriptor set from AWS here + // operations are currently done in memory with no subsequent write to disk + // so there's no need to re-upload to AWS + if (_use_aws_storage) { + retrieve_aws_descriptorSet(set_path); + } + + // Note dimensionse are based on a 32 bit integer, hence the /4 math on size + // as the string blob is sized in 8 bit ints. + nr_expected_descs = prop_list.size(); + expected_blb_size = nr_expected_descs * dimensions * 4; + + //Verify length of input is matching expectations + if (blob.length() != expected_blb_size) { + std::cerr << "AddDescriptor::insert_descriptor: "; + std::cerr << "Expectected Blob Length Does Not Match Input "; + std::cerr << blob.length() << " != " << expected_blb_size << std::endl; + error["info"] = "FV Input Length Mismatch"; + return -1; + } + + //TODO modify insert descriptor to handle batches + long id = insert_descriptor(blob, set_path, nr_expected_descs, label, error); + + if (id < 0) { + error["status"] = RSCommand::Error; + + if (_use_aws_storage) { + // delete files in set_path + std::uintmax_t n = fs::remove_all(set_path); + std::cout << "Deleted " << n << " files or directories\n"; + } + + return -1; + } + + //get reference tag for source node for ID + + + // Loop over properties list, add relevant query, link, and edges for each + for(int i=0; i < nr_expected_descs; i++) { + int node_ref = query.get_available_reference(); + Json::Value cur_props; + cur_props = prop_list[i]; + //TODO Note using iterator to modify ID return, we're gonna want to watch this closely. + cur_props[VDMS_DESC_ID_PROP] = Json::Int64(id+i); + + + query.AddNode(node_ref, VDMS_DESC_TAG, cur_props, Json::nullValue); + + // It passed the checker, so it exists. + int set_ref = query.get_available_reference(); + + Json::Value link; + Json::Value results; + Json::Value list_arr; + list_arr.append(VDMS_DESC_SET_PATH_PROP); + list_arr.append(VDMS_DESC_SET_DIM_PROP); + results["list"] = list_arr; + + //constraints for getting set node to link to. + Json::Value constraints; + Json::Value name_arr; + name_arr.append("=="); + name_arr.append(set_name); + constraints[VDMS_DESC_SET_NAME_PROP] = name_arr; + + bool unique = true; + + // Query set node-We only need to do this once, outside of the loop TODO MOVE + query.QueryNode(set_ref, VDMS_DESC_SET_TAG, link, constraints, results, + unique); + + //note this implicitly means that every node of a batch uses the same link + if (cmd.isMember("link")) { + add_link(query, cmd["link"], node_ref, VDMS_DESC_EDGE_TAG); + } + + Json::Value props_edge; + query.AddEdge(-1, set_ref, node_ref, VDMS_DESC_SET_EDGE_TAG, props_edge); + } + + + /* TODO example iteration over properties list + * TODO update API to call field "batch_properties" + printf("property list size: %d\n", prop_list.size()); + for(Json::Value::ArrayIndex i = 0; i < prop_list.size(); i++){ + Json::Value prop_dict = prop_list[i]; + std::cout<(cmd, "propertieslist"); if(prop_list.size() == 0){ - //todo check for _ref usage - batch_mode = false; + printf("Adding Single Descriptor\n"); rc = add_single_descriptor(query, jsoncmd, blob, grp_id, error); - } else { - printf("batch mode not implemented\n"); - exit(0); - + printf("Adding Descriptor Batch\n"); + rc = add_descriptor_batch(query, jsoncmd, blob, grp_id, error); } - /* - printf("property list size: %d\n", prop_list.size()); - for(Json::Value::ArrayIndex i = 0; i < prop_list.size(); i++){ - Json::Value prop_dict = prop_list[i]; - std::cout< Date: Thu, 15 Aug 2024 13:23:35 -0700 Subject: [PATCH 03/10] batch inserts working --- src/DescriptorsCommand.cc | 10 ++- tests/python/TestDescriptors.py | 113 ++++++++++++++++++++++++++++++++ 2 files changed, 120 insertions(+), 3 deletions(-) diff --git a/src/DescriptorsCommand.cc b/src/DescriptorsCommand.cc index 363ba151..a15ced85 100644 --- a/src/DescriptorsCommand.cc +++ b/src/DescriptorsCommand.cc @@ -574,7 +574,7 @@ int AddDescriptor::add_descriptor_batch(PMGDQuery &query, } //get reference tag for source node for ID - + printf("Base ID for insertion: %d\n", id); // Loop over properties list, add relevant query, link, and edges for each for(int i=0; i < nr_expected_descs; i++) { @@ -650,6 +650,7 @@ int AddDescriptor::construct_protobuf(PMGDQuery &query, rc = add_descriptor_batch(query, jsoncmd, blob, grp_id, error); } + if(rc < 0) error["status"] = RSCommand::Error; return rc; @@ -849,7 +850,7 @@ int FindDescriptor::construct_protobuf(PMGDQuery &query, // Case (1) if (cmd.isMember("link")) { - + printf("Link Case for FindDesc\n"); // Query for the Descriptors related to user-defined link // that match the user-defined constraints // We will need to do the AND operation @@ -863,13 +864,15 @@ int FindDescriptor::construct_protobuf(PMGDQuery &query, Json::Value link_to_desc; link_to_desc["ref"] = desc_ref; - // Query for the set + // Query for the set RESET TO UNIQUE FOR BOOLEAN FALSE query.QueryNode(-1, VDMS_DESC_SET_TAG, link_to_desc, constraints_set, results_set, unique); } // Case (2) else if (!cmd.isMember("k_neighbors")) { + printf("Regular Case for Find Desc\n"); + // In this case, we either need properties of the descriptor // ("list") on the results block, or we need the descriptor nodes // because the user defined a reference. @@ -892,6 +895,7 @@ int FindDescriptor::construct_protobuf(PMGDQuery &query, } // Case (3), Just want the descriptor by value, we only need the set else { + printf("KNN Case\n"); Json::Value link_null; // null const int k_neighbors = get_value(cmd, "k_neighbors", 0); diff --git a/tests/python/TestDescriptors.py b/tests/python/TestDescriptors.py index 0d6d4be2..7abe8030 100644 --- a/tests/python/TestDescriptors.py +++ b/tests/python/TestDescriptors.py @@ -206,6 +206,119 @@ def test_addSetAndDescriptorsDimMismatch(self): self.disconnect(db) + def test_AddSetAndWrongBatchSize(self): + + + db = self.create_connection() + + # Create and verify descriptor set + trans_list = [] + trans_dict = {} + desc_set = {} + desc_set["engine"] = "FaissFlat" + desc_set["metric"] = "L2" + desc_set["name"] = "wrongbatchsize" + desc_set["dimensions"] = 128 + trans_dict["AddDescriptorSet"] = desc_set + + trans_list.append(trans_dict) + + response, img_array = db.query(trans_list) + self.assertEqual(response[0]["AddDescriptorSet"]["status"],0) + + # Create and add a batch of feature vectors + trans = [] + blobs = [] + nr_dims = 128 + batch_size = 10 + desc_blob = [] + x = np.zeros(nr_dims * batch_size) + x = x.astype("float32") + desc_blob.append(x.tobytes()) + + properties_list=[] + for x in range(batch_size + 3): + props = {"batchprop": x} + properties_list.append(props) + + descriptor = {} + descriptor["set"] = "wrongbatchsize" + descriptor["propertieslist"] = properties_list + query = {} + query["AddDescriptor"] = descriptor + trans.append(query) + blobs.append(desc_blob) + + response, img_array = db.query(trans, blobs) + self.assertEqual(response[0]["info"], "FV Input Length Mismatch") + self.assertEqual(response[0]["status"], -1) + + self.disconnect(db) + + def test_AddSetAndInsertBatch(self): + + db = self.create_connection() + + # Create and verify descriptor set + trans_list = [] + trans_dict = {} + desc_set = {} + desc_set["engine"] = "FaissFlat" + desc_set["metric"] = "L2" + desc_set["name"] = "rightbatchsize" + desc_set["dimensions"] = 128 + trans_dict["AddDescriptorSet"] = desc_set + + trans_list.append(trans_dict) + + response, img_array = db.query(trans_list) + self.assertEqual(response[0]["AddDescriptorSet"]["status"],0) + + # Create and add a batch of feature vectors + trans = [] + blobs = [] + nr_dims = 128 + batch_size = 10 + desc_blob = [] + x = np.zeros(nr_dims * batch_size) + x = x.astype("float32") + desc_blob.append(x.tobytes()) + + properties_list=[] + for x in range(batch_size): + props = {"batchprop": x} + properties_list.append(props) + + descriptor = {} + descriptor["set"] = "rightbatchsize" + descriptor["propertieslist"] = properties_list + query = {} + query["AddDescriptor"] = descriptor + trans.append(query) + blobs.append(desc_blob) + + response, img_array = db.query(trans, blobs) + print(response) + self.assertEqual(response[0]["AddDescriptor"]["status"], 0) + + # now try to get those same descriptors back + desc_find = {} + desc_find["set"] = "rightbatchsize" + desc_find["results"] = {"list":["batchprop"]} + + query = {} + query["FindDescriptor"] = desc_find + + trans = [] + blobs = [] + trans.append(query) + response, img_array = db.query(trans, blobs) + print(response) + + self.disconnect(db) + + + def test_classifyDescriptor(self): db = self.create_connection() set_name = "features_128d_4_classify" From 753c9d2eb6a3af45fbb0fec00a8c5b345291aa44 Mon Sep 17 00:00:00 2001 From: Ian Adams Date: Fri, 16 Aug 2024 10:46:06 -0700 Subject: [PATCH 04/10] cruft cleanup, renamed batch field to batch properties --- src/DescriptorsCommand.cc | 23 +++-------------------- tests/python/TestDescriptors.py | 7 +++---- utils/src/api_schema/api_schema.json | 2 +- 3 files changed, 7 insertions(+), 25 deletions(-) diff --git a/src/DescriptorsCommand.cc b/src/DescriptorsCommand.cc index a15ced85..bbaec89e 100644 --- a/src/DescriptorsCommand.cc +++ b/src/DescriptorsCommand.cc @@ -526,7 +526,7 @@ int AddDescriptor::add_descriptor_batch(PMGDQuery &query, const std::string set_name = cmd["set"].asString(); //extract properties list and get filepath/object location of set - Json::Value prop_list = get_value(cmd, "propertieslist"); + Json::Value prop_list = get_value(cmd, "batch_properties"); const std::string set_path = get_set_path(query, set_name, dimensions); if (set_path.empty()) { @@ -574,8 +574,6 @@ int AddDescriptor::add_descriptor_batch(PMGDQuery &query, } //get reference tag for source node for ID - printf("Base ID for insertion: %d\n", id); - // Loop over properties list, add relevant query, link, and edges for each for(int i=0; i < nr_expected_descs; i++) { int node_ref = query.get_available_reference(); @@ -619,14 +617,6 @@ int AddDescriptor::add_descriptor_batch(PMGDQuery &query, query.AddEdge(-1, set_ref, node_ref, VDMS_DESC_SET_EDGE_TAG, props_edge); } - - /* TODO example iteration over properties list - * TODO update API to call field "batch_properties" - printf("property list size: %d\n", prop_list.size()); - for(Json::Value::ArrayIndex i = 0; i < prop_list.size(); i++){ - Json::Value prop_dict = prop_list[i]; - std::cout<(cmd, "propertieslist"); + Json::Value prop_list = get_value(cmd, "batch_properties"); if(prop_list.size() == 0){ - printf("Adding Single Descriptor\n"); rc = add_single_descriptor(query, jsoncmd, blob, grp_id, error); } else { - printf("Adding Descriptor Batch\n"); rc = add_descriptor_batch(query, jsoncmd, blob, grp_id, error); } @@ -850,7 +838,6 @@ int FindDescriptor::construct_protobuf(PMGDQuery &query, // Case (1) if (cmd.isMember("link")) { - printf("Link Case for FindDesc\n"); // Query for the Descriptors related to user-defined link // that match the user-defined constraints // We will need to do the AND operation @@ -864,15 +851,12 @@ int FindDescriptor::construct_protobuf(PMGDQuery &query, Json::Value link_to_desc; link_to_desc["ref"] = desc_ref; - // Query for the set RESET TO UNIQUE FOR BOOLEAN FALSE + // Query for the set query.QueryNode(-1, VDMS_DESC_SET_TAG, link_to_desc, constraints_set, results_set, unique); } // Case (2) else if (!cmd.isMember("k_neighbors")) { - - printf("Regular Case for Find Desc\n"); - // In this case, we either need properties of the descriptor // ("list") on the results block, or we need the descriptor nodes // because the user defined a reference. @@ -895,7 +879,6 @@ int FindDescriptor::construct_protobuf(PMGDQuery &query, } // Case (3), Just want the descriptor by value, we only need the set else { - printf("KNN Case\n"); Json::Value link_null; // null const int k_neighbors = get_value(cmd, "k_neighbors", 0); diff --git a/tests/python/TestDescriptors.py b/tests/python/TestDescriptors.py index 7abe8030..cee23db9 100644 --- a/tests/python/TestDescriptors.py +++ b/tests/python/TestDescriptors.py @@ -243,7 +243,7 @@ def test_AddSetAndWrongBatchSize(self): descriptor = {} descriptor["set"] = "wrongbatchsize" - descriptor["propertieslist"] = properties_list + descriptor["batch_properties"] = properties_list query = {} query["AddDescriptor"] = descriptor trans.append(query) @@ -291,7 +291,7 @@ def test_AddSetAndInsertBatch(self): descriptor = {} descriptor["set"] = "rightbatchsize" - descriptor["propertieslist"] = properties_list + descriptor["batch_properties"] = properties_list query = {} query["AddDescriptor"] = descriptor trans.append(query) @@ -313,12 +313,11 @@ def test_AddSetAndInsertBatch(self): blobs = [] trans.append(query) response, img_array = db.query(trans, blobs) - print(response) + #print(response) self.disconnect(db) - def test_classifyDescriptor(self): db = self.create_connection() set_name = "features_128d_4_classify" diff --git a/utils/src/api_schema/api_schema.json b/utils/src/api_schema/api_schema.json index 0057afbf..5b337e78 100644 --- a/utils/src/api_schema/api_schema.json +++ b/utils/src/api_schema/api_schema.json @@ -750,7 +750,7 @@ "_ref": { "$ref": "#/definitions/refInt" }, "link": { "$ref": "#/definitions/blockLink" }, "properties": { "type": "object" }, - "propertieslist": {"$ref": "#/definitions/propertyArray"} + "batch_properties": {"$ref": "#/definitions/propertyArray"} }, "required": ["set"], "additionalProperties": false From a355336040854fb57fd0bee5141b8fe30966bc20 Mon Sep 17 00:00:00 2001 From: Ian Adams Date: Fri, 16 Aug 2024 10:49:46 -0700 Subject: [PATCH 05/10] additional debug statement removal --- src/QueryHandlerPMGD.cc | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/src/QueryHandlerPMGD.cc b/src/QueryHandlerPMGD.cc index 33f97e12..2c17895d 100644 --- a/src/QueryHandlerPMGD.cc +++ b/src/QueryHandlerPMGD.cc @@ -130,7 +130,6 @@ QueryHandlerPMGD::QueryHandlerPMGD() bool QueryHandlerPMGD::syntax_checker(const Json::Value &root, Json::Value &error) { - printf("Syntax Checker\n"); valijson::ValidationResults results; valijson::adapters::JsonCppAdapter user_query(root); if (!_validator.validate(*_schema, user_query, &results)) { @@ -199,7 +198,6 @@ bool QueryHandlerPMGD::syntax_checker(const Json::Value &root, int QueryHandlerPMGD::parse_commands(const protobufs::queryMessage &proto_query, Json::Value &root) { - printf("Parse commands Checker\n"); Json::Reader reader; const std::string commands = proto_query.json(); @@ -245,7 +243,6 @@ int QueryHandlerPMGD::parse_commands(const protobufs::queryMessage &proto_query, root["status"] = RSCommand::Error; return -1; } - printf("Parse COmmands complete\n"); return 0; } @@ -298,20 +295,19 @@ void QueryHandlerPMGD::process_query(protobufs::queryMessage &proto_query, Json::StyledWriter w; std::cerr << w.write(json_responses); }; - printf("Parse Command Pre-\n"); + if (parse_commands(proto_query, root) != 0) { cmd_current = "Transaction"; error(root, cmd_current); return; } - printf("Parse Commands Completed, back in proc-query\n"); PMGDQuery pmgd_query(_pmgd_qh); int blob_count = 0; // iterate over the list of the queries for (int j = 0; j < root.size(); j++) { - printf("Iterating over incoming queries: %d...\n", j); + const Json::Value &query = root[j]; std::string cmd = query.getMemberNames()[0]; @@ -345,11 +341,11 @@ void QueryHandlerPMGD::process_query(protobufs::queryMessage &proto_query, construct_results.push_back(cmd_result); } - printf("Running PMGD Query\n"); + timers.add_timestamp("pmgd_query_time"); Json::Value &tx_responses = pmgd_query.run(_autodelete_init); timers.add_timestamp("pmgd_query_time"); - printf("PMGD query Complete\n"); + if (!tx_responses.isArray() || tx_responses.size() != root.size()) { Json::StyledWriter writer; @@ -370,7 +366,6 @@ void QueryHandlerPMGD::process_query(protobufs::queryMessage &proto_query, } else { blob_count = 0; for (int j = 0; j < root.size(); j++) { - printf("Response iteration: %d\n", j); Json::Value &query = root[j]; std::string cmd = query.getMemberNames()[0]; @@ -406,10 +401,10 @@ void QueryHandlerPMGD::process_query(protobufs::queryMessage &proto_query, if (output_query_level_timing) { timers.print_map_runtimes(); } - printf("Writing JSON responses...\n"); + proto_res.set_json(fastWriter.write(json_responses)); _pmgd_qh.cleanup_files(); - printf("Cleaning up!\n"); + } catch (VCL::Exception &e) { print_exception(e); From 1f8186be5e19d7bc536dc9f6582f281a8bbcf508 Mon Sep 17 00:00:00 2001 From: Ian Adams Date: Mon, 19 Aug 2024 13:36:57 -0700 Subject: [PATCH 06/10] Fixed bug on batch insert, was not correctly including descriptor labels as part of metadata --- src/DescriptorsCommand.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/DescriptorsCommand.cc b/src/DescriptorsCommand.cc index bbaec89e..a74d77aa 100644 --- a/src/DescriptorsCommand.cc +++ b/src/DescriptorsCommand.cc @@ -525,6 +525,8 @@ int AddDescriptor::add_descriptor_batch(PMGDQuery &query, const Json::Value &cmd = jsoncmd[_cmd_name]; const std::string set_name = cmd["set"].asString(); + Json::Value props = get_value(cmd, "properties"); + //extract properties list and get filepath/object location of set Json::Value prop_list = get_value(cmd, "batch_properties"); const std::string set_path = get_set_path(query, set_name, dimensions); @@ -536,6 +538,7 @@ int AddDescriptor::add_descriptor_batch(PMGDQuery &query, } std::string label = get_value(cmd, "label", "None"); + props[VDMS_DESC_LABEL_PROP] = label; // retrieve the descriptor set from AWS here // operations are currently done in memory with no subsequent write to disk @@ -581,7 +584,7 @@ int AddDescriptor::add_descriptor_batch(PMGDQuery &query, cur_props = prop_list[i]; //TODO Note using iterator to modify ID return, we're gonna want to watch this closely. cur_props[VDMS_DESC_ID_PROP] = Json::Int64(id+i); - + cur_props[VDMS_DESC_LABEL_PROP] = label; query.AddNode(node_ref, VDMS_DESC_TAG, cur_props, Json::nullValue); From 374a3ecf9baebfb67c307abbd60b2819c1d47619 Mon Sep 17 00:00:00 2001 From: Ian Adams Date: Mon, 19 Aug 2024 13:46:10 -0700 Subject: [PATCH 07/10] tweaked test after last fix --- tests/python/TestDescriptors.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/python/TestDescriptors.py b/tests/python/TestDescriptors.py index cee23db9..edc705d1 100644 --- a/tests/python/TestDescriptors.py +++ b/tests/python/TestDescriptors.py @@ -298,7 +298,6 @@ def test_AddSetAndInsertBatch(self): blobs.append(desc_blob) response, img_array = db.query(trans, blobs) - print(response) self.assertEqual(response[0]["AddDescriptor"]["status"], 0) # now try to get those same descriptors back @@ -313,7 +312,7 @@ def test_AddSetAndInsertBatch(self): blobs = [] trans.append(query) response, img_array = db.query(trans, blobs) - #print(response) + self.assertEqual(response[0]["FindDescriptor"]["returned"],10) self.disconnect(db) From 82a979fc48237f6855bd79651ef2db3735f62d7d Mon Sep 17 00:00:00 2001 From: Ian Adams Date: Thu, 22 Aug 2024 13:39:29 -0700 Subject: [PATCH 08/10] test updates, continuing cleanup --- src/DescriptorsCommand.cc | 81 +++++++++++++++++-------------- tests/python/TestDescriptors.py | 86 +++++++++++++++++++++++++++++++++ 2 files changed, 130 insertions(+), 37 deletions(-) diff --git a/src/DescriptorsCommand.cc b/src/DescriptorsCommand.cc index a74d77aa..91bc38c2 100644 --- a/src/DescriptorsCommand.cc +++ b/src/DescriptorsCommand.cc @@ -352,16 +352,6 @@ long AddDescriptor::insert_descriptor(const std::string &blob, VCL::DescriptorSet *desc_set = _dm->get_descriptors_handler(set_path); - //TODO this check no longer applies, should move it elsewhere - /*if (blob.length() / 4 != dim) { - std::cerr << "AddDescriptor::insert_descriptor: "; - std::cerr << "Dimensions mismatch: "; - std::cerr << blob.length() / 4 << " " << dim << std::endl; - error["info"] = "Blob Dimensions Mismatch"; - return -1; - }*/ - - if (!label.empty()) { long label_id = desc_set->get_label_id(label); long *label_ptr = &label_id; @@ -449,7 +439,6 @@ int AddDescriptor::add_single_descriptor(PMGDQuery &query, retrieve_aws_descriptorSet(set_path); } - //TODO modify insert descriptor to handle batches long id = insert_descriptor(blob, set_path, 1, label, error); if (id < 0) { @@ -517,6 +506,7 @@ int AddDescriptor::add_descriptor_batch(PMGDQuery &query, const std::string &blob, int grp_id, Json::Value &error){ + const int FOUR_BYTE_INT = 4; int expected_blb_size; int nr_expected_descs; int dimensions; @@ -525,7 +515,7 @@ int AddDescriptor::add_descriptor_batch(PMGDQuery &query, const Json::Value &cmd = jsoncmd[_cmd_name]; const std::string set_name = cmd["set"].asString(); - Json::Value props = get_value(cmd, "properties"); + //Json::Value props = get_value(cmd, "properties"); //extract properties list and get filepath/object location of set Json::Value prop_list = get_value(cmd, "batch_properties"); @@ -538,7 +528,7 @@ int AddDescriptor::add_descriptor_batch(PMGDQuery &query, } std::string label = get_value(cmd, "label", "None"); - props[VDMS_DESC_LABEL_PROP] = label; + //props[VDMS_DESC_LABEL_PROP] = label; // retrieve the descriptor set from AWS here // operations are currently done in memory with no subsequent write to disk @@ -550,18 +540,17 @@ int AddDescriptor::add_descriptor_batch(PMGDQuery &query, // Note dimensionse are based on a 32 bit integer, hence the /4 math on size // as the string blob is sized in 8 bit ints. nr_expected_descs = prop_list.size(); - expected_blb_size = nr_expected_descs * dimensions * 4; + expected_blb_size = nr_expected_descs * dimensions * FOUR_BYTE_INT; //Verify length of input is matching expectations if (blob.length() != expected_blb_size) { std::cerr << "AddDescriptor::insert_descriptor: "; - std::cerr << "Expectected Blob Length Does Not Match Input "; - std::cerr << blob.length() << " != " << expected_blb_size << std::endl; + std::cerr << "Expected Blob Length Does Not Match Input "; + std::cerr << "Input Length: " < Date: Mon, 26 Aug 2024 11:49:25 -0700 Subject: [PATCH 09/10] typo in cout, fixed --- src/DescriptorsCommand.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/DescriptorsCommand.cc b/src/DescriptorsCommand.cc index 91bc38c2..1276333c 100644 --- a/src/DescriptorsCommand.cc +++ b/src/DescriptorsCommand.cc @@ -546,7 +546,7 @@ int AddDescriptor::add_descriptor_batch(PMGDQuery &query, if (blob.length() != expected_blb_size) { std::cerr << "AddDescriptor::insert_descriptor: "; std::cerr << "Expected Blob Length Does Not Match Input "; - std::cerr << "Input Length: " < Date: Thu, 29 Aug 2024 13:42:22 -0700 Subject: [PATCH 10/10] fixed minor typo --- tests/python/TestDescriptors.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/python/TestDescriptors.py b/tests/python/TestDescriptors.py index 327fc957..0488950a 100644 --- a/tests/python/TestDescriptors.py +++ b/tests/python/TestDescriptors.py @@ -385,7 +385,7 @@ def test_AddBatchAndFindKNN(self): descriptor_blob = [] x = np.ones(128) - x[2] = x[2] = 2.34 + 1 * 20 # 2.34 + 1*20 + x[2] = 2.34 + 1 * 20 # 2.34 + 1*20 x = x.astype("float32") descriptor_blob.append(x.tobytes())