Skip to content

Commit

Permalink
Eliminate the use of CLockFreeGuard with AtomicMutex (#1800)
Browse files Browse the repository at this point in the history
* Make CLockFreeGuard safer

* Fix getburninfo consortium and result allocations

* Fix CLockFreeGuard, introduce AtomicMutex

* Fix missing expected

* Use larger thread quantums

* Increase default yields since sleep quantums are larger

* Fix syntax

* Add more comments

* Introduce BufferPool

* Switch getburninfo to use BufferPool

* Eliminate CLockFreeGuard, use AtomicMutex

---------

Co-authored-by: Peter John Bushnell <[email protected]>
  • Loading branch information
prasannavl and Bushstar authored Mar 6, 2023
1 parent 6813315 commit 71fcf5e
Show file tree
Hide file tree
Showing 10 changed files with 26 additions and 50 deletions.
4 changes: 2 additions & 2 deletions src/miner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@ namespace pos {

//initialize static variables here
std::map<uint256, int64_t> Staker::mapMNLastBlockCreationAttemptTs;
std::atomic_bool Staker::cs_MNLastBlockCreationAttemptTs(false);
AtomicMutex cs_MNLastBlockCreationAttemptTs;
int64_t Staker::nLastCoinStakeSearchTime{0};
int64_t Staker::nFutureTime{0};
uint256 Staker::lastBlockSeen{};
Expand Down Expand Up @@ -825,7 +825,7 @@ namespace pos {
withSearchInterval([&](const int64_t currentTime, const int64_t lastSearchTime, const int64_t futureTime) {
// update last block creation attempt ts for the master node here
{
CLockFreeGuard lock(pos::Staker::cs_MNLastBlockCreationAttemptTs);
std::unique_lock l{pos::cs_MNLastBlockCreationAttemptTs};
pos::Staker::mapMNLastBlockCreationAttemptTs[masternodeID] = GetTime();
}
CheckContextState ctxState;
Expand Down
4 changes: 3 additions & 1 deletion src/miner.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,9 @@ namespace pos {
// The main staking routine.
// Creates stakes using CWallet API, creates PoS kernels and mints blocks.
// Uses Args.getWallets() to receive and update wallets list.

extern AtomicMutex cs_MNLastBlockCreationAttemptTs;

class ThreadStaker {
public:

Expand Down Expand Up @@ -258,7 +261,6 @@ namespace pos {
// declaration static variables
// Map to store [master node id : last block creation attempt timestamp] for local master nodes
static std::map<uint256, int64_t> mapMNLastBlockCreationAttemptTs;
static std::atomic_bool cs_MNLastBlockCreationAttemptTs;

// Variables to manage search time across threads
static int64_t nLastCoinStakeSearchTime;
Expand Down
2 changes: 1 addition & 1 deletion src/rpc/mining.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ static UniValue getmininginfo(const JSONRPCRequest& request)
subObj.pushKV("lastblockcreationattempt", "0");
} else {
// get the last block creation attempt by the master node
CLockFreeGuard lock(pos::Staker::cs_MNLastBlockCreationAttemptTs);
std::unique_lock l{pos::cs_MNLastBlockCreationAttemptTs};
auto lastBlockCreationAttemptTs = pos::Staker::mapMNLastBlockCreationAttemptTs[mnId.second];
subObj.pushKV("lastblockcreationattempt", (lastBlockCreationAttemptTs != 0) ? FormatISO8601DateTime(lastBlockCreationAttemptTs) : "0");
}
Expand Down
14 changes: 7 additions & 7 deletions src/rpc/resultcache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include <logging.h>

void RPCResultCache::Init(RPCCacheMode mode) {
CLockFreeGuard lock{syncFlag};
std::unique_lock l{aMutex};
this->mode = mode;
}

Expand All @@ -14,7 +14,7 @@ std::string GetKey(const JSONRPCRequest &request) {
}

bool RPCResultCache::InvalidateCaches() {
CLockFreeGuard lock{syncFlag};
std::unique_lock l{aMutex};
auto height = GetLastValidatedHeight();
if (cacheHeight != height) {
LogPrint(BCLog::RPCCACHE, "RPCCache: clear\n");
Expand All @@ -33,7 +33,7 @@ std::optional<UniValue> RPCResultCache::TryGet(const JSONRPCRequest &request) {
auto key = GetKey(request);
UniValue val;
{
CLockFreeGuard lock{syncFlag};
std::unique_lock l{aMutex};
if (auto res = cacheMap.find(key); res != cacheMap.end()) {
if (LogAcceptCategory(BCLog::RPCCACHE)) {
LogPrint(BCLog::RPCCACHE, "RPCCache: hit: key: %d/%s, val: %s\n", cacheHeight, key, res->second.write());
Expand All @@ -47,7 +47,7 @@ std::optional<UniValue> RPCResultCache::TryGet(const JSONRPCRequest &request) {
const UniValue& RPCResultCache::Set(const JSONRPCRequest &request, const UniValue &value) {
auto key = GetKey(request);
{
CLockFreeGuard lock{syncFlag};
std::unique_lock l{aMutex};
if (LogAcceptCategory(BCLog::RPCCACHE)) {
LogPrint(BCLog::RPCCACHE, "RPCCache: set: key: %d/%s, val: %s\n", cacheHeight, key, value.write());
}
Expand Down Expand Up @@ -77,15 +77,15 @@ void SetLastValidatedHeight(int height) {
}

void MemoizedResultCache::Init(RPCResultCache::RPCCacheMode mode) {
CLockFreeGuard lock{syncFlag};
std::unique_lock l{aMutex};
this->mode = mode;
}

CMemoizedResultValue MemoizedResultCache::GetOrDefault(const JSONRPCRequest &request) {
if (mode == RPCResultCache::RPCCacheMode::None) return {};
auto key = GetKey(request);
{
CLockFreeGuard lock{syncFlag};
std::unique_lock l{aMutex};
if (auto res = cacheMap.find(key); res != cacheMap.end()) {
if (!::ChainActive().Contains(LookupBlockIndex(res->second.hash)))
return {};
Expand All @@ -102,7 +102,7 @@ CMemoizedResultValue MemoizedResultCache::GetOrDefault(const JSONRPCRequest &req
void MemoizedResultCache::Set(const JSONRPCRequest &request, const CMemoizedResultValue &value) {
auto key = GetKey(request);
{
CLockFreeGuard lock{syncFlag};
std::unique_lock l{aMutex};
if (LogAcceptCategory(BCLog::RPCCACHE)) {
LogPrint(BCLog::RPCCACHE, "RPCCache: set: key: %d/%s\n", value.height, key);
}
Expand Down
4 changes: 2 additions & 2 deletions src/rpc/resultcache.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class RPCResultCache {
bool InvalidateCaches();

private:
std::atomic_bool syncFlag{false};
AtomicMutex aMutex;
std::set<std::string> smartModeList{};
RPCCacheMode mode{RPCCacheMode::None};
std::map<std::string, UniValue> cacheMap{};
Expand All @@ -60,7 +60,7 @@ class MemoizedResultCache {
CMemoizedResultValue GetOrDefault(const JSONRPCRequest &request);
void Set(const JSONRPCRequest &request, const CMemoizedResultValue &value);
private:
std::atomic_bool syncFlag{false};
AtomicMutex aMutex;
std::map<std::string, CMemoizedResultValue> cacheMap{};
RPCResultCache::RPCCacheMode mode{RPCResultCache::RPCCacheMode::None};
};
Expand Down
8 changes: 4 additions & 4 deletions src/rpc/stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ bool CRPCStats::isActive() { return active.load(); }
void CRPCStats::setActive(bool isActive) { active.store(isActive); }

std::optional<RPCStats> CRPCStats::get(const std::string& name) {
CLockFreeGuard lock(lock_stats);
std::unique_lock lock(lock_stats);

auto it = map.find(name);
if (it == map.end()) {
Expand All @@ -16,7 +16,7 @@ std::optional<RPCStats> CRPCStats::get(const std::string& name) {
}

std::map<std::string, RPCStats> CRPCStats::getMap() {
CLockFreeGuard lock(lock_stats);
std::unique_lock lock(lock_stats);
return map;
}

Expand All @@ -41,7 +41,7 @@ void CRPCStats::load() {
UniValue arr(UniValue::VARR);
arr.read((const std::string)line);

CLockFreeGuard lock(lock_stats);
std::unique_lock lock(lock_stats);
for (const auto &val : arr.getValues()) {
auto name = val["name"].get_str();
map[name] = RPCStats::fromJSON(val);
Expand Down Expand Up @@ -140,7 +140,7 @@ void CRPCStats::add(const std::string& name, const int64_t latency, const int64_
}
stats->history.push_back({ stats->lastUsedTime, latency, payload });

CLockFreeGuard lock(lock_stats);
std::unique_lock lock(lock_stats);
map[name] = *stats;
}

Expand Down
2 changes: 1 addition & 1 deletion src/rpc/stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ struct RPCStats {
class CRPCStats
{
private:
std::atomic_bool lock_stats{false};
AtomicMutex lock_stats;
std::map<std::string, RPCStats> map;
std::atomic_bool active{DEFAULT_RPC_STATS};

Expand Down
26 changes: 0 additions & 26 deletions src/sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -389,31 +389,5 @@ class AtomicMutex {
}
};

class CLockFreeGuard
{
std::atomic_bool& lock;
public:
CLockFreeGuard(std::atomic_bool& lock) : lock(lock)
{
bool expected = false;
while (std::atomic_compare_exchange_weak_explicit(
&lock,
&expected, true,
std::memory_order_seq_cst,
std::memory_order_relaxed) == false) {
// Could have been a spurious failure or another thread could have taken the
// lock in-between since we're now out of the atomic ops.
// Reset expected to start from scratch again, since we only want
// a singular atomic false -> true transition.
expected = false;
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}

~CLockFreeGuard()
{
lock.store(false);
}
};

#endif // DEFI_SYNC_H
4 changes: 2 additions & 2 deletions src/test/sync_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ BOOST_AUTO_TEST_CASE(lock_free)
constexpr int num_threads = 10;

auto testFunc = []() {
static std::atomic_bool cs_lock;
static AtomicMutex m;
static std::atomic_int context(0);
static std::atomic_int threads(num_threads);

threads--; // every thread decrements count

CLockFreeGuard lock(cs_lock);
std::unique_lock lock{m};
context++;
while (threads > 0); // wait all threads to be here
BOOST_CHECK_EQUAL(threads.load(), 0); // now they wait for lock
Expand Down
8 changes: 4 additions & 4 deletions src/wallet/ismine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,19 +210,19 @@ struct CCacheInfo

isminetype IsMineCached(const CWallet& keystore, CScript const & script)
{
static std::atomic_bool cs_cache(false);
static AtomicMutex cs_cache;
static std::unordered_map<const CWallet*, CCacheInfo> cache;
auto* wallet = &keystore;
CLockFreeGuard lock(cs_cache);
std::unique_lock lock(cs_cache);
auto it = cache.find(wallet);
if (it == cache.end()) {
it = cache.emplace(wallet, CCacheInfo{{}, {
wallet->NotifyOwnerChanged.connect([wallet](CScript const & owner) {
CLockFreeGuard lock(cs_cache);
std::unique_lock lock(cs_cache);
cache[wallet].mineData.erase(owner);
}),
wallet->NotifyUnload.connect([wallet]() {
CLockFreeGuard lock(cs_cache);
std::unique_lock lock(cs_cache);
cache.erase(wallet);
}),
}}).first;
Expand Down

0 comments on commit 71fcf5e

Please sign in to comment.