Skip to content

Commit

Permalink
Merge pull request PIVX-Project#2725 from codablock/pr_llmq_hashmaps
Browse files Browse the repository at this point in the history
Add more caching to CRecoveredSigsDb and use salted hashing for externally provided keys
  • Loading branch information
codablock authored and panleone committed Oct 29, 2024
1 parent a0084f5 commit 7ccd790
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 71 deletions.
34 changes: 26 additions & 8 deletions src/llmq/quorums_signing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,23 @@ bool CRecoveredSigsDb::HasRecoveredSigForSession(const uint256& signHash)

bool CRecoveredSigsDb::HasRecoveredSigForHash(const uint256& hash)
{
int64_t t = GetTimeMillis();

{
LOCK(cs);
auto it = hasSigForHashCache.find(hash);
if (it != hasSigForHashCache.end()) {
it->second.second = t;
return it->second.first;
}
}

auto k = std::make_tuple('h', hash);
return db.Exists(k);
bool ret = db.Exists(k);

LOCK(cs);
hasSigForHashCache.emplace(hash, std::make_pair(ret, t));
return ret;
}

bool CRecoveredSigsDb::ReadRecoveredSig(Consensus::LLMQType llmqType, const uint256& id, CRecoveredSig& ret)
Expand Down Expand Up @@ -153,13 +168,14 @@ void CRecoveredSigsDb::WriteRecoveredSig(const llmq::CRecoveredSig& recSig)
LOCK(cs);
hasSigForIdCache[std::make_pair((Consensus::LLMQType)recSig.llmqType, recSig.id)] = std::make_pair(true, t);
hasSigForSessionCache[signHash] = std::make_pair(true, t);
hasSigForHashCache[recSig.GetHash()] = std::make_pair(true, t);
}
}

template <typename K>
static void TruncateCacheMap(std::unordered_map<K, std::pair<bool, int64_t>>& m, size_t maxSize, size_t truncateThreshold)
template <typename K, typename H>
static void TruncateCacheMap(std::unordered_map<K, std::pair<bool, int64_t>, H>& m, size_t maxSize, size_t truncateThreshold)
{
typedef typename std::unordered_map<K, std::pair<bool, int64_t>> Map;
typedef typename std::unordered_map<K, std::pair<bool, int64_t>, H> Map;
typedef typename Map::iterator Iterator;

if (m.size() <= truncateThreshold) {
Expand Down Expand Up @@ -238,10 +254,12 @@ void CRecoveredSigsDb::CleanupOldRecoveredSigs(int64_t maxAge)

hasSigForIdCache.erase(std::make_pair((Consensus::LLMQType)recSig.llmqType, recSig.id));
hasSigForSessionCache.erase(signHash);
hasSigForHashCache.erase(recSig.GetHash());
}

TruncateCacheMap(hasSigForIdCache, MAX_CACHE_SIZE, MAX_CACHE_TRUNCATE_THRESHOLD);
TruncateCacheMap(hasSigForSessionCache, MAX_CACHE_SIZE, MAX_CACHE_TRUNCATE_THRESHOLD);
TruncateCacheMap(hasSigForHashCache, MAX_CACHE_SIZE, MAX_CACHE_TRUNCATE_THRESHOLD);
}

