diff --git a/config-vdms.json b/config-vdms.json index c3df33e6..23ea1b68 100644 --- a/config-vdms.json +++ b/config-vdms.json @@ -6,6 +6,10 @@ "port": 55555, // Default is 55555 "max_simultaneous_clients": 20, // Default is 500 + // Tune the number of maximum attempts when acquiring the + // reader writer lock for metadata changes. + "max_lock_attempts": 10, + // Database paths "pmgd_path": "db/graph", // This will be an IP address in the future "png_path": "db/images/pngs/", diff --git a/src/PMGDQueryHandler.cc b/src/PMGDQueryHandler.cc index 19a116d2..5813bdc0 100644 --- a/src/PMGDQueryHandler.cc +++ b/src/PMGDQueryHandler.cc @@ -49,12 +49,24 @@ void PMGDQueryHandler::init() { std::string dbname = VDMSConfig::instance() ->get_string_value("pmgd_path", "default_pmgd"); + unsigned attempts = VDMSConfig::instance() + ->get_int_value("max_lock_attempts", RWLock::MAX_ATTEMPTS); // Create a db _db = new PMGD::Graph(dbname.c_str(), PMGD::Graph::Create); // Create the query handler here assuming database is valid now. - _dblock = new RWLock(); + _dblock = new RWLock(attempts); +} + +void PMGDQueryHandler::destroy() +{ + if (_db) { + delete _db; + delete _dblock; + _db = NULL; + _dblock = NULL; + } } std::vector @@ -67,10 +79,20 @@ std::vector // Assuming one query handler handles one TX at a time. _readonly = readonly; - if (_readonly) - _dblock->read_lock(); - else - _dblock->write_lock(); + try { + if (_readonly) + _dblock->read_lock(); + else + _dblock->write_lock(); + } + catch (Exception e) { + PMGDCmdResponses &resp_v = responses[0]; + PMGDCmdResponse *response = new PMGDCmdResponse(); + set_response(response, PMGDCmdResponse::Exception, + e.name + std::string(": ") + e.msg); + resp_v.push_back(response); + return responses; + } for (const auto cmd : cmds) { PMGDCmdResponse *response = new PMGDCmdResponse(); @@ -364,6 +386,8 @@ int PMGDQueryHandler::query_node(const protobufs::QueryNode &qn, if (!bool(ni)) { set_response(response, PMGDCmdResponse::Empty, "Null search iterator\n"); + if (has_link) + start_ni->reset(); return -1; } @@ -391,6 +415,8 @@ int PMGDQueryHandler::query_node(const protobufs::QueryNode &qn, if (bool(*tni)) { // Not unique and that is an error here. set_response(response, PMGDCmdResponse::NotUnique, "Query response not unique\n"); + if (has_link) + start_ni->reset(); delete tni; return -1; } diff --git a/src/PMGDQueryHandler.h b/src/PMGDQueryHandler.h index dd65e0f3..edb97645 100644 --- a/src/PMGDQueryHandler.h +++ b/src/PMGDQueryHandler.h @@ -119,6 +119,7 @@ namespace VDMS { public: static void init(); + static void destroy(); PMGDQueryHandler() { _tx = NULL; _readonly = true; } // The vector here can contain just one JL command but will be surrounded by diff --git a/src/RWLock.h b/src/RWLock.h index 9b8cc5a1..a17799c3 100644 --- a/src/RWLock.h +++ b/src/RWLock.h @@ -52,8 +52,7 @@ namespace VDMS { // Backoff variables. // *** Tune experimentally static const size_t MIN_BACKOFF_DELAY = 100000; - static const size_t MAX_BACKOFF_DELAY = 50000000; - static const unsigned MAX_ATTEMPTS = 10; + static const size_t MAX_BACKOFF_DELAY = 500000000; uint16_t xadd(volatile uint16_t &m, uint16_t v) { return ::xadd(m, v); } @@ -61,6 +60,7 @@ namespace VDMS { { ::atomic_and(m, v); } volatile uint16_t _rw_lock; + const unsigned _max_attempts; // Ideas from here: https://geidav.wordpress.com/tag/exponential-back-off void backoff(size_t &cur_max_delay) @@ -80,7 +80,9 @@ namespace VDMS { } public: - RWLock() : _rw_lock(0) {} + static const unsigned MAX_ATTEMPTS = 10; + + RWLock(unsigned max_attempts) : _rw_lock(0), _max_attempts(max_attempts) {} void read_lock() { @@ -99,7 +101,7 @@ namespace VDMS { // Wait for any active writers while (_rw_lock & WRITE_LOCK) { - if (++attempts > MAX_ATTEMPTS) + if (++attempts > _max_attempts) throw ExceptionCommand(LockTimeout); backoff(cur_max_delay); } @@ -125,7 +127,7 @@ namespace VDMS { // Wait for any active readers while(_rw_lock & LOCK_READER_MASK) { - if (++attempts > MAX_ATTEMPTS) { + if (++attempts > _max_attempts) { atomic_and(_rw_lock, LOCK_READER_MASK); throw ExceptionCommand(LockTimeout); } @@ -136,7 +138,7 @@ namespace VDMS { // Wait for any active writers while (_rw_lock & WRITE_LOCK) { - if (++attempts > MAX_ATTEMPTS) { + if (++attempts > _max_attempts) { throw ExceptionCommand(LockTimeout); } backoff(cur_max_delay); @@ -159,7 +161,7 @@ namespace VDMS { // Wait for any active readers while ((_rw_lock & LOCK_READER_MASK) > 1) { - if (++attempts > MAX_ATTEMPTS) { + if (++attempts > _max_attempts) { atomic_and(_rw_lock, LOCK_READER_MASK); throw ExceptionCommand(LockTimeout); } @@ -174,7 +176,7 @@ namespace VDMS { // Wait for any active writers // Give this another extra attempt while (_rw_lock & WRITE_LOCK) { - if (attempts++ > MAX_ATTEMPTS) { + if (attempts++ > _max_attempts) { throw ExceptionCommand(LockTimeout); } backoff(cur_max_delay); diff --git a/src/Server.cc b/src/Server.cc index c8a57f0e..72f0d8da 100644 --- a/src/Server.cc +++ b/src/Server.cc @@ -104,4 +104,6 @@ Server::~Server() { _cm->shutdown(); delete _cm; + PMGDQueryHandler::destroy(); + VDMSConfig::destroy(); } diff --git a/tests/json_queries.cc b/tests/json_queries.cc index a93acceb..348ba59a 100644 --- a/tests/json_queries.cc +++ b/tests/json_queries.cc @@ -100,6 +100,7 @@ TEST(AddImage, simpleAdd) EXPECT_EQ(json_response[0]["AddImage"]["status"].asString(), "0"); VDMSConfig::destroy(); + PMGDQueryHandler::destroy(); } TEST(AddImage, simpleAddx10) @@ -151,6 +152,7 @@ TEST(AddImage, simpleAddx10) EXPECT_EQ(json_response[i]["AddImage"]["status"].asString(), "0"); } VDMSConfig::destroy(); + PMGDQueryHandler::destroy(); } TEST(QueryHandler, AddAndFind){ @@ -290,4 +292,5 @@ TEST(QueryHandler, AddAndFind){ EXPECT_EQ(sum_found_before, sum_found_after); EXPECT_EQ(count_found_before, count_found_after); VDMSConfig::destroy(); + PMGDQueryHandler::destroy(); } diff --git a/tests/pmgd_queries.cc b/tests/pmgd_queries.cc index f690b2b2..e2d1f244 100644 --- a/tests/pmgd_queries.cc +++ b/tests/pmgd_queries.cc @@ -163,7 +163,7 @@ TEST(PMGDQueryHandler, addTest) for (int i = 0; i < query_count; ++i) { vector response = responses[i]; for (auto it : response) { - ASSERT_EQ(it->error_code(), protobufs::CommandResponse::Success) << "Unsuccessful TX"; + EXPECT_EQ(it->error_code(), protobufs::CommandResponse::Success) << "Unsuccessful TX"; if (it->r_type() == protobufs::NodeID) { long nodeid = it->op_int_value(); EXPECT_EQ(nodeid, nodeids++) << "Unexpected node id"; @@ -176,6 +176,7 @@ TEST(PMGDQueryHandler, addTest) } } VDMSConfig::destroy(); + PMGDQueryHandler::destroy(); } void print_property(const string &key, const protobufs::Property &p) @@ -250,10 +251,10 @@ TEST(PMGDQueryHandler, queryTestList) vector> responses = qh.process_queries(cmds, query_count, true); int nodecount, propcount = 0; - for (int i = 0; i < query_count; ++i) { - vector response = responses[i]; + for (int q = 0; q < query_count; ++q) { + vector response = responses[q]; for (auto it : response) { - ASSERT_EQ(it->error_code(), protobufs::CommandResponse::Success) << it->error_msg(); + EXPECT_EQ(it->error_code(), protobufs::CommandResponse::Success) << it->error_msg(); if (it->r_type() == protobufs::List) { auto mymap = it->prop_values(); for(auto m_it : mymap) { @@ -274,6 +275,7 @@ TEST(PMGDQueryHandler, queryTestList) EXPECT_EQ(propcount, 2) << "Not enough properties read"; } VDMSConfig::destroy(); + PMGDQueryHandler::destroy(); } TEST(PMGDQueryHandler, queryTestAverage) @@ -316,7 +318,7 @@ TEST(PMGDQueryHandler, queryTestAverage) for (int i = 0; i < query_count; ++i) { vector response = responses[i]; for (auto it : response) { - ASSERT_EQ(it->error_code(), protobufs::CommandResponse::Success) << it->error_msg(); + EXPECT_EQ(it->error_code(), protobufs::CommandResponse::Success) << it->error_msg(); if (it->r_type() == protobufs::Average) { EXPECT_EQ(it->op_float_value(), 76.5) << "Average didn't match expected for four patients' age"; } @@ -324,6 +326,7 @@ TEST(PMGDQueryHandler, queryTestAverage) } } VDMSConfig::destroy(); + PMGDQueryHandler::destroy(); } TEST(PMGDQueryHandler, queryTestUnique) @@ -376,7 +379,7 @@ TEST(PMGDQueryHandler, queryTestUnique) query_count++; vector> responses = qh.process_queries(cmds, query_count, true); - ASSERT_EQ(responses.size(), 1) << "Expecting an error return situation"; + EXPECT_EQ(responses.size(), 1) << "Expecting an error return situation"; for (int i = 0; i < responses.size(); ++i) { vector response = responses[i]; for (auto it : response) { @@ -386,6 +389,7 @@ TEST(PMGDQueryHandler, queryTestUnique) } } VDMSConfig::destroy(); + PMGDQueryHandler::destroy(); } TEST(PMGDQueryHandler, queryNeighborTestList) @@ -454,10 +458,10 @@ TEST(PMGDQueryHandler, queryNeighborTestList) vector> responses = qh.process_queries(cmds, query_count, true); int nodecount, propcount = 0; - for (int i = 0; i < query_count; ++i) { - vector response = responses[i]; + for (int q = 0; q < query_count; ++q) { + vector response = responses[q]; for (auto it : response) { - ASSERT_EQ(it->error_code(), protobufs::CommandResponse::Success) << it->error_msg(); + EXPECT_EQ(it->error_code(), protobufs::CommandResponse::Success) << it->error_msg(); if (it->r_type() == protobufs::List) { auto mymap = it->prop_values(); for(auto m_it : mymap) { @@ -478,6 +482,7 @@ TEST(PMGDQueryHandler, queryNeighborTestList) EXPECT_EQ(propcount, 1) << "Not enough properties read"; } VDMSConfig::destroy(); + PMGDQueryHandler::destroy(); } TEST(PMGDQueryHandler, queryConditionalNeighborTestList) @@ -555,10 +560,10 @@ TEST(PMGDQueryHandler, queryConditionalNeighborTestList) vector> responses = qh.process_queries(cmds, query_count, true); int nodecount, propcount = 0; - for (int i = 0; i < query_count; ++i) { - vector response = responses[i]; + for (int q = 0; q < query_count; ++q) { + vector response = responses[q]; for (auto it : response) { - ASSERT_EQ(it->error_code(), protobufs::CommandResponse::Success) << it->error_msg(); + EXPECT_EQ(it->error_code(), protobufs::CommandResponse::Success) << it->error_msg(); if (it->r_type() == protobufs::List) { auto mymap = it->prop_values(); for(auto m_it : mymap) { @@ -579,6 +584,7 @@ TEST(PMGDQueryHandler, queryConditionalNeighborTestList) EXPECT_EQ(propcount, 1) << "Not enough properties read"; } VDMSConfig::destroy(); + PMGDQueryHandler::destroy(); } TEST(PMGDQueryHandler, queryNeighborTestSum) @@ -650,7 +656,7 @@ TEST(PMGDQueryHandler, queryNeighborTestSum) for (int i = 0; i < query_count; ++i) { vector response = responses[i]; for (auto it : response) { - ASSERT_EQ(it->error_code(), protobufs::CommandResponse::Success) << it->error_msg(); + EXPECT_EQ(it->error_code(), protobufs::CommandResponse::Success) << it->error_msg(); if (it->r_type() == protobufs::Sum) { EXPECT_EQ(it->op_int_value(), 150) << "Sum didn't match expected for two patients' age"; } @@ -658,6 +664,7 @@ TEST(PMGDQueryHandler, queryNeighborTestSum) } } VDMSConfig::destroy(); + PMGDQueryHandler::destroy(); } TEST(PMGDQueryHandler, addConstrainedTest) @@ -734,18 +741,19 @@ TEST(PMGDQueryHandler, addConstrainedTest) // Since PMGD queries always generate one response per command, // we can do the following: protobufs::CommandResponse *resp = responses[0][0]; // TxBegin - ASSERT_EQ(resp->error_code(), protobufs::CommandResponse::Success) << "Unsuccessful TX"; + EXPECT_EQ(resp->error_code(), protobufs::CommandResponse::Success) << "Unsuccessful TX"; resp = responses[1][0]; // Conditional add - ASSERT_EQ(resp->error_code(), protobufs::CommandResponse::Exists) << resp->error_msg(); + EXPECT_EQ(resp->error_code(), protobufs::CommandResponse::Exists) << resp->error_msg(); EXPECT_EQ(resp->op_int_value(), 1) << "Unexpected node id for conditional add"; resp = responses[2][0]; // Regular add - ASSERT_EQ(resp->error_code(), protobufs::CommandResponse::Success) << resp->error_msg(); + EXPECT_EQ(resp->error_code(), protobufs::CommandResponse::Success) << resp->error_msg(); EXPECT_EQ(resp->op_int_value(), 5) << "Unexpected node id for add"; resp = responses[3][0]; // Regular add edge - ASSERT_EQ(resp->error_code(), protobufs::CommandResponse::Success) << resp->error_msg(); + EXPECT_EQ(resp->error_code(), protobufs::CommandResponse::Success) << resp->error_msg(); EXPECT_EQ(resp->op_int_value(), 3) << "Unexpected edge id for add"; } VDMSConfig::destroy(); + PMGDQueryHandler::destroy(); } TEST(PMGDQueryHandler, queryNeighborLinksTestList) @@ -830,10 +838,10 @@ TEST(PMGDQueryHandler, queryNeighborLinksTestList) vector> responses = qh.process_queries(cmds, query_count, true); int nodecount, propcount = 0; - for (int i = 0; i < query_count; ++i) { - vector response = responses[i]; + for (int q = 0; q < query_count; ++q) { + vector response = responses[q]; for (auto it : response) { - ASSERT_EQ(it->error_code(), protobufs::CommandResponse::Success) << it->error_msg(); + EXPECT_EQ(it->error_code(), protobufs::CommandResponse::Success) << it->error_msg(); if (it->r_type() == protobufs::List) { auto mymap = it->prop_values(); for(auto m_it : mymap) { @@ -854,6 +862,7 @@ TEST(PMGDQueryHandler, queryNeighborLinksTestList) EXPECT_EQ(propcount, 1) << "Not enough properties read"; } VDMSConfig::destroy(); + PMGDQueryHandler::destroy(); } TEST(PMGDQueryHandler, queryNeighborLinksReuseTestList) @@ -945,10 +954,10 @@ TEST(PMGDQueryHandler, queryNeighborLinksReuseTestList) vector> responses = qh.process_queries(cmds, query_count, true); int nodecount = 0, propcount = 0; int totnodecount = 0, totpropcount = 0; - for (int i = 0; i < query_count; ++i) { - vector response = responses[i]; + for (int q = 0; q < query_count; ++q) { + vector response = responses[q]; for (auto it : response) { - ASSERT_EQ(it->error_code(), protobufs::CommandResponse::Success) << it->error_msg(); + EXPECT_EQ(it->error_code(), protobufs::CommandResponse::Success) << it->error_msg(); if (it->r_type() == protobufs::List) { propcount = 0; auto mymap = it->prop_values(); @@ -977,6 +986,7 @@ TEST(PMGDQueryHandler, queryNeighborLinksReuseTestList) EXPECT_EQ(totpropcount, 3) << "Not enough total properties read"; } VDMSConfig::destroy(); + PMGDQueryHandler::destroy(); } TEST(PMGDQueryHandler, querySortedNeighborLinksReuseTestList) @@ -1071,10 +1081,10 @@ TEST(PMGDQueryHandler, querySortedNeighborLinksReuseTestList) int nodecount = 0, propcount = 0; int totnodecount = 0, totpropcount = 0; bool firstquery = true; - for (int i = 0; i < query_count; ++i) { - vector response = responses[i]; + for (int q = 0; q < query_count; ++q) { + vector response = responses[q]; for (auto it : response) { - ASSERT_EQ(it->error_code(), protobufs::CommandResponse::Success) << it->error_msg(); + EXPECT_EQ(it->error_code(), protobufs::CommandResponse::Success) << it->error_msg(); if (it->r_type() == protobufs::List) { propcount = 0; auto mymap = it->prop_values(); @@ -1107,6 +1117,7 @@ TEST(PMGDQueryHandler, querySortedNeighborLinksReuseTestList) EXPECT_EQ(totpropcount, 3) << "Not enough total properties read"; } VDMSConfig::destroy(); + PMGDQueryHandler::destroy(); } TEST(PMGDQueryHandler, queryTestListLimit) @@ -1151,10 +1162,10 @@ TEST(PMGDQueryHandler, queryTestListLimit) vector> responses = qh.process_queries(cmds, query_count, true); int nodecount, propcount = 0; - for (int i = 0; i < query_count; ++i) { - vector response = responses[i]; + for (int q = 0; q < query_count; ++q) { + vector response = responses[q]; for (auto it : response) { - ASSERT_EQ(it->error_code(), protobufs::CommandResponse::Success) << it->error_msg(); + EXPECT_EQ(it->error_code(), protobufs::CommandResponse::Success) << it->error_msg(); if (it->r_type() == protobufs::List) { auto mymap = it->prop_values(); for(auto m_it : mymap) { @@ -1175,6 +1186,7 @@ TEST(PMGDQueryHandler, queryTestListLimit) EXPECT_EQ(propcount, 2) << "Not enough properties read"; } VDMSConfig::destroy(); + PMGDQueryHandler::destroy(); } TEST(PMGDQueryHandler, queryTestSortedLimitedAverage) @@ -1221,7 +1233,7 @@ TEST(PMGDQueryHandler, queryTestSortedLimitedAverage) for (int i = 0; i < query_count; ++i) { vector response = responses[i]; for (auto it : response) { - ASSERT_EQ(it->error_code(), protobufs::CommandResponse::Success) << it->error_msg(); + EXPECT_EQ(it->error_code(), protobufs::CommandResponse::Success) << it->error_msg(); if (it->r_type() == protobufs::Average) { EXPECT_EQ(static_cast(it->op_float_value()), 73) << "Average didn't match expected for three middle patients' age"; } @@ -1229,4 +1241,5 @@ TEST(PMGDQueryHandler, queryTestSortedLimitedAverage) } } VDMSConfig::destroy(); + PMGDQueryHandler::destroy(); } diff --git a/tests/python/TestEntities.py b/tests/python/TestEntities.py index f58cd66b..216eb495 100644 --- a/tests/python/TestEntities.py +++ b/tests/python/TestEntities.py @@ -98,13 +98,13 @@ def findEntity(self, thID): ["threadid"], thID) def test_runMultipleAdds(self): - simultaneous = 1000; thread_arr = [] for i in range(1,simultaneous): thread_add = Thread(target=self.addEntity,args=(i,) ) thread_add.start() thread_arr.append(thread_add) + time.sleep(0.002) for i in range(1,simultaneous): thread_find = Thread(target=self.findEntity,args=(i,) ) diff --git a/tests/python/config-tests.json b/tests/python/config-tests.json index b67e1a71..f744a42c 100644 --- a/tests/python/config-tests.json +++ b/tests/python/config-tests.json @@ -5,6 +5,10 @@ // Network "port": 55557, // Default is 55555 + // Tune the number of maximum attempts when acquiring the + // reader writer lock for metadata changes. + "max_lock_attempts": 20, + // Database paths "pmgd_path": "db/test-graph", "png_path": "db/images/pngs/",