Fixes #30

Merged: 4 commits, Aug 2, 2018
Changes from 1 commit
4 changes: 4 additions & 0 deletions config-vdms.json
@@ -6,6 +6,10 @@
"port": 55555, // Default is 55555
"max_simultaneous_clients": 20, // Default is 500

// Tune the number of maximum attempts when acquiring the
// reader writer lock for metadata changes.
"max_lock_attempts": 10,

// Database paths
"pmgd_path": "db/graph", // This will be an IP address in the future
"png_path": "db/images/pngs/",
22 changes: 17 additions & 5 deletions src/PMGDQueryHandler.cc
@@ -49,12 +49,14 @@ void PMGDQueryHandler::init()
{
std::string dbname = VDMSConfig::instance()
->get_string_value("pmgd_path", "default_pmgd");
unsigned attempts = VDMSConfig::instance()
->get_int_value("max_lock_attempts", RWLock::MAX_ATTEMPTS);

// Create a db
_db = new PMGD::Graph(dbname.c_str(), PMGD::Graph::Create);

// Create the query handler here assuming database is valid now.
_dblock = new RWLock();
_dblock = new RWLock(attempts);
}

void PMGDQueryHandler::destroy()
@@ -77,10 +79,20 @@ std::vector<PMGDCmdResponses>

// Assuming one query handler handles one TX at a time.
_readonly = readonly;
if (_readonly)
_dblock->read_lock();
else
_dblock->write_lock();
try {
if (_readonly)
_dblock->read_lock();
else
_dblock->write_lock();
}
catch (Exception e) {
PMGDCmdResponses &resp_v = responses[0];
PMGDCmdResponse *response = new PMGDCmdResponse();
set_response(response, PMGDCmdResponse::Exception,
e.name + std::string(": ") + e.msg);
resp_v.push_back(response);
return responses;
}

for (const auto cmd : cmds) {
PMGDCmdResponse *response = new PMGDCmdResponse();
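For context, a minimal sketch of how the bounded-attempt lock surfaces to callers, assuming the RWLock interface from this PR and the same Exception type caught above in PMGDQueryHandler.cc (this is an illustration, not code from the PR):

// Sketch only: a writer holds the lock, so a reader backs off and
// eventually throws LockTimeout instead of spinning forever.
#include <iostream>
#include <thread>
#include "RWLock.h"

int main()
{
    VDMS::RWLock lock(3);     // give up after 3 attempts instead of the default

    lock.write_lock();        // hold the write lock so the reader must back off

    std::thread reader([&lock]() {
        try {
            lock.read_lock();            // backs off, then throws on timeout
        }
        catch (Exception e) {            // same catch style as the handler above
            std::cerr << e.name << ": " << e.msg << std::endl;
        }
    });
    reader.join();
    return 0;
}
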
18 changes: 10 additions & 8 deletions src/RWLock.h
@@ -52,15 +52,15 @@ namespace VDMS {
// Backoff variables.
// *** Tune experimentally
static const size_t MIN_BACKOFF_DELAY = 100000;
static const size_t MAX_BACKOFF_DELAY = 50000000;
static const unsigned MAX_ATTEMPTS = 10;
static const size_t MAX_BACKOFF_DELAY = 500000000;

uint16_t xadd(volatile uint16_t &m, uint16_t v)
{ return ::xadd<uint16_t>(m, v); }
void atomic_and(volatile uint16_t &m, uint16_t v)
{ ::atomic_and<uint16_t>(m, v); }

volatile uint16_t _rw_lock;
const unsigned _max_attempts;

// Ideas from here: https://geidav.wordpress.com/tag/exponential-back-off
void backoff(size_t &cur_max_delay)
@@ -80,7 +80,9 @@
}

public:
RWLock() : _rw_lock(0) {}
static const unsigned MAX_ATTEMPTS = 10;

Review comment: This should be called DEFAULT_MAX_ATTEMPTS.

Contributor: Agree.

RWLock(unsigned max_attempts) : _rw_lock(0), _max_attempts(max_attempts) {}

void read_lock()
{
@@ -99,7 +101,7 @@

// Wait for any active writers
while (_rw_lock & WRITE_LOCK) {
if (++attempts > MAX_ATTEMPTS)
if (++attempts > _max_attempts)
throw ExceptionCommand(LockTimeout);
backoff(cur_max_delay);
}
@@ -125,7 +127,7 @@

// Wait for any active readers
while(_rw_lock & LOCK_READER_MASK) {
if (++attempts > MAX_ATTEMPTS) {
if (++attempts > _max_attempts) {
atomic_and(_rw_lock, LOCK_READER_MASK);
throw ExceptionCommand(LockTimeout);
}
@@ -136,7 +138,7 @@

// Wait for any active writers
while (_rw_lock & WRITE_LOCK) {
if (++attempts > MAX_ATTEMPTS) {
if (++attempts > _max_attempts) {
throw ExceptionCommand(LockTimeout);
}
backoff(cur_max_delay);
@@ -159,7 +161,7 @@

// Wait for any active readers
while ((_rw_lock & LOCK_READER_MASK) > 1) {
if (++attempts > MAX_ATTEMPTS) {
if (++attempts > _max_attempts) {
atomic_and(_rw_lock, LOCK_READER_MASK);
throw ExceptionCommand(LockTimeout);
}
@@ -174,7 +176,7 @@
// Wait for any active writers
// Give this another extra attempt
while (_rw_lock & WRITE_LOCK) {
if (attempts++ > MAX_ATTEMPTS) {
if (attempts++ > _max_attempts) {
throw ExceptionCommand(LockTimeout);
}
backoff(cur_max_delay);
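The body of backoff(cur_max_delay) is collapsed in this diff. Based on the MIN/MAX_BACKOFF_DELAY constants above and the linked exponential back-off post, a plausible sketch of what it does follows; the random distribution and the busy-wait are assumptions, not the committed code:

// Sketch only: exponential backoff with a randomized delay, doubling the
// cap on every call up to MAX_BACKOFF_DELAY.
#include <cstddef>
#include <random>

static const size_t MIN_BACKOFF_DELAY = 100000;
static const size_t MAX_BACKOFF_DELAY = 500000000;

void backoff(size_t &cur_max_delay)
{
    // Pick a random delay below the current cap so contending threads
    // spread out, then double the cap for the next attempt.
    thread_local std::mt19937_64 rng{std::random_device{}()};
    std::uniform_int_distribution<size_t> dist(MIN_BACKOFF_DELAY, cur_max_delay);
    size_t delay = dist(rng);

    cur_max_delay *= 2;
    if (cur_max_delay > MAX_BACKOFF_DELAY)
        cur_max_delay = MAX_BACKOFF_DELAY;

    // Busy-wait rather than sleep, since the expected hold times are short.
    for (volatile size_t i = 0; i < delay; i++)
        ;
}
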
2 changes: 1 addition & 1 deletion tests/python/TestEntities.py
@@ -98,13 +98,13 @@ def findEntity(self, thID):
["threadid"], thID)

def test_runMultipleAdds(self):

simultaneous = 1000;
thread_arr = []
for i in range(1,simultaneous):
thread_add = Thread(target=self.addEntity,args=(i,) )
thread_add.start()
thread_arr.append(thread_add)
time.sleep(0.002)
Contributor: I am not convinced we should change the test so that it passes.
The introduction of the RWLock and msync should not break these tests; we should push a fix to address this, not change the tests so that we mask it.

Should we create a separate issue for this? Or do you want to address it here?

Contributor Author: I think we can create a separate issue. What I noticed here is: msync increases how long PMGD takes. That means the reader writer lock hits more timeouts, and so more threads wait. That somehow seems to be messing with some queue that starts denying newer threads from even getting connected. This needs more exploring, which I don't want to hold up this pull request for.

Contributor Author: Also, one of the things we need to figure out now at the VDMS layer is that timeout-based locking is bound to cause more failures, since threads don't get to wait indefinitely. Do we introduce internal retries or let the client retry? We should of course try to figure out the right values as we run more experiments, but just saying.

for i in range(1,simultaneous):
thread_find = Thread(target=self.findEntity,args=(i,) )
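On the question raised in the comments above, internal retries versus client retries, a minimal sketch of what an internal retry at the VDMS layer could look like; the retry_write_lock helper and its retries parameter are hypothetical and not part of this PR:

// Sketch only: retry the write lock a few times before propagating the
// timeout to the caller, which would then build the Exception response
// as PMGDQueryHandler.cc does above.
#include "RWLock.h"

void retry_write_lock(VDMS::RWLock &lock, unsigned retries)
{
    for (unsigned i = 0; i <= retries; i++) {
        try {
            lock.write_lock();   // throws on LockTimeout after _max_attempts
            return;              // lock acquired
        }
        catch (Exception e) {    // assumed to be the same Exception type as above
            if (i == retries)
                throw;           // out of retries; let the caller respond
        }
    }
}
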
4 changes: 4 additions & 0 deletions tests/python/config-tests.json
@@ -5,6 +5,10 @@
// Network
"port": 55557, // Default is 55555

// Tune the number of maximum attempts when acquiring the
// reader writer lock for metadata changes.
"max_lock_attempts": 20,

// Database paths
"pmgd_path": "db/test-graph",
"png_path": "db/images/pngs/",