From bffd14b448ee1a648f84dca311af3bb91d6e654a Mon Sep 17 00:00:00 2001 From: Vishakha Gupta-Cledat Date: Sat, 14 Apr 2018 07:09:35 +0000 Subject: [PATCH 1/2] Add reader writer lock implementation. --- src/ExceptionsCommand.h | 2 + src/RWLock.h | 196 ++++++++++++++++++++++++++++++++++++++++ src/arch.h | 58 ++++++++++++ 3 files changed, 256 insertions(+) create mode 100644 src/RWLock.h create mode 100644 src/arch.h diff --git a/src/ExceptionsCommand.h b/src/ExceptionsCommand.h index a5a68b50..f3d5fe44 100644 --- a/src/ExceptionsCommand.h +++ b/src/ExceptionsCommand.h @@ -43,6 +43,8 @@ namespace VDMS { DescriptorError, DescriptorSetError, PMGDTransactiontError, + LockTimeout, + LockError, Undefined = 100,// Any undefined error }; diff --git a/src/RWLock.h b/src/RWLock.h new file mode 100644 index 00000000..9b8cc5a1 --- /dev/null +++ b/src/RWLock.h @@ -0,0 +1,196 @@ +/** + * @file RWLock.h + * + * @section LICENSE + * + * The MIT License + * + * @copyright Copyright (c) 2017 Intel Corporation + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, + * including without limitation the rights to use, copy, modify, + * merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, + * ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + * + */ + +#pragma once +#include +#include +#include +#include + +#include "arch.h" +#include "ExceptionsCommand.h" + +// All locking structures live in DRAM. We use stripe locks +// to balance out the space used by locks for a large database +// vs. parallelism. +namespace VDMS { + class RWLock + { + static const uint16_t LOCK_READER_MASK = 0x7fff; + static const uint16_t READER_INCR = 1; + static const uint16_t WRITER_LOCK_BIT = 15; + static const uint16_t WRITE_LOCK = 1UL << WRITER_LOCK_BIT; + + // Backoff variables. + // *** Tune experimentally + static const size_t MIN_BACKOFF_DELAY = 100000; + static const size_t MAX_BACKOFF_DELAY = 50000000; + static const unsigned MAX_ATTEMPTS = 10; + + uint16_t xadd(volatile uint16_t &m, uint16_t v) + { return ::xadd(m, v); } + void atomic_and(volatile uint16_t &m, uint16_t v) + { ::atomic_and(m, v); } + + volatile uint16_t _rw_lock; + + // Ideas from here: https://geidav.wordpress.com/tag/exponential-back-off + void backoff(size_t &cur_max_delay) + { + std::random_device rd; + thread_local std::uniform_int_distribution dist; + + // The simplest pseudo-random number generator that gives an int. + thread_local std::minstd_rand gen(rd()); + + size_t delay = cur_max_delay; + size_t temp = 2 * cur_max_delay; + cur_max_delay = (temp < MAX_BACKOFF_DELAY) ? temp : MAX_BACKOFF_DELAY; + const size_t count = dist(gen, decltype(dist)::param_type{delay, cur_max_delay}); + for (size_t i = 0; i < count; ++i) + pause(); + } + + public: + RWLock() : _rw_lock(0) {} + + void read_lock() + { + size_t cur_max_delay = MIN_BACKOFF_DELAY; + unsigned attempts = 0; + + while (1) { + uint16_t r = xadd(_rw_lock, READER_INCR); + if((r & LOCK_READER_MASK) == LOCK_READER_MASK) // if r was already maxed out + throw ExceptionCommand(LockError); // distinguish this from timeout. + + // Check if we get lock w/o any active writers + if ((r & WRITE_LOCK) == 0) + return; + xadd(_rw_lock, -READER_INCR); + + // Wait for any active writers + while (_rw_lock & WRITE_LOCK) { + if (++attempts > MAX_ATTEMPTS) + throw ExceptionCommand(LockTimeout); + backoff(cur_max_delay); + } + } + } + + void read_unlock() + { + if ((_rw_lock & LOCK_READER_MASK) == 0) + throw ExceptionCommand(LockError); // distinguish this from timeout. + xadd(_rw_lock, -READER_INCR); + } + + void write_lock() + { + size_t cur_max_delay = MIN_BACKOFF_DELAY; + unsigned attempts = 0; + + while (1) { + // Check if we get lock w/o any active writers + if (bts(_rw_lock, WRITER_LOCK_BIT) == 0) { + attempts = 0; + + // Wait for any active readers + while(_rw_lock & LOCK_READER_MASK) { + if (++attempts > MAX_ATTEMPTS) { + atomic_and(_rw_lock, LOCK_READER_MASK); + throw ExceptionCommand(LockTimeout); + } + backoff(cur_max_delay); + } + return; + } + + // Wait for any active writers + while (_rw_lock & WRITE_LOCK) { + if (++attempts > MAX_ATTEMPTS) { + throw ExceptionCommand(LockTimeout); + } + backoff(cur_max_delay); + } + } + } + + // This function should be called only when the caller already possesses + // a read lock. Rather than making this recursive, we are relying on the + // caller to use this correctly. + void upgrade_write_lock() + { + size_t cur_max_delay = MIN_BACKOFF_DELAY; + unsigned attempts = 0; + + while (1) { + // Check if we get lock w/o any active writers + if (bts(_rw_lock, WRITER_LOCK_BIT) == 0) { + attempts = 0; + + // Wait for any active readers + while ((_rw_lock & LOCK_READER_MASK) > 1) { + if (++attempts > MAX_ATTEMPTS) { + atomic_and(_rw_lock, LOCK_READER_MASK); + throw ExceptionCommand(LockTimeout); + } + backoff(cur_max_delay); + } + + // Don't need reader lock anymore + xadd(_rw_lock, -READER_INCR); + return; + } + + // Wait for any active writers + // Give this another extra attempt + while (_rw_lock & WRITE_LOCK) { + if (attempts++ > MAX_ATTEMPTS) { + throw ExceptionCommand(LockTimeout); + } + backoff(cur_max_delay); + } + } + } + + void write_unlock() + { + if((_rw_lock & WRITE_LOCK) == 0) + throw ExceptionCommand(LockError); // distinguish this from timeout. + atomic_and(_rw_lock, LOCK_READER_MASK); + } + + bool is_write_locked() { return _rw_lock & WRITE_LOCK; } + + uint16_t reader_count() const { return _rw_lock & LOCK_READER_MASK; } + }; +} diff --git a/src/arch.h b/src/arch.h new file mode 100644 index 00000000..df5e9032 --- /dev/null +++ b/src/arch.h @@ -0,0 +1,58 @@ +/** + * @file arch.h + * + * @section LICENSE + * + * The MIT License + * + * @copyright Copyright (c) 2017 Intel Corporation + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + * + */ + +#pragma once + +#include + +template +static inline void atomic_and(volatile T &m, T v) + { asm volatile ("lock and%z0 %1, %0" : "+m"(m) : "ri"(v) : "memory"); } + +template +static inline bool bts(volatile T &m, int bit) +{ + bool result = 0; + __asm__ volatile ("lock bts%z1 %2, %1\n; setc %0" + : "=q"(result), "+m"(m) : "Ir"(T(bit)) : "memory", "cc"); + return result; +} + +template +static inline T xadd(volatile T &m, T v) +{ + T r = v; + asm volatile ("lock xadd %1, %0" : "+m"(m), "+r"(r)); + return r; +} + +static inline void pause() +{ + asm("pause"); +} From 8c6e943d86307b8ccc661e30f5c6b069aba40643 Mon Sep 17 00:00:00 2001 From: Vishakha Gupta-Cledat Date: Sat, 14 Apr 2018 07:11:06 +0000 Subject: [PATCH 2/2] Change mutex in PMGDQueryHandler to reader writer lock. --- src/PMGDQuery.cc | 9 +++++++-- src/PMGDQuery.h | 1 + src/PMGDQueryHandler.cc | 26 ++++++++++++++++++-------- src/PMGDQueryHandler.h | 12 +++++++----- tests/pmgd_queries.cc | 26 +++++++++++++------------- 5 files changed, 46 insertions(+), 28 deletions(-) diff --git a/src/PMGDQuery.cc b/src/PMGDQuery.cc index 2ac6978b..2c73c5c4 100644 --- a/src/PMGDQuery.cc +++ b/src/PMGDQuery.cc @@ -46,7 +46,8 @@ using namespace VDMS; #define REFERENCE_RANGE_START 20000 PMGDQuery::PMGDQuery(PMGDQueryHandler& pmgd_qh) : - _pmgd_qh(pmgd_qh), _current_ref(REFERENCE_RANGE_START) + _pmgd_qh(pmgd_qh), _current_ref(REFERENCE_RANGE_START), + _readonly(true) { _current_group_id = 0; //this command to start a new transaction @@ -77,7 +78,7 @@ Json::Value& PMGDQuery::run() // execute the queries using the PMGDQueryHandler object std::vector> _pmgd_responses; - _pmgd_responses = _pmgd_qh.process_queries(_cmds, _current_group_id + 1); + _pmgd_responses = _pmgd_qh.process_queries(_cmds, _current_group_id + 1, _readonly); if (_pmgd_responses.size() != _current_group_id + 1) { if (_pmgd_responses.size() == 1 && _pmgd_responses[0].size() == 1) { @@ -421,6 +422,8 @@ void PMGDQuery::AddNode(int ref, const Json::Value& props, const Json::Value& constraints) { + _readonly = false; + PMGDCmd* cmdadd = new PMGDCmd(); cmdadd->set_cmd_id(PMGDCmd::AddNode); cmdadd->set_cmd_grp_id(_current_group_id); @@ -453,6 +456,8 @@ void PMGDQuery::AddEdge(int ident, const std::string& tag, const Json::Value& props) { + _readonly = false; + PMGDCmd* cmdedge = new PMGDCmd(); cmdedge->set_cmd_grp_id(_current_group_id); cmdedge->set_cmd_id(PMGDCmd::AddEdge); diff --git a/src/PMGDQuery.h b/src/PMGDQuery.h index 37afc3d9..d0cfa757 100644 --- a/src/PMGDQuery.h +++ b/src/PMGDQuery.h @@ -49,6 +49,7 @@ namespace VDMS { unsigned _current_group_id; PMGDQueryHandler& _pmgd_qh; unsigned _current_ref; + bool _readonly; // Stays true unless some write cmd sets it to false. Json::Value _json_responses; diff --git a/src/PMGDQueryHandler.cc b/src/PMGDQueryHandler.cc index 368954be..e132f468 100644 --- a/src/PMGDQueryHandler.cc +++ b/src/PMGDQueryHandler.cc @@ -34,6 +34,7 @@ #include "PMGDQueryHandler.h" #include "util.h" // PMGD util #include "PMGDIterators.h" +#include "RWLock.h" // TODO In the complete version of VDMS, this file will live // within PMGD which would replace the PMGD namespace. Some of @@ -42,7 +43,7 @@ using namespace PMGD; using namespace VDMS; PMGD::Graph *PMGDQueryHandler::_db; -std::mutex *PMGDQueryHandler::_dblock; +RWLock *PMGDQueryHandler::_dblock; void PMGDQueryHandler::init() { @@ -53,17 +54,24 @@ void PMGDQueryHandler::init() _db = new PMGD::Graph(dbname.c_str(), PMGD::Graph::Create); // Create the query handler here assuming database is valid now. - _dblock = new std::mutex(); + _dblock = new RWLock(); } std::vector PMGDQueryHandler::process_queries(const PMGDCmds &cmds, - int num_groups) + int num_groups, bool readonly) { std::vector responses(num_groups); assert(_tx == NULL); - _dblock->lock(); + + // Assuming one query handler handles one TX at a time. + _readonly = readonly; + if (_readonly) + _dblock->read_lock(); + else + _dblock->write_lock(); + for (const auto cmd : cmds) { PMGDCmdResponse *response = new PMGDCmdResponse(); if (process_query(cmd, response) < 0) { @@ -85,7 +93,10 @@ std::vector _tx = NULL; } - _dblock->unlock(); + if (_readonly) + _dblock->read_unlock(); + else + _dblock->write_unlock(); return responses; } @@ -121,9 +132,8 @@ int PMGDQueryHandler::process_query(const PMGDCmd *cmd, switch (code) { case PMGDCmd::TxBegin: { - - // TODO: Needs to distinguish transaction parameters like RO/RW - _tx = new Transaction(*_db, Transaction::ReadWrite); + int tx_options = _readonly ? Transaction::ReadOnly : Transaction::ReadWrite; + _tx = new Transaction(*_db, tx_options); set_response(response, protobufs::TX, PMGDCmdResponse::Success); break; } diff --git a/src/PMGDQueryHandler.h b/src/PMGDQueryHandler.h index 22926922..32e833db 100644 --- a/src/PMGDQueryHandler.h +++ b/src/PMGDQueryHandler.h @@ -33,7 +33,6 @@ #include #include -#include #include #include @@ -56,6 +55,8 @@ namespace VDMS { typedef std::vector PMGDCmds; typedef std::vector PMGDCmdResponses; + class RWLock; + class PMGDQueryHandler { class ReusableNodeIterator; @@ -65,10 +66,10 @@ namespace VDMS { static PMGD::Graph *_db; // Need this lock till we have concurrency support in PMGD - // TODO: Make this reader writer. - static std::mutex *_dblock; + static RWLock *_dblock; PMGD::Transaction *_tx; + bool _readonly; // Variable changes per TX based on process_queries parameter. // Map an integer ID to a NodeIterator (reset at the end of each transaction). // This works for Adds and Queries. We assume that the client or @@ -117,7 +118,7 @@ namespace VDMS { public: static void init(); - PMGDQueryHandler() { _tx = NULL; } + PMGDQueryHandler() { _tx = NULL; _readonly = true; } // The vector here can contain just one JL command but will be surrounded by // TX begin and end. So just expose one call to the QueryHandler for @@ -128,6 +129,7 @@ namespace VDMS { // than the number of commands. // Ensure that the cmd_grp_id, that is the query number are in increasing // order and account for the TxBegin and TxEnd in numbering. - std::vector process_queries(const PMGDCmds &cmds, int num_groups); + std::vector process_queries(const PMGDCmds &cmds, + int num_groups, bool readonly); }; }; diff --git a/tests/pmgd_queries.cc b/tests/pmgd_queries.cc index 64a3566d..f690b2b2 100644 --- a/tests/pmgd_queries.cc +++ b/tests/pmgd_queries.cc @@ -158,7 +158,7 @@ TEST(PMGDQueryHandler, addTest) cmds.push_back(&cmdtxcommit); query_count++; - vector> responses = qh.process_queries(cmds, query_count); + vector> responses = qh.process_queries(cmds, query_count, false); int nodeids = 1, edgeids = 1; for (int i = 0; i < query_count; ++i) { vector response = responses[i]; @@ -248,7 +248,7 @@ TEST(PMGDQueryHandler, queryTestList) cmds.push_back(&cmdtxend); query_count++; - vector> responses = qh.process_queries(cmds, query_count); + vector> responses = qh.process_queries(cmds, query_count, true); int nodecount, propcount = 0; for (int i = 0; i < query_count; ++i) { vector response = responses[i]; @@ -312,7 +312,7 @@ TEST(PMGDQueryHandler, queryTestAverage) cmds.push_back(&cmdtxend); query_count++; - vector> responses = qh.process_queries(cmds, query_count); + vector> responses = qh.process_queries(cmds, query_count, true); for (int i = 0; i < query_count; ++i) { vector response = responses[i]; for (auto it : response) { @@ -375,7 +375,7 @@ TEST(PMGDQueryHandler, queryTestUnique) cmds.push_back(&cmdtxend); query_count++; - vector> responses = qh.process_queries(cmds, query_count); + vector> responses = qh.process_queries(cmds, query_count, true); ASSERT_EQ(responses.size(), 1) << "Expecting an error return situation"; for (int i = 0; i < responses.size(); ++i) { vector response = responses[i]; @@ -452,7 +452,7 @@ TEST(PMGDQueryHandler, queryNeighborTestList) cmds.push_back(&cmdtxend); query_count++; - vector> responses = qh.process_queries(cmds, query_count); + vector> responses = qh.process_queries(cmds, query_count, true); int nodecount, propcount = 0; for (int i = 0; i < query_count; ++i) { vector response = responses[i]; @@ -553,7 +553,7 @@ TEST(PMGDQueryHandler, queryConditionalNeighborTestList) cmds.push_back(&cmdtxend); query_count++; - vector> responses = qh.process_queries(cmds, query_count); + vector> responses = qh.process_queries(cmds, query_count, true); int nodecount, propcount = 0; for (int i = 0; i < query_count; ++i) { vector response = responses[i]; @@ -645,7 +645,7 @@ TEST(PMGDQueryHandler, queryNeighborTestSum) cmds.push_back(&cmdtxend); query_count++; - vector> responses = qh.process_queries(cmds, query_count); + vector> responses = qh.process_queries(cmds, query_count, true); int nodecount, propcount = 0; for (int i = 0; i < query_count; ++i) { vector response = responses[i]; @@ -729,7 +729,7 @@ TEST(PMGDQueryHandler, addConstrainedTest) cmds.push_back(&cmdtxcommit); query_count++; - vector> responses = qh.process_queries(cmds, query_count); + vector> responses = qh.process_queries(cmds, query_count, false); // Since PMGD queries always generate one response per command, // we can do the following: @@ -828,7 +828,7 @@ TEST(PMGDQueryHandler, queryNeighborLinksTestList) cmds.push_back(&cmdtxend); query_count++; - vector> responses = qh.process_queries(cmds, query_count); + vector> responses = qh.process_queries(cmds, query_count, true); int nodecount, propcount = 0; for (int i = 0; i < query_count; ++i) { vector response = responses[i]; @@ -942,7 +942,7 @@ TEST(PMGDQueryHandler, queryNeighborLinksReuseTestList) cmds.push_back(&cmdtxend); query_count++; - vector> responses = qh.process_queries(cmds, query_count); + vector> responses = qh.process_queries(cmds, query_count, true); int nodecount = 0, propcount = 0; int totnodecount = 0, totpropcount = 0; for (int i = 0; i < query_count; ++i) { @@ -1067,7 +1067,7 @@ TEST(PMGDQueryHandler, querySortedNeighborLinksReuseTestList) cmds.push_back(&cmdtxend); query_count++; - vector> responses = qh.process_queries(cmds, query_count); + vector> responses = qh.process_queries(cmds, query_count, true); int nodecount = 0, propcount = 0; int totnodecount = 0, totpropcount = 0; bool firstquery = true; @@ -1149,7 +1149,7 @@ TEST(PMGDQueryHandler, queryTestListLimit) cmds.push_back(&cmdtxend); query_count++; - vector> responses = qh.process_queries(cmds, query_count); + vector> responses = qh.process_queries(cmds, query_count, true); int nodecount, propcount = 0; for (int i = 0; i < query_count; ++i) { vector response = responses[i]; @@ -1217,7 +1217,7 @@ TEST(PMGDQueryHandler, queryTestSortedLimitedAverage) cmds.push_back(&cmdtxend); query_count++; - vector> responses = qh.process_queries(cmds, query_count); + vector> responses = qh.process_queries(cmds, query_count, true); for (int i = 0; i < query_count; ++i) { vector response = responses[i]; for (auto it : response) {