Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: significant Mutex refactoring #5954

Merged
1 commit merged into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/llmq/debug.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,13 @@ UniValue CDKGDebugStatus::ToJson(CDeterministicMNManager& dmnman, const Chainsta

void CDKGDebugManager::GetLocalDebugStatus(llmq::CDKGDebugStatus& ret) const
{
LOCK(cs);
LOCK(cs_lockStatus);
ret = localStatus;
}

void CDKGDebugManager::ResetLocalSessionStatus(Consensus::LLMQType llmqType, int quorumIndex)
{
LOCK(cs);
LOCK(cs_lockStatus);

auto it = localStatus.sessions.find(std::make_pair(llmqType, quorumIndex));
if (it == localStatus.sessions.end()) {
Expand All @@ -157,7 +157,7 @@ void CDKGDebugManager::ResetLocalSessionStatus(Consensus::LLMQType llmqType, int

void CDKGDebugManager::InitLocalSessionStatus(const Consensus::LLMQParams& llmqParams, int quorumIndex, const uint256& quorumHash, int quorumHeight)
{
LOCK(cs);
LOCK(cs_lockStatus);

auto it = localStatus.sessions.find(std::make_pair(llmqParams.type, quorumIndex));
if (it == localStatus.sessions.end()) {
Expand All @@ -176,7 +176,7 @@ void CDKGDebugManager::InitLocalSessionStatus(const Consensus::LLMQParams& llmqP

void CDKGDebugManager::UpdateLocalSessionStatus(Consensus::LLMQType llmqType, int quorumIndex, std::function<bool(CDKGDebugSessionStatus& status)>&& func)
{
LOCK(cs);
LOCK(cs_lockStatus);

auto it = localStatus.sessions.find(std::make_pair(llmqType, quorumIndex));
if (it == localStatus.sessions.end()) {
Expand All @@ -190,7 +190,7 @@ void CDKGDebugManager::UpdateLocalSessionStatus(Consensus::LLMQType llmqType, in

void CDKGDebugManager::UpdateLocalMemberStatus(Consensus::LLMQType llmqType, int quorumIndex, size_t memberIdx, std::function<bool(CDKGDebugMemberStatus& status)>&& func)
{
LOCK(cs);
LOCK(cs_lockStatus);

auto it = localStatus.sessions.find(std::make_pair(llmqType, quorumIndex));
if (it == localStatus.sessions.end()) {
Expand Down
4 changes: 2 additions & 2 deletions src/llmq/debug.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ class CDKGDebugStatus
class CDKGDebugManager
{
private:
mutable RecursiveMutex cs;
CDKGDebugStatus localStatus GUARDED_BY(cs);
mutable Mutex cs_lockStatus;
CDKGDebugStatus localStatus GUARDED_BY(cs_lockStatus);

public:
CDKGDebugManager();
Expand Down
8 changes: 1 addition & 7 deletions src/llmq/dkgsession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,6 @@ bool CDKGSession::PreVerifyMessage(const CDKGContribution& qc, bool& retBan) con

void CDKGSession::ReceiveMessage(const CDKGContribution& qc, bool& retBan)
{
LOCK(cs_pending);

CDKGLogger logger(*this, __func__, __LINE__);

retBan = false;
Expand Down Expand Up @@ -336,15 +334,11 @@ void CDKGSession::ReceiveMessage(const CDKGContribution& qc, bool& retBan)

logger.Batch("decrypted our contribution share. time=%d", t2.count());

bool verifyPending = false;
receivedSkContributions[member->idx] = skContribution;
vecEncryptedContributions[member->idx] = qc.contributions;
LOCK(cs_pending);
pendingContributionVerifications.emplace_back(member->idx);
if (pendingContributionVerifications.size() >= 32) {
verifyPending = true;
}

if (verifyPending) {
VerifyPendingContributions();
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/llmq/dkgsession.h
Original file line number Diff line number Diff line change
Expand Up @@ -308,13 +308,13 @@ class CDKGSession
// we expect to only receive a single vvec and contribution per member, but we must also be able to relay
// conflicting messages as otherwise an attacker might be able to broadcast conflicting (valid+invalid) messages
// and thus split the quorum. Such members are later removed from the quorum.
mutable RecursiveMutex invCs;
mutable Mutex invCs;
std::map<uint256, CDKGContribution> contributions GUARDED_BY(invCs);
std::map<uint256, CDKGComplaint> complaints GUARDED_BY(invCs);
std::map<uint256, CDKGJustification> justifications GUARDED_BY(invCs);
std::map<uint256, CDKGPrematureCommitment> prematureCommitments GUARDED_BY(invCs);

mutable RecursiveMutex cs_pending;
mutable Mutex cs_pending;
std::vector<size_t> pendingContributionVerifications GUARDED_BY(cs_pending);

// filled by ReceivePrematureCommitment and used by FinalizeCommitments
Expand Down
60 changes: 26 additions & 34 deletions src/llmq/dkgsessionhandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ void CDKGPendingMessages::PushPendingMessage(NodeId from, PeerManager* peerman,
EraseObjectRequest(from, CInv(invType, hash));
}

LOCK(cs);
LOCK(cs_messages);

if (messagesPerNode[from] >= maxMessagesPerNode) {
// TODO ban?
Expand All @@ -95,7 +95,7 @@ void CDKGPendingMessages::PushPendingMessage(NodeId from, PeerManager* peerman,

std::list<CDKGPendingMessages::BinaryMessage> CDKGPendingMessages::PopPendingMessages(size_t maxCount)
{
LOCK(cs);
LOCK(cs_messages);

std::list<BinaryMessage> ret;
while (!pendingMessages.empty() && ret.size() < maxCount) {
Expand All @@ -108,7 +108,7 @@ std::list<CDKGPendingMessages::BinaryMessage> CDKGPendingMessages::PopPendingMes

bool CDKGPendingMessages::HasSeen(const uint256& hash) const
{
LOCK(cs);
LOCK(cs_messages);
return seenMessages.count(hash) != 0;
}

Expand All @@ -120,7 +120,7 @@ void CDKGPendingMessages::Misbehaving(const NodeId from, const int score)

void CDKGPendingMessages::Clear()
{
LOCK(cs);
LOCK(cs_messages);
pendingMessages.clear();
messagesPerNode.clear();
seenMessages.clear();
Expand All @@ -135,7 +135,7 @@ void CDKGSessionHandler::UpdatedBlockTip(const CBlockIndex* pindexNew)
if (quorumIndex > 0 && !IsQuorumRotationEnabled(params, pindexNew)) {
return;
}
LOCK(cs);
LOCK(cs_phase_qhash);

int quorumStageInt = (pindexNew->nHeight - quorumIndex) % params.dkgInterval;

Expand Down Expand Up @@ -207,7 +207,7 @@ bool CDKGSessionHandler::InitNewQuorum(const CBlockIndex* pQuorumBaseBlockIndex)

std::pair<QuorumPhase, uint256> CDKGSessionHandler::GetPhaseAndQuorumHash() const
{
LOCK(cs);
LOCK(cs_phase_qhash);
return std::make_pair(phase, quorumHash);
}

Expand Down Expand Up @@ -304,9 +304,8 @@ void CDKGSessionHandler::SleepBeforePhase(QuorumPhase curPhase,

int64_t sleepTime = (int64_t)(adjustedPhaseSleepTimePerMember * curSession->GetMyMemberIndex().value_or(0));
int64_t endTime = GetTimeMillis() + sleepTime;
int heightTmp{-1};
int heightStart{-1};
heightTmp = heightStart = WITH_LOCK(cs, return currentHeight);
int heightTmp{currentHeight.load()};
int heightStart{heightTmp};

LogPrint(BCLog::LLMQ_DKG, "CDKGSessionManager::%s -- %s qi[%d] - starting sleep for %d ms, curPhase=%d\n", __func__, params.name, quorumIndex, sleepTime, ToUnderlying(curPhase));

Expand All @@ -315,22 +314,20 @@ void CDKGSessionHandler::SleepBeforePhase(QuorumPhase curPhase,
LogPrint(BCLog::LLMQ_DKG, "CDKGSessionManager::%s -- %s qi[%d] - aborting due to stop/shutdown requested\n", __func__, params.name, quorumIndex);
throw AbortPhaseException();
}
{
LOCK(cs);
if (currentHeight > heightTmp) {
// New block(s) just came in
int64_t expectedBlockTime = (currentHeight - heightStart) * Params().GetConsensus().nPowTargetSpacing * 1000;
if (expectedBlockTime > sleepTime) {
// Blocks came faster than we expected, jump into the phase func asap
break;
}
heightTmp = currentHeight;
}
if (phase != curPhase || quorumHash != expectedQuorumHash) {
// Something went wrong and/or we missed quite a few blocks and it's just too late now
LogPrint(BCLog::LLMQ_DKG, "CDKGSessionManager::%s -- %s qi[%d] - aborting due unexpected phase/expectedQuorumHash change\n", __func__, params.name, quorumIndex);
throw AbortPhaseException();
auto cur_height = currentHeight.load();
if (cur_height > heightTmp) {
// New block(s) just came in
int64_t expectedBlockTime = (cur_height - heightStart) * Params().GetConsensus().nPowTargetSpacing * 1000;
if (expectedBlockTime > sleepTime) {
// Blocks came faster than we expected, jump into the phase func asap
break;
}
heightTmp = cur_height;
}
if (WITH_LOCK(cs_phase_qhash, return phase != curPhase || quorumHash != expectedQuorumHash)) {
// Something went wrong and/or we missed quite a few blocks and it's just too late now
LogPrint(BCLog::LLMQ_DKG, "CDKGSessionManager::%s -- %s qi[%d] - aborting due unexpected phase/expectedQuorumHash change\n", __func__, params.name, quorumIndex);
throw AbortPhaseException();
}
if (!runWhileWaiting()) {
UninterruptibleSleep(std::chrono::milliseconds{100});
Expand Down Expand Up @@ -505,18 +502,13 @@ bool ProcessPendingMessageBatch(CDKGSession& session, CDKGPendingMessages& pendi

void CDKGSessionHandler::HandleDKGRound()
{
uint256 curQuorumHash;

WaitForNextPhase(std::nullopt, QuorumPhase::Initialized);

{
LOCK(cs);
pendingContributions.Clear();
pendingComplaints.Clear();
pendingJustifications.Clear();
pendingPrematureCommitments.Clear();
curQuorumHash = quorumHash;
}
pendingContributions.Clear();
knst marked this conversation as resolved.
Show resolved Hide resolved
pendingComplaints.Clear();
pendingJustifications.Clear();
pendingPrematureCommitments.Clear();
uint256 curQuorumHash = WITH_LOCK(cs_phase_qhash, return quorumHash);

const CBlockIndex* pQuorumBaseBlockIndex = WITH_LOCK(cs_main, return m_chainstate.m_blockman.LookupBlockIndex(curQuorumHash));

Expand Down
18 changes: 9 additions & 9 deletions src/llmq/dkgsessionhandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ class CDKGPendingMessages
using BinaryMessage = std::pair<NodeId, std::shared_ptr<CDataStream>>;

private:
mutable RecursiveMutex cs;
std::atomic<PeerManager*> m_peerman{nullptr};
const int invType;
size_t maxMessagesPerNode GUARDED_BY(cs);
std::list<BinaryMessage> pendingMessages GUARDED_BY(cs);
std::map<NodeId, size_t> messagesPerNode GUARDED_BY(cs);
std::set<uint256> seenMessages GUARDED_BY(cs);
const size_t maxMessagesPerNode;
mutable Mutex cs_messages;
std::list<BinaryMessage> pendingMessages GUARDED_BY(cs_messages);
std::map<NodeId, size_t> messagesPerNode GUARDED_BY(cs_messages);
std::set<uint256> seenMessages GUARDED_BY(cs_messages);

public:
explicit CDKGPendingMessages(size_t _maxMessagesPerNode, int _invType) :
Expand Down Expand Up @@ -117,7 +117,6 @@ class CDKGSessionHandler
friend class CDKGSessionManager;

private:
mutable RecursiveMutex cs;
std::atomic<bool> stopRequested{false};

CBLSWorker& blsWorker;
Expand All @@ -134,9 +133,10 @@ class CDKGSessionHandler
const Consensus::LLMQParams params;
const int quorumIndex;

QuorumPhase phase GUARDED_BY(cs) {QuorumPhase::Idle};
int currentHeight GUARDED_BY(cs) {-1};
uint256 quorumHash GUARDED_BY(cs);
std::atomic<int> currentHeight {-1};
mutable Mutex cs_phase_qhash;
QuorumPhase phase GUARDED_BY(cs_phase_qhash) {QuorumPhase::Idle};
uint256 quorumHash GUARDED_BY(cs_phase_qhash);

std::unique_ptr<CDKGSession> curSession;
std::thread phaseHandlerThread;
Expand Down
8 changes: 4 additions & 4 deletions src/llmq/dkgsessionmgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ bool CDKGSessionManager::GetContribution(const uint256& hash, CDKGContribution&

for (const auto& p : dkgSessionHandlers) {
const auto& dkgType = p.second;
LOCK(dkgType.cs);
LOCK(dkgType.cs_phase_qhash);
if (dkgType.phase < QuorumPhase::Initialized || dkgType.phase > QuorumPhase::Contribute) {
continue;
}
Expand All @@ -314,7 +314,7 @@ bool CDKGSessionManager::GetComplaint(const uint256& hash, CDKGComplaint& ret) c

for (const auto& p : dkgSessionHandlers) {
const auto& dkgType = p.second;
LOCK(dkgType.cs);
LOCK(dkgType.cs_phase_qhash);
if (dkgType.phase < QuorumPhase::Contribute || dkgType.phase > QuorumPhase::Complain) {
continue;
}
Expand All @@ -335,7 +335,7 @@ bool CDKGSessionManager::GetJustification(const uint256& hash, CDKGJustification

for (const auto& p : dkgSessionHandlers) {
const auto& dkgType = p.second;
LOCK(dkgType.cs);
LOCK(dkgType.cs_phase_qhash);
if (dkgType.phase < QuorumPhase::Complain || dkgType.phase > QuorumPhase::Justify) {
continue;
}
Expand All @@ -356,7 +356,7 @@ bool CDKGSessionManager::GetPrematureCommitment(const uint256& hash, CDKGPrematu

for (const auto& p : dkgSessionHandlers) {
const auto& dkgType = p.second;
LOCK(dkgType.cs);
LOCK(dkgType.cs_phase_qhash);
if (dkgType.phase < QuorumPhase::Justify || dkgType.phase > QuorumPhase::Commit) {
continue;
}
Expand Down
Loading
Loading