Skip to content

Commit

Permalink
Merge pull request #22 from IntelLabs/rwlock
Browse files Browse the repository at this point in the history
Rwlock
  • Loading branch information
vishakha041 authored Apr 27, 2018
2 parents bb5a437 + 8c6e943 commit 5e243ea
Show file tree
Hide file tree
Showing 8 changed files with 302 additions and 28 deletions.
2 changes: 2 additions & 0 deletions src/ExceptionsCommand.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ namespace VDMS {
DescriptorError,
DescriptorSetError,
PMGDTransactiontError,
LockTimeout,
LockError,

Undefined = 100,// Any undefined error
};
Expand Down
9 changes: 7 additions & 2 deletions src/PMGDQuery.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ using namespace VDMS;
#define REFERENCE_RANGE_START 20000

PMGDQuery::PMGDQuery(PMGDQueryHandler& pmgd_qh) :
_pmgd_qh(pmgd_qh), _current_ref(REFERENCE_RANGE_START)
_pmgd_qh(pmgd_qh), _current_ref(REFERENCE_RANGE_START),
_readonly(true)
{
_current_group_id = 0;
//this command to start a new transaction
Expand Down Expand Up @@ -77,7 +78,7 @@ Json::Value& PMGDQuery::run()

// execute the queries using the PMGDQueryHandler object
std::vector<std::vector<PMGDCmdResponse* >> _pmgd_responses;
_pmgd_responses = _pmgd_qh.process_queries(_cmds, _current_group_id + 1);
_pmgd_responses = _pmgd_qh.process_queries(_cmds, _current_group_id + 1, _readonly);

if (_pmgd_responses.size() != _current_group_id + 1) {
if (_pmgd_responses.size() == 1 && _pmgd_responses[0].size() == 1) {
Expand Down Expand Up @@ -421,6 +422,8 @@ void PMGDQuery::AddNode(int ref,
const Json::Value& props,
const Json::Value& constraints)
{
_readonly = false;

PMGDCmd* cmdadd = new PMGDCmd();
cmdadd->set_cmd_id(PMGDCmd::AddNode);
cmdadd->set_cmd_grp_id(_current_group_id);
Expand Down Expand Up @@ -453,6 +456,8 @@ void PMGDQuery::AddEdge(int ident,
const std::string& tag,
const Json::Value& props)
{
_readonly = false;

PMGDCmd* cmdedge = new PMGDCmd();
cmdedge->set_cmd_grp_id(_current_group_id);
cmdedge->set_cmd_id(PMGDCmd::AddEdge);
Expand Down
1 change: 1 addition & 0 deletions src/PMGDQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ namespace VDMS {
unsigned _current_group_id;
PMGDQueryHandler& _pmgd_qh;
unsigned _current_ref;
bool _readonly; // Stays true unless some write cmd sets it to false.

Json::Value _json_responses;

Expand Down
26 changes: 18 additions & 8 deletions src/PMGDQueryHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "PMGDQueryHandler.h"
#include "util.h" // PMGD util
#include "PMGDIterators.h"
#include "RWLock.h"

// TODO In the complete version of VDMS, this file will live
// within PMGD which would replace the PMGD namespace. Some of
Expand All @@ -42,7 +43,7 @@ using namespace PMGD;
using namespace VDMS;

PMGD::Graph *PMGDQueryHandler::_db;
std::mutex *PMGDQueryHandler::_dblock;
RWLock *PMGDQueryHandler::_dblock;

void PMGDQueryHandler::init()
{
Expand All @@ -53,17 +54,24 @@ void PMGDQueryHandler::init()
_db = new PMGD::Graph(dbname.c_str(), PMGD::Graph::Create);

// Create the query handler here assuming database is valid now.
_dblock = new std::mutex();
_dblock = new RWLock();
}

std::vector<PMGDCmdResponses>
PMGDQueryHandler::process_queries(const PMGDCmds &cmds,
int num_groups)
int num_groups, bool readonly)
{
std::vector<PMGDCmdResponses> responses(num_groups);

assert(_tx == NULL);
_dblock->lock();

// Assuming one query handler handles one TX at a time.
_readonly = readonly;
if (_readonly)
_dblock->read_lock();
else
_dblock->write_lock();

for (const auto cmd : cmds) {
PMGDCmdResponse *response = new PMGDCmdResponse();
if (process_query(cmd, response) < 0) {
Expand All @@ -85,7 +93,10 @@ std::vector<PMGDCmdResponses>
_tx = NULL;
}

_dblock->unlock();
if (_readonly)
_dblock->read_unlock();
else
_dblock->write_unlock();
return responses;
}

Expand Down Expand Up @@ -121,9 +132,8 @@ int PMGDQueryHandler::process_query(const PMGDCmd *cmd,
switch (code) {
case PMGDCmd::TxBegin:
{

// TODO: Needs to distinguish transaction parameters like RO/RW
_tx = new Transaction(*_db, Transaction::ReadWrite);
int tx_options = _readonly ? Transaction::ReadOnly : Transaction::ReadWrite;
_tx = new Transaction(*_db, tx_options);
set_response(response, protobufs::TX, PMGDCmdResponse::Success);
break;
}
Expand Down
12 changes: 7 additions & 5 deletions src/PMGDQueryHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@

#include <map>
#include <unordered_map>
#include <mutex>
#include <vector>
#include <list>

Expand All @@ -56,6 +55,8 @@ namespace VDMS {
typedef std::vector<PMGDCmd *> PMGDCmds;
typedef std::vector<PMGDCmdResponse *> PMGDCmdResponses;

class RWLock;

class PMGDQueryHandler
{
class ReusableNodeIterator;
Expand All @@ -65,10 +66,10 @@ namespace VDMS {
static PMGD::Graph *_db;

// Need this lock till we have concurrency support in PMGD
// TODO: Make this reader writer.
static std::mutex *_dblock;
static RWLock *_dblock;

PMGD::Transaction *_tx;
bool _readonly; // Variable changes per TX based on process_queries parameter.

// Map an integer ID to a NodeIterator (reset at the end of each transaction).
// This works for Adds and Queries. We assume that the client or
Expand Down Expand Up @@ -118,7 +119,7 @@ namespace VDMS {

public:
static void init();
PMGDQueryHandler() { _tx = NULL; }
PMGDQueryHandler() { _tx = NULL; _readonly = true; }

// The vector here can contain just one JL command but will be surrounded by
// TX begin and end. So just expose one call to the QueryHandler for
Expand All @@ -129,6 +130,7 @@ namespace VDMS {
// than the number of commands.
// Ensure that the cmd_grp_id, that is the query number are in increasing
// order and account for the TxBegin and TxEnd in numbering.
std::vector<PMGDCmdResponses> process_queries(const PMGDCmds &cmds, int num_groups);
std::vector<PMGDCmdResponses> process_queries(const PMGDCmds &cmds,
int num_groups, bool readonly);
};
};
196 changes: 196 additions & 0 deletions src/RWLock.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
/**
* @file RWLock.h
*
* @section LICENSE
*
* The MIT License
*
* @copyright Copyright (c) 2017 Intel Corporation
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"),
* to deal in the Software without restriction,
* including without limitation the rights to use, copy, modify,
* merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
* ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*
*/

#pragma once
#include <thread>
#include <random>
#include <vector>
#include <algorithm>

#include "arch.h"
#include "ExceptionsCommand.h"

// All locking structures live in DRAM. We use stripe locks
// to balance out the space used by locks for a large database
// vs. parallelism.
namespace VDMS {
/**
 * Spinning reader/writer lock packed into one 16-bit word.
 *
 * Layout of _rw_lock:
 *   - bit 15      : writer flag (WRITE_LOCK)
 *   - bits 0..14  : number of readers currently registered (LOCK_READER_MASK)
 *
 * Waiters never block in the kernel; they spin with a randomized exponential
 * backoff and give up with ExceptionCommand(LockTimeout) after MAX_ATTEMPTS
 * backoff rounds. Misuse (unlocking something that is not locked, or
 * overflowing the reader counter) is reported with ExceptionCommand(LockError).
 *
 * The atomic primitives (::xadd, ::atomic_and, bts, pause) come from arch.h.
 * NOTE(review): this code assumes xadd() returns the value held *before* the
 * addition and bts() returns the previous state of the bit - confirm against
 * arch.h.
 */
class RWLock
{
    static const uint16_t LOCK_READER_MASK = 0x7fff;
    static const uint16_t READER_INCR = 1;
    static const uint16_t WRITER_LOCK_BIT = 15;
    static const uint16_t WRITE_LOCK = 1UL << WRITER_LOCK_BIT;

    // Backoff variables, expressed as a number of pause() iterations.
    // *** Tune experimentally
    static const size_t MIN_BACKOFF_DELAY = 100000;
    static const size_t MAX_BACKOFF_DELAY = 50000000;
    static const unsigned MAX_ATTEMPTS = 10;

    // Thin wrappers that pin the arch.h templates to the lock word's type.
    uint16_t xadd(volatile uint16_t &m, uint16_t v)
        { return ::xadd<uint16_t>(m, v); }
    void atomic_and(volatile uint16_t &m, uint16_t v)
        { ::atomic_and<uint16_t>(m, v); }

    volatile uint16_t _rw_lock;

    /**
     * Spin for a random number of pause() iterations and widen the window
     * for the next call.
     *
     * The spin count is drawn uniformly from [cur_max_delay, min(2 *
     * cur_max_delay, MAX_BACKOFF_DELAY)]; on return cur_max_delay holds the
     * upper bound of that range.
     *
     * Ideas from here: https://geidav.wordpress.com/tag/exponential-back-off
     *
     * @param cur_max_delay  in/out: current backoff bound, doubled (and
     *                       capped at MAX_BACKOFF_DELAY) by this call.
     */
    void backoff(size_t &cur_max_delay)
    {
        thread_local std::uniform_int_distribution<size_t> dist;

        // The simplest pseudo-random number generator that gives an int.
        // The random_device is only needed to seed it, so it is created as a
        // temporary inside the one-time thread_local initializer rather than
        // being constructed again on every backoff round.
        thread_local std::minstd_rand gen(std::random_device{}());

        const size_t delay = cur_max_delay;
        const size_t doubled = 2 * cur_max_delay;
        cur_max_delay = (doubled < MAX_BACKOFF_DELAY) ? doubled : MAX_BACKOFF_DELAY;

        const size_t count =
            dist(gen, decltype(dist)::param_type{delay, cur_max_delay});
        for (size_t i = 0; i < count; ++i)
            pause();
    }

public:
    /// Creates an unlocked lock (no writer, zero readers).
    RWLock() : _rw_lock(0) {}

    /**
     * Acquire the lock for shared (read) access.
     *
     * @throws ExceptionCommand(LockError)   if the reader counter is already
     *         at its maximum (LOCK_READER_MASK).
     * @throws ExceptionCommand(LockTimeout) if a writer still holds the lock
     *         after MAX_ATTEMPTS backoff rounds.
     */
    void read_lock()
    {
        size_t cur_max_delay = MIN_BACKOFF_DELAY;
        unsigned attempts = 0;

        while (true) {
            // Optimistically register as a reader; r is the previous value.
            const uint16_t r = xadd(_rw_lock, READER_INCR);

            if ((r & LOCK_READER_MASK) == LOCK_READER_MASK) {
                // The counter was already maxed out, so the increment above
                // carried out of the reader field into the writer bit.
                // Take it back before reporting the error; otherwise the
                // lock word would be left corrupted for every other user.
                xadd(_rw_lock, -READER_INCR);
                throw ExceptionCommand(LockError); // distinguish this from timeout.
            }

            // Check if we got the lock w/o any active writers
            if ((r & WRITE_LOCK) == 0)
                return;

            // A writer is active: withdraw our registration and wait.
            xadd(_rw_lock, -READER_INCR);

            // Wait for any active writers
            while (_rw_lock & WRITE_LOCK) {
                if (++attempts > MAX_ATTEMPTS)
                    throw ExceptionCommand(LockTimeout);
                backoff(cur_max_delay);
            }
        }
    }

    /**
     * Release a shared (read) hold.
     *
     * @throws ExceptionCommand(LockError) if no reader is registered.
     */
    void read_unlock()
    {
        if ((_rw_lock & LOCK_READER_MASK) == 0)
            throw ExceptionCommand(LockError); // distinguish this from timeout.
        xadd(_rw_lock, -READER_INCR);
    }

    /**
     * Acquire the lock for exclusive (write) access.
     *
     * First claims the writer bit, then waits for the registered readers to
     * drain. If the readers do not drain in time the writer bit is released
     * again before the timeout is reported.
     *
     * @throws ExceptionCommand(LockTimeout) if another writer or the readers
     *         do not go away within MAX_ATTEMPTS backoff rounds.
     */
    void write_lock()
    {
        size_t cur_max_delay = MIN_BACKOFF_DELAY;
        unsigned attempts = 0;

        while (true) {
            // Check if we get lock w/o any active writers
            if (bts(_rw_lock, WRITER_LOCK_BIT) == 0) {
                // Writer bit is ours; start a fresh attempt budget for the
                // reader-drain phase.
                attempts = 0;

                // Wait for any active readers
                while (_rw_lock & LOCK_READER_MASK) {
                    if (++attempts > MAX_ATTEMPTS) {
                        // Give the writer bit back so others can proceed.
                        atomic_and(_rw_lock, LOCK_READER_MASK);
                        throw ExceptionCommand(LockTimeout);
                    }
                    backoff(cur_max_delay);
                }
                return;
            }

            // Wait for any active writers
            while (_rw_lock & WRITE_LOCK) {
                if (++attempts > MAX_ATTEMPTS) {
                    throw ExceptionCommand(LockTimeout);
                }
                backoff(cur_max_delay);
            }
        }
    }

    /**
     * Convert a read hold into a write hold.
     *
     * This function should be called only when the caller already possesses
     * a read lock. Rather than making this recursive, we are relying on the
     * caller to use this correctly.
     *
     * On success the caller's reader registration is dropped and only the
     * writer bit is held. On timeout while waiting for other readers the
     * writer bit is released and the caller's reader registration is left
     * in place.
     *
     * @throws ExceptionCommand(LockTimeout) if another writer or the other
     *         readers do not go away in time.
     */
    void upgrade_write_lock()
    {
        size_t cur_max_delay = MIN_BACKOFF_DELAY;
        unsigned attempts = 0;

        while (true) {
            // Check if we get lock w/o any active writers
            if (bts(_rw_lock, WRITER_LOCK_BIT) == 0) {
                attempts = 0;

                // Wait for any active readers other than ourselves
                // (our own registration accounts for the remaining 1).
                while ((_rw_lock & LOCK_READER_MASK) > 1) {
                    if (++attempts > MAX_ATTEMPTS) {
                        // Give the writer bit back; our read hold remains.
                        atomic_and(_rw_lock, LOCK_READER_MASK);
                        throw ExceptionCommand(LockTimeout);
                    }
                    backoff(cur_max_delay);
                }

                // Don't need reader lock anymore
                xadd(_rw_lock, -READER_INCR);
                return;
            }

            // Wait for any active writers
            // Give this another extra attempt (post-increment on purpose).
            while (_rw_lock & WRITE_LOCK) {
                if (attempts++ > MAX_ATTEMPTS) {
                    throw ExceptionCommand(LockTimeout);
                }
                backoff(cur_max_delay);
            }
        }
    }

    /**
     * Release an exclusive (write) hold by clearing the writer bit.
     *
     * @throws ExceptionCommand(LockError) if the writer bit is not set.
     */
    void write_unlock()
    {
        if ((_rw_lock & WRITE_LOCK) == 0)
            throw ExceptionCommand(LockError); // distinguish this from timeout.
        atomic_and(_rw_lock, LOCK_READER_MASK);
    }

    /// @return true if the writer bit is currently set.
    bool is_write_locked() const { return (_rw_lock & WRITE_LOCK) != 0; }

    /// @return the number of readers currently registered in the lock word.
    uint16_t reader_count() const { return _rw_lock & LOCK_READER_MASK; }
};
}
Loading

0 comments on commit 5e243ea

Please sign in to comment.