for (auto& e : toDelete2) {
Expand Down Expand Up @@ -357,15 +375,15 @@ bool CSigningManager::PreVerifyRecoveredSig(NodeId nodeId, const CRecoveredSig&
void CSigningManager::CollectPendingRecoveredSigsToVerify(
size_t maxUniqueSessions,
std::unordered_map<NodeId, std::list<CRecoveredSig>>& retSigShares,
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr>& retQuorums)
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher>& retQuorums)
{
{
LOCK(cs);
if (pendingRecoveredSigs.empty()) {
return;
}

std::unordered_set<std::pair<NodeId, uint256>> uniqueSignHashes;
std::unordered_set<std::pair<NodeId, uint256>, StaticSaltedHasher> uniqueSignHashes;
llmq::utils::IterateNodesRandom(pendingRecoveredSigs, [&]() { return uniqueSignHashes.size() < maxUniqueSessions; }, [&](NodeId nodeId, std::list<CRecoveredSig>& ns) {
if (ns.empty()) {
return false;
Expand Down Expand Up @@ -420,7 +438,7 @@ void CSigningManager::CollectPendingRecoveredSigsToVerify(
bool CSigningManager::ProcessPendingRecoveredSigs(CConnman& connman)
{
std::unordered_map<NodeId, std::list<CRecoveredSig>> recSigsByNode;
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr> quorums;
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher> quorums;

CollectPendingRecoveredSigsToVerify(32, recSigsByNode, quorums);
if (recSigsByNode.empty()) {
Expand Down Expand Up @@ -449,7 +467,7 @@ bool CSigningManager::ProcessPendingRecoveredSigs(CConnman& connman)

LogPrintf("llmq", "CSigningManager::%s -- verified recovered sig(s). count=%d, vt=%d, nodes=%d\n", __func__, verifyCount, verifyTimer.count(), recSigsByNode.size());

std::unordered_set<uint256> processed;
std::unordered_set<uint256, StaticSaltedHasher> processed;
for (auto& p : recSigsByNode) {
NodeId nodeId = p.first;
auto& v = p.second;
Expand Down
22 changes: 7 additions & 15 deletions src/llmq/quorums_signing.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,11 @@

#include "chainparams.h"
#include "net.h"
#include "saltedhasher.h"
#include "sync.h"

#include <unordered_map>

namespace std
{
template <>
struct hash<std::pair<Consensus::LLMQType, uint256>> {
std::size_t operator()(const std::pair<Consensus::LLMQType, uint256>& k) const
{
return (std::size_t)((k.first + 1) * k.second.GetCheapHash());
}
};
} // namespace std

namespace llmq
{

Expand Down Expand Up @@ -77,7 +67,6 @@ class CRecoveredSig
}
};

// TODO implement caching to speed things up
class CRecoveredSigsDb
{
static const size_t MAX_CACHE_SIZE = 30000;
Expand All @@ -87,8 +76,9 @@ class CRecoveredSigsDb
CDBWrapper db;

RecursiveMutex cs;
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, std::pair<bool, int64_t>> hasSigForIdCache;
std::unordered_map<uint256, std::pair<bool, int64_t>> hasSigForSessionCache;
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, std::pair<bool, int64_t>, StaticSaltedHasher> hasSigForIdCache;
std::unordered_map<uint256, std::pair<bool, int64_t>, StaticSaltedHasher> hasSigForSessionCache;
std::unordered_map<uint256, std::pair<bool, int64_t>, StaticSaltedHasher> hasSigForHashCache;

public:
CRecoveredSigsDb(bool fMemory);
Expand Down Expand Up @@ -157,7 +147,9 @@ class CSigningManager
void ProcessMessageRecoveredSig(CNode* pfrom, const CRecoveredSig& recoveredSig, CConnman& connman);
bool PreVerifyRecoveredSig(NodeId nodeId, const CRecoveredSig& recoveredSig, bool& retBan);

void CollectPendingRecoveredSigsToVerify(size_t maxUniqueSessions, std::unordered_map<NodeId, std::list<CRecoveredSig>>& retSigShares, std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr>& retQuorums);
void CollectPendingRecoveredSigsToVerify(size_t maxUniqueSessions,
std::unordered_map<NodeId, std::list<CRecoveredSig>>& retSigShares,
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher>& retQuorums);
bool ProcessPendingRecoveredSigs(CConnman& connman); // called from the worker thread of CSigSharesManager
void ProcessRecoveredSig(NodeId nodeId, const CRecoveredSig& recoveredSig, const CQuorumCPtr& quorum, CConnman& connman);
void Cleanup(); // called from the worker thread of CSigSharesManager
Expand Down
39 changes: 20 additions & 19 deletions src/llmq/quorums_signing_shares.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ bool CSigSharesManager::PreVerifyBatchedSigShares(NodeId nodeId, const CBatchedS
void CSigSharesManager::CollectPendingSigSharesToVerify(
size_t maxUniqueSessions,
std::unordered_map<NodeId, std::vector<CSigShare>>& retSigShares,
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr>& retQuorums)
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher>& retQuorums)
{
{
LOCK(cs);
Expand All @@ -394,7 +394,7 @@ void CSigSharesManager::CollectPendingSigSharesToVerify(
// invalid, making batch verification fail and revert to per-share verification, which in turn would slow down
// the whole verification process

std::unordered_set<std::pair<NodeId, uint256>> uniqueSignHashes;
std::unordered_set<std::pair<NodeId, uint256>, StaticSaltedHasher> uniqueSignHashes;
llmq::utils::IterateNodesRandom(nodeStates, [&]() { return uniqueSignHashes.size() < maxUniqueSessions; }, [&](NodeId nodeId, CSigSharesNodeState& ns) {
if (ns.pendingIncomingSigShares.Empty()) {
return false;
Expand Down Expand Up @@ -439,7 +439,7 @@ void CSigSharesManager::CollectPendingSigSharesToVerify(
bool CSigSharesManager::ProcessPendingSigShares(CConnman& connman)
{
std::unordered_map<NodeId, std::vector<CSigShare>> sigSharesByNodes;
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr> quorums;
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher> quorums;

CollectPendingSigSharesToVerify(32, sigSharesByNodes, quorums);
if (sigSharesByNodes.empty()) {
Expand Down Expand Up @@ -508,7 +508,10 @@ bool CSigSharesManager::ProcessPendingSigShares(CConnman& connman)
}

// It's ensured that no duplicates are passed to this method
void CSigSharesManager::ProcessPendingSigSharesFromNode(NodeId nodeId, const std::vector<CSigShare>& sigShares, const std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr>& quorums, CConnman& connman)
void CSigSharesManager::ProcessPendingSigSharesFromNode(NodeId nodeId,
const std::vector<CSigShare>& sigShares,
const std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher>& quorums,
CConnman& connman)
{
LOCK(cs);
auto& nodeState = nodeStates[nodeId];
Expand Down Expand Up @@ -660,11 +663,9 @@ void CSigSharesManager::TryRecoverSig(const CQuorumCPtr& quorum, const uint256&
}

// cs must be held
void CSigSharesManager::CollectSigSharesToRequest(std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv>>& sigSharesToRequest)
void CSigSharesManager::CollectSigSharesToRequest(std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv, StaticSaltedHasher>>& sigSharesToRequest)
{
int64_t now = GetTimeMillis();
std::unordered_map<SigShareKey, std::vector<NodeId>> nodesBySigShares;

const size_t maxRequestsForNode = 32;

// avoid requesting from same nodes all the time
Expand Down Expand Up @@ -695,7 +696,7 @@ void CSigSharesManager::CollectSigSharesToRequest(std::unordered_map<NodeId, std
return false;
});

std::unordered_map<uint256, CSigSharesInv>* invMap = nullptr;
decltype(sigSharesToRequest.begin()->second)* invMap = nullptr;

for (auto& p2 : nodeState.sessions) {
auto& signHash = p2.first;
Expand Down Expand Up @@ -756,7 +757,7 @@ void CSigSharesManager::CollectSigSharesToRequest(std::unordered_map<NodeId, std
}

// cs must be held
void CSigSharesManager::CollectSigSharesToSend(std::unordered_map<NodeId, std::unordered_map<uint256, CBatchedSigShares>>& sigSharesToSend)
void CSigSharesManager::CollectSigSharesToSend(std::unordered_map<NodeId, std::unordered_map<uint256, CBatchedSigShares, StaticSaltedHasher>>& sigSharesToSend)
{
for (auto& p : nodeStates) {
auto nodeId = p.first;
Expand All @@ -766,7 +767,7 @@ void CSigSharesManager::CollectSigSharesToSend(std::unordered_map<NodeId, std::u
continue;
}

std::unordered_map<uint256, CBatchedSigShares>* sigSharesToSend2 = nullptr;
decltype(sigSharesToSend.begin()->second)* sigSharesToSend2 = nullptr;

for (auto& p2 : nodeState.sessions) {
auto& signHash = p2.first;
Expand Down Expand Up @@ -813,9 +814,9 @@ void CSigSharesManager::CollectSigSharesToSend(std::unordered_map<NodeId, std::u
}

// cs must be held
void CSigSharesManager::CollectSigSharesToAnnounce(std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv>>& sigSharesToAnnounce)
void CSigSharesManager::CollectSigSharesToAnnounce(std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv, StaticSaltedHasher>>& sigSharesToAnnounce)
{
std::unordered_set<std::pair<Consensus::LLMQType, uint256>> quorumNodesPrepared;
std::unordered_set<std::pair<Consensus::LLMQType, uint256>, StaticSaltedHasher> quorumNodesPrepared;

this->sigSharesToAnnounce.ForEach([&](const SigShareKey& sigShareKey, bool) {
auto& signHash = sigShareKey.first;
Expand Down Expand Up @@ -883,9 +884,9 @@ bool CSigSharesManager::SendMessages()
nodesByAddress.emplace(pnode->addr, pnode->GetId());
});

std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv>> sigSharesToRequest;
std::unordered_map<NodeId, std::unordered_map<uint256, CBatchedSigShares>> sigSharesToSend;
std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv>> sigSharesToAnnounce;
std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv, StaticSaltedHasher>> sigSharesToRequest;
std::unordered_map<NodeId, std::unordered_map<uint256, CBatchedSigShares, StaticSaltedHasher>> sigSharesToSend;
std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv, StaticSaltedHasher>> sigSharesToAnnounce;

{
LOCK(cs);
Expand Down Expand Up @@ -945,13 +946,13 @@ void CSigSharesManager::Cleanup()
return;
}

std::unordered_set<std::pair<Consensus::LLMQType, uint256>> quorumsToCheck;
std::unordered_set<std::pair<Consensus::LLMQType, uint256>, StaticSaltedHasher> quorumsToCheck;

{
LOCK(cs);

// Remove sessions which were successfully recovered
std::unordered_set<uint256> doneSessions;
std::unordered_set<uint256, StaticSaltedHasher> doneSessions;
sigShares.ForEach([&](const SigShareKey& k, const CSigShare& sigShare) {
if (doneSessions.count(sigShare.GetSignHash())) {
return;
Expand All @@ -965,7 +966,7 @@ void CSigSharesManager::Cleanup()
}

// Remove sessions which timed out
std::unordered_set<uint256> timeoutSessions;
std::unordered_set<uint256, StaticSaltedHasher> timeoutSessions;
for (auto& p : timeSeenForSessions) {
auto& signHash = p.first;
int64_t firstSeenTime = p.second.first;
Expand Down Expand Up @@ -1009,7 +1010,7 @@ void CSigSharesManager::Cleanup()
{
// Now delete sessions which are for inactive quorums
LOCK(cs);
std::unordered_set<uint256> inactiveQuorumSessions;
std::unordered_set<uint256, StaticSaltedHasher> inactiveQuorumSessions;
sigShares.ForEach([&](const SigShareKey& k, const CSigShare& sigShare) {
if (quorumsToCheck.count(std::make_pair((Consensus::LLMQType)sigShare.llmqType, sigShare.quorumHash))) {
inactiveQuorumSessions.emplace(sigShare.GetSignHash());
Expand Down
42 changes: 13 additions & 29 deletions src/llmq/quorums_signing_shares.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "consensus/params.h"
#include "net.h"
#include "random.h"
#include "saltedhasher.h"
#include "serialize.h"
#include "sync.h"
#include "tinyformat.h"
Expand All @@ -29,28 +30,6 @@ namespace llmq
{
// <signHash, quorumMember>
typedef std::pair<uint256, uint16_t> SigShareKey;
} // namespace llmq

namespace std
{
template <>
struct hash<llmq::SigShareKey> {
std::size_t operator()(const llmq::SigShareKey& k) const
{
return (std::size_t)((k.second + 1) * k.first.GetCheapHash());
}
};
template <>
struct hash<std::pair<NodeId, uint256>> {
std::size_t operator()(const std::pair<NodeId, uint256>& k) const
{
return (std::size_t)((k.first + 1) * k.second.GetCheapHash());
}
};
} // namespace std

namespace llmq
{

// this one does not get transmitted over the wire as it is batched inside CBatchedSigShares
class CSigShare
Expand Down Expand Up @@ -151,7 +130,7 @@ template <typename T>
class SigShareMap
{
private:
std::unordered_map<uint256, std::unordered_map<uint16_t, T>> internalMap;
std::unordered_map<uint256, std::unordered_map<uint16_t, T>, StaticSaltedHasher> internalMap;

public:
bool Add(const SigShareKey& k, const T& v)
Expand Down Expand Up @@ -340,7 +319,7 @@ class CSigSharesManager : public CRecoveredSigsListener
SigShareMap<CSigShare> sigShares;

// stores time of first and last receivedSigShare. Used to detect timeouts
std::unordered_map<uint256, std::pair<int64_t, int64_t>> timeSeenForSessions;
std::unordered_map<uint256, std::pair<int64_t, int64_t>, StaticSaltedHasher> timeSeenForSessions;

std::unordered_map<NodeId, CSigSharesNodeState> nodeStates;
SigShareMap<std::pair<NodeId, int64_t>> sigSharesRequested;
Expand Down Expand Up @@ -379,10 +358,15 @@ class CSigSharesManager : public CRecoveredSigsListener
bool VerifySigSharesInv(NodeId from, const CSigSharesInv& inv);
bool PreVerifyBatchedSigShares(NodeId nodeId, const CBatchedSigShares& batchedSigShares, bool& retBan);

void CollectPendingSigSharesToVerify(size_t maxUniqueSessions, std::unordered_map<NodeId, std::vector<CSigShare>>& retSigShares, std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr>& retQuorums);
void CollectPendingSigSharesToVerify(size_t maxUniqueSessions,
std::unordered_map<NodeId, std::vector<CSigShare>>& retSigShares,
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher>& retQuorums);
bool ProcessPendingSigShares(CConnman& connman);

void ProcessPendingSigSharesFromNode(NodeId nodeId, const std::vector<CSigShare>& sigShares, const std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr>& quorums, CConnman& connman);
void ProcessPendingSigSharesFromNode(NodeId nodeId,
const std::vector<CSigShare>& sigShares,
const std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher>& quorums,
CConnman& connman);

void ProcessSigShare(NodeId nodeId, const CSigShare& sigShare, CConnman& connman, const CQuorumCPtr& quorum);
void TryRecoverSig(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash, CConnman& connman);
Expand All @@ -395,9 +379,9 @@ class CSigSharesManager : public CRecoveredSigsListener
void BanNode(NodeId nodeId);

bool SendMessages();
void CollectSigSharesToRequest(std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv>>& sigSharesToRequest);
void CollectSigSharesToSend(std::unordered_map<NodeId, std::unordered_map<uint256, CBatchedSigShares>>& sigSharesToSend);
void CollectSigSharesToAnnounce(std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv>>& sigSharesToAnnounce);
void CollectSigSharesToRequest(std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv, StaticSaltedHasher>>& sigSharesToRequest);
void CollectSigSharesToSend(std::unordered_map<NodeId, std::unordered_map<uint256, CBatchedSigShares, StaticSaltedHasher>>& sigSharesToSend);
void CollectSigSharesToAnnounce(std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv, StaticSaltedHasher>>& sigSharesToAnnounce);
bool SignPendingSigShares();
void WorkThreadMain();
};
Expand Down

0 comments on commit 7ccd790

Please sign in to comment.