Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More Dash llmq backports #2941

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
20 changes: 13 additions & 7 deletions src/llmq/quorums_chainlocks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,17 +77,17 @@ void CChainLocksHandler::ProcessMessage(CNode* pfrom, const std::string& strComm

auto hash = ::SerializeHash(clsig);

{
LOCK(cs_main);
connman.RemoveAskFor(hash, MSG_CLSIG);
}

ProcessNewChainLock(pfrom->GetId(), clsig, hash);
}
}

void CChainLocksHandler::ProcessNewChainLock(NodeId from, const llmq::CChainLockSig& clsig, const uint256& hash)
{
{
LOCK(cs_main);
g_connman->RemoveAskFor(hash, MSG_CLSIG);
}

{
LOCK(cs);
if (!seenChainLocks.emplace(hash, GetTimeMillis()).second) {
Expand Down Expand Up @@ -190,6 +190,8 @@ void CChainLocksHandler::UpdatedBlockTip(const CBlockIndex* pindexNew, const CBl
return;
}

Cleanup();

// DIP8 defines a process called "Signing attempts" which should run before the CLSIG is finalized
// To simplify the initial implementation, we skip this process and directly try to create a CLSIG
// This will fail when multiple blocks compete, but we accept this for the initial implementation.
Expand All @@ -201,6 +203,12 @@ void CChainLocksHandler::UpdatedBlockTip(const CBlockIndex* pindexNew, const CBl
{
LOCK(cs);

if (bestChainLockBlockIndex == pindexNew) {
// we first got the CLSIG, then the header, and then the block was connected.
// In this case there is no need to continue here.
return;
}

if (InternalHasConflictingChainLock(pindexNew->nHeight, pindexNew->GetBlockHash())) {
if (!inEnforceBestChainLock) {
// we accepted this block when there was no lock yet, but now a conflicting lock appeared. Invalidate it.
Expand All @@ -226,8 +234,6 @@ void CChainLocksHandler::UpdatedBlockTip(const CBlockIndex* pindexNew, const CBl
}

quorumSigningManager->AsyncSignIfMember(Params().GetConsensus().llmqChainLocks, requestId, msgHash);

Cleanup();
}

// WARNING: cs_main and cs should not be held!
Expand Down
2 changes: 2 additions & 0 deletions src/llmq/quorums_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ void StartLLMQSystem()
quorumDKGSessionManager->StartThreads();
}
if (quorumSigSharesManager) {
quorumSigSharesManager->RegisterAsRecoveredSigsListener();
quorumSigSharesManager->StartWorkerThread();
}
if (chainLocksHandler) {
Expand All @@ -68,6 +69,7 @@ void StopLLMQSystem()
}
if (quorumSigSharesManager) {
quorumSigSharesManager->StopWorkerThread();
quorumSigSharesManager->UnregisterAsRecoveredSigsListener();
}
if (quorumDKGSessionManager) {
quorumDKGSessionManager->StopThreads();
Expand Down
79 changes: 58 additions & 21 deletions src/llmq/quorums_signing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <algorithm>
#include <limits>
#include <unordered_set>

namespace llmq
{
Expand Down Expand Up @@ -79,8 +80,23 @@ bool CRecoveredSigsDb::HasRecoveredSigForSession(const uint256& signHash)

bool CRecoveredSigsDb::HasRecoveredSigForHash(const uint256& hash)
{
int64_t t = GetTimeMillis();

{
LOCK(cs);
auto it = hasSigForHashCache.find(hash);
if (it != hasSigForHashCache.end()) {
it->second.second = t;
return it->second.first;
}
}

auto k = std::make_tuple('h', hash);
return db.Exists(k);
bool ret = db.Exists(k);

LOCK(cs);
hasSigForHashCache.emplace(hash, std::make_pair(ret, t));
return ret;
}

bool CRecoveredSigsDb::ReadRecoveredSig(Consensus::LLMQType llmqType, const uint256& id, CRecoveredSig& ret)
Expand Down Expand Up @@ -152,13 +168,14 @@ void CRecoveredSigsDb::WriteRecoveredSig(const llmq::CRecoveredSig& recSig)
LOCK(cs);
hasSigForIdCache[std::make_pair((Consensus::LLMQType)recSig.llmqType, recSig.id)] = std::make_pair(true, t);
hasSigForSessionCache[signHash] = std::make_pair(true, t);
hasSigForHashCache[recSig.GetHash()] = std::make_pair(true, t);
}
}

template<typename K>
static void TruncateCacheMap(std::unordered_map<K, std::pair<bool, int64_t>>& m, size_t maxSize, size_t truncateThreshold)
template <typename K, typename H>
static void TruncateCacheMap(std::unordered_map<K, std::pair<bool, int64_t>, H>& m, size_t maxSize, size_t truncateThreshold)
{
typedef typename std::unordered_map<K, std::pair<bool, int64_t>> Map;
typedef typename std::unordered_map<K, std::pair<bool, int64_t>, H> Map;
typedef typename Map::iterator Iterator;

if (m.size() <= truncateThreshold) {
Expand Down Expand Up @@ -237,10 +254,12 @@ void CRecoveredSigsDb::CleanupOldRecoveredSigs(int64_t maxAge)

hasSigForIdCache.erase(std::make_pair((Consensus::LLMQType)recSig.llmqType, recSig.id));
hasSigForSessionCache.erase(signHash);
hasSigForHashCache.erase(recSig.GetHash());
}

TruncateCacheMap(hasSigForIdCache, MAX_CACHE_SIZE, MAX_CACHE_TRUNCATE_THRESHOLD);
TruncateCacheMap(hasSigForSessionCache, MAX_CACHE_SIZE, MAX_CACHE_TRUNCATE_THRESHOLD);
TruncateCacheMap(hasSigForHashCache, MAX_CACHE_SIZE, MAX_CACHE_TRUNCATE_THRESHOLD);
}

for (auto& e : toDelete2) {
Expand Down Expand Up @@ -355,18 +374,17 @@ bool CSigningManager::PreVerifyRecoveredSig(NodeId nodeId, const CRecoveredSig&

void CSigningManager::CollectPendingRecoveredSigsToVerify(
size_t maxUniqueSessions,
std::map<NodeId, std::list<CRecoveredSig>>& retSigShares,
std::map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr>& retQuorums)
std::unordered_map<NodeId, std::list<CRecoveredSig>>& retSigShares,
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher>& retQuorums)
{
{
LOCK(cs);
if (pendingRecoveredSigs.empty()) {
return;
}

std::set<std::pair<NodeId, uint256>> uniqueSignHashes;
llmq::utils::IterateNodesRandom(
pendingRecoveredSigs, [&]() { return uniqueSignHashes.size() < maxUniqueSessions; }, [&](NodeId nodeId, std::list<CRecoveredSig>& ns) {
std::unordered_set<std::pair<NodeId, uint256>, StaticSaltedHasher> uniqueSignHashes;
llmq::utils::IterateNodesRandom(pendingRecoveredSigs, [&]() { return uniqueSignHashes.size() < maxUniqueSessions; }, [&](NodeId nodeId, std::list<CRecoveredSig>& ns) {
if (ns.empty()) {
return false;
}
Expand Down Expand Up @@ -419,8 +437,8 @@ void CSigningManager::CollectPendingRecoveredSigsToVerify(

bool CSigningManager::ProcessPendingRecoveredSigs(CConnman& connman)
{
std::map<NodeId, std::list<CRecoveredSig>> recSigsByNode;
std::map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr> quorums;
std::unordered_map<NodeId, std::list<CRecoveredSig>> recSigsByNode;
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher> quorums;

CollectPendingRecoveredSigsToVerify(32, recSigsByNode, quorums);
if (recSigsByNode.empty()) {
Expand All @@ -443,13 +461,13 @@ bool CSigningManager::ProcessPendingRecoveredSigs(CConnman& connman)
}
}

cxxtimer::Timer verifyTimer;
cxxtimer::Timer verifyTimer(true);
batchVerifier.Verify();
verifyTimer.stop();

LogPrintf("llmq", "CSigningManager::%s -- verified recovered sig(s). count=%d, vt=%d, nodes=%d\n", __func__, verifyCount, verifyTimer.count(), recSigsByNode.size());

std::set<uint256> processed;
std::unordered_set<uint256, StaticSaltedHasher> processed;
for (auto& p : recSigsByNode) {
NodeId nodeId = p.first;
auto& v = p.second;
Expand Down Expand Up @@ -494,11 +512,25 @@ void CSigningManager::ProcessRecoveredSig(NodeId nodeId, const CRecoveredSig& re
signHash.ToString(), recoveredSig.id.ToString(), recoveredSig.msgHash.ToString(), nodeId);

if (db.HasRecoveredSigForId(llmqType, recoveredSig.id)) {
// this should really not happen, as each masternode is participating in only one vote,
// even if it's a member of multiple quorums. so a majority is only possible on one quorum and one msgHash per id
LogPrintf("CSigningManager::%s -- conflicting recoveredSig for id=%s, msgHash=%s\n", __func__,
recoveredSig.id.ToString(), recoveredSig.msgHash.ToString());
return;
CRecoveredSig otherRecoveredSig;
if (db.GetRecoveredSigById(llmqType, recoveredSig.id, otherRecoveredSig)) {
auto otherSignHash = llmq::utils::BuildSignHash(recoveredSig);
if (signHash != otherSignHash) {
// this should really not happen, as each masternode is participating in only one vote,
// even if it's a member of multiple quorums. so a majority is only possible on one quorum and one msgHash per id
LogPrintf("CSigningManager::%s -- conflicting recoveredSig for signHash=%s, id=%s, msgHash=%s, otherSignHash=%s\n", __func__,
signHash.ToString(), recoveredSig.id.ToString(), recoveredSig.msgHash.ToString(), otherSignHash.ToString());
} else {
// Looks like we're trying to process a recSig that is already known. This might happen if the same
// recSig comes in through regular QRECSIG messages and at the same time through some other message
// which allowed to reconstruct a recSig (e.g. IXLOCK). In this case, just bail out.
}
return;
} else {
// This case is very unlikely. It can only happen when cleanup caused this specific recSig to vanish
// between the HasRecoveredSigForId and GetRecoveredSigById call. If that happens, treat it as if we
// never had that recSig
}
}

db.WriteRecoveredSig(recoveredSig);
Expand Down Expand Up @@ -552,14 +584,19 @@ bool CSigningManager::AsyncSignIfMember(Consensus::LLMQType llmqType, const uint
if (db.HasVotedOnId(llmqType, id)) {
uint256 prevMsgHash;
db.GetVoteForId(llmqType, id, prevMsgHash);
LogPrintf("CSigningManager::%s -- already voted for id=%s and msgHash=%s. Not voting on conflicting msgHash=%s\n", __func__,
id.ToString(), prevMsgHash.ToString(), msgHash.ToString());
if (msgHash != prevMsgHash) {
LogPrintf("CSigningManager::%s -- already voted for id=%s and msgHash=%s. Not voting on conflicting msgHash=%s\n", __func__,
id.ToString(), prevMsgHash.ToString(), msgHash.ToString());
} else {
LogPrintf("CSigningManager::%s -- already voted for id=%s and msgHash=%s. Not voting again.\n", __func__,
id.ToString(), prevMsgHash.ToString());
}
return false;
}

if (db.HasRecoveredSigForId(llmqType, id)) {
// no need to sign it if we already have a recovered sig
return false;
return true;
}
db.WriteVoteForId(llmqType, id, msgHash);
}
Expand Down
24 changes: 8 additions & 16 deletions src/llmq/quorums_signing.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,11 @@

#include "chainparams.h"
#include "net.h"
#include "saltedhasher.h"
#include "sync.h"

#include <unordered_map>

namespace std {
template <>
struct hash<std::pair<Consensus::LLMQType, uint256>>
{
std::size_t operator()(const std::pair<Consensus::LLMQType, uint256>& k) const
{
return (std::size_t)((k.first + 1) * k.second.GetCheapHash());
}
};
}

namespace llmq
{

Expand Down Expand Up @@ -77,7 +67,6 @@ class CRecoveredSig
}
};

// TODO implement caching to speed things up
class CRecoveredSigsDb
{
static const size_t MAX_CACHE_SIZE = 30000;
Expand All @@ -87,8 +76,9 @@ class CRecoveredSigsDb
CDBWrapper db;

RecursiveMutex cs;
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, std::pair<bool, int64_t>> hasSigForIdCache;
std::unordered_map<uint256, std::pair<bool, int64_t>> hasSigForSessionCache;
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, std::pair<bool, int64_t>, StaticSaltedHasher> hasSigForIdCache;
std::unordered_map<uint256, std::pair<bool, int64_t>, StaticSaltedHasher> hasSigForSessionCache;
std::unordered_map<uint256, std::pair<bool, int64_t>, StaticSaltedHasher> hasSigForHashCache;

public:
CRecoveredSigsDb(bool fMemory);
Expand Down Expand Up @@ -136,7 +126,7 @@ class CSigningManager
CRecoveredSigsDb db;

// Incoming and not verified yet
std::map<NodeId, std::list<CRecoveredSig>> pendingRecoveredSigs;
std::unordered_map<NodeId, std::list<CRecoveredSig>> pendingRecoveredSigs;

// must be protected by cs
FastRandomContext rnd;
Expand All @@ -157,7 +147,9 @@ class CSigningManager
void ProcessMessageRecoveredSig(CNode* pfrom, const CRecoveredSig& recoveredSig, CConnman& connman);
bool PreVerifyRecoveredSig(NodeId nodeId, const CRecoveredSig& recoveredSig, bool& retBan);

void CollectPendingRecoveredSigsToVerify(size_t maxUniqueSessions, std::map<NodeId, std::list<CRecoveredSig>>& retSigShares, std::map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr>& retQuorums);
void CollectPendingRecoveredSigsToVerify(size_t maxUniqueSessions,
std::unordered_map<NodeId, std::list<CRecoveredSig>>& retSigShares,
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher>& retQuorums);
bool ProcessPendingRecoveredSigs(CConnman& connman); // called from the worker thread of CSigSharesManager
void ProcessRecoveredSig(NodeId nodeId, const CRecoveredSig& recoveredSig, const CQuorumCPtr& quorum, CConnman& connman);
void Cleanup(); // called from the worker thread of CSigSharesManager
Expand Down
Loading
Loading