Skip to content

Commit

Permalink
merge bitcoin#21160: Move tx inventory into net_processing
Browse files Browse the repository at this point in the history
  • Loading branch information
kwvg committed Apr 26, 2024
1 parent 24205d9 commit 33098ae
Show file tree
Hide file tree
Showing 25 changed files with 270 additions and 235 deletions.
13 changes: 7 additions & 6 deletions src/governance/governance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <masternode/meta.h>
#include <masternode/node.h>
#include <masternode/sync.h>
#include <net_processing.h>
#include <netfulfilledman.h>
#include <netmessagemaker.h>
#include <protocol.h>
Expand Down Expand Up @@ -141,9 +142,9 @@ PeerMsgRet CGovernanceManager::ProcessMessage(CNode& peer, CConnman& connman, Pe

LogPrint(BCLog::GOBJECT, "MNGOVERNANCESYNC -- syncing governance objects to our peer %s\n", peer.GetLogString());
if (nProp == uint256()) {
return SyncObjects(peer, connman);
return SyncObjects(peer, peerman, connman);
} else {
SyncSingleObjVotes(peer, nProp, filter, connman);
SyncSingleObjVotes(peer, peerman, nProp, filter, connman);
}
}

Expand Down Expand Up @@ -858,7 +859,7 @@ bool CGovernanceManager::ConfirmInventoryRequest(const CInv& inv)
return true;
}

void CGovernanceManager::SyncSingleObjVotes(CNode& peer, const uint256& nProp, const CBloomFilter& filter, CConnman& connman)
void CGovernanceManager::SyncSingleObjVotes(CNode& peer, PeerManager& peerman, const uint256& nProp, const CBloomFilter& filter, CConnman& connman)
{
// do not provide any data until our node is synced
if (!Assert(m_mn_sync)->IsSynced()) return;
Expand Down Expand Up @@ -899,7 +900,7 @@ void CGovernanceManager::SyncSingleObjVotes(CNode& peer, const uint256& nProp, c
if (filter.contains(nVoteHash) || !vote.IsValid(tip_mn_list, onlyVotingKeyAllowed)) {
continue;
}
peer.PushInventory(CInv(MSG_GOVERNANCE_OBJECT_VOTE, nVoteHash));
peerman.PushInventory(peer.GetId(), CInv(MSG_GOVERNANCE_OBJECT_VOTE, nVoteHash));
++nVoteCount;
}

Expand All @@ -908,7 +909,7 @@ void CGovernanceManager::SyncSingleObjVotes(CNode& peer, const uint256& nProp, c
LogPrint(BCLog::GOBJECT, "CGovernanceManager::%s -- sent %d votes to peer=%d\n", __func__, nVoteCount, peer.GetId());
}

PeerMsgRet CGovernanceManager::SyncObjects(CNode& peer, CConnman& connman) const
PeerMsgRet CGovernanceManager::SyncObjects(CNode& peer, PeerManager& peerman, CConnman& connman) const
{
assert(m_netfulfilledman.IsValid());

Expand Down Expand Up @@ -959,7 +960,7 @@ PeerMsgRet CGovernanceManager::SyncObjects(CNode& peer, CConnman& connman) const

// Push the inventory budget proposal message over to the other client
LogPrint(BCLog::GOBJECT, "CGovernanceManager::%s -- syncing govobj: %s, peer=%d\n", __func__, strHash, peer.GetId());
peer.PushInventory(CInv(MSG_GOVERNANCE_OBJECT, nHash));
peerman.PushInventory(peer.GetId(), CInv(MSG_GOVERNANCE_OBJECT, nHash));
++nObjCount;
}

Expand Down
4 changes: 2 additions & 2 deletions src/governance/governance.h
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,8 @@ class CGovernanceManager : public GovernanceStore
*/
bool ConfirmInventoryRequest(const CInv& inv);

void SyncSingleObjVotes(CNode& peer, const uint256& nProp, const CBloomFilter& filter, CConnman& connman);
PeerMsgRet SyncObjects(CNode& peer, CConnman& connman) const;
void SyncSingleObjVotes(CNode& peer, PeerManager& peerman, const uint256& nProp, const CBloomFilter& filter, CConnman& connman);
PeerMsgRet SyncObjects(CNode& peer, PeerManager& peerman, CConnman& connman) const;

PeerMsgRet ProcessMessage(CNode& peer, CConnman& connman, PeerManager& peerman, std::string_view msg_type, CDataStream& vRecv);

Expand Down
4 changes: 2 additions & 2 deletions src/llmq/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ LLMQContext::LLMQContext(CChainState& chainstate, CConnman& connman, CDeterminis
llmq::quorumBlockProcessor = std::make_unique<llmq::CQuorumBlockProcessor>(chainstate, dmnman, evo_db, peerman);
return llmq::quorumBlockProcessor.get();
}()},
qdkgsman{std::make_unique<llmq::CDKGSessionManager>(*bls_worker, chainstate, connman, dmnman, *dkg_debugman, mn_metaman, *quorum_block_processor, mn_activeman, sporkman, unit_tests, wipe)},
qdkgsman{std::make_unique<llmq::CDKGSessionManager>(*bls_worker, chainstate, connman, dmnman, *dkg_debugman, mn_metaman, *quorum_block_processor, mn_activeman, sporkman, peerman, unit_tests, wipe)},
qman{[&]() -> llmq::CQuorumManager* const {
assert(llmq::quorumManager == nullptr);
llmq::quorumManager = std::make_unique<llmq::CQuorumManager>(*bls_worker, chainstate, connman, dmnman, *qdkgsman, evo_db, *quorum_block_processor, mn_activeman, mn_sync, sporkman);
return llmq::quorumManager.get();
}()},
sigman{std::make_unique<llmq::CSigningManager>(connman, mn_activeman, *llmq::quorumManager, unit_tests, wipe)},
sigman{std::make_unique<llmq::CSigningManager>(connman, mn_activeman, *llmq::quorumManager, peerman, unit_tests, wipe)},
shareman{std::make_unique<llmq::CSigSharesManager>(connman, *sigman, mn_activeman, *llmq::quorumManager, sporkman, peerman)},
clhandler{[&]() -> llmq::CChainLocksHandler* const {
assert(llmq::chainLocksHandler == nullptr);
Expand Down
5 changes: 3 additions & 2 deletions src/llmq/dkgsession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <logging.h>
#include <masternode/meta.h>
#include <masternode/node.h>
#include <net_processing.h>
#include <netmessagemaker.h>
#include <validation.h>
#include <util/irange.h>
Expand Down Expand Up @@ -1326,10 +1327,10 @@ void CDKGSession::RelayInvToParticipants(const CInv& inv) const
myProTxHash.ToString().substr(0, 4), ss.str());

std::stringstream ss2;
connman.ForEachNode([&](CNode* pnode) {
connman.ForEachNode([&](const CNode* pnode) {
if (pnode->qwatch ||
(!pnode->GetVerifiedProRegTxHash().IsNull() && (relayMembers.count(pnode->GetVerifiedProRegTxHash()) != 0))) {
pnode->PushInventory(inv);
Assert(m_peerman)->PushInventory(pnode->GetId(), inv);
}

if (pnode->GetVerifiedProRegTxHash().IsNull()) {
Expand Down
7 changes: 5 additions & 2 deletions src/llmq/dkgsession.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class CDeterministicMN;
class CMasternodeMetaMan;
class CSporkManager;
class UniValue;
class PeerManager;

using CDeterministicMNCPtr = std::shared_ptr<const CDeterministicMN>;

Expand Down Expand Up @@ -280,6 +281,7 @@ class CDKGSession
CMasternodeMetaMan& m_mn_metaman;
const CActiveMasternodeManager* const m_mn_activeman;
const CSporkManager& m_sporkman;
const std::unique_ptr<PeerManager>& m_peerman;

const CBlockIndex* m_quorum_base_block_index{nullptr};
int quorumIndex{0};
Expand Down Expand Up @@ -322,9 +324,10 @@ class CDKGSession
CDKGSession(const Consensus::LLMQParams& _params, CBLSWorker& _blsWorker, CConnman& _connman,
CDeterministicMNManager& dmnman, CDKGSessionManager& _dkgManager, CDKGDebugManager& _dkgDebugManager,
CMasternodeMetaMan& mn_metaman, const CActiveMasternodeManager* const mn_activeman,
const CSporkManager& sporkman) :
const CSporkManager& sporkman, const std::unique_ptr<PeerManager>& peerman) :
params(_params), blsWorker(_blsWorker), cache(_blsWorker), connman(_connman), m_dmnman(dmnman), dkgManager(_dkgManager),
dkgDebugManager(_dkgDebugManager), m_mn_metaman(mn_metaman), m_mn_activeman(mn_activeman), m_sporkman(sporkman) {}
dkgDebugManager(_dkgDebugManager), m_mn_metaman(mn_metaman), m_mn_activeman(mn_activeman), m_sporkman(sporkman),
m_peerman(peerman) {}

bool Init(gsl::not_null<const CBlockIndex*> pQuorumBaseBlockIndex, Span<CDeterministicMNCPtr> mns, const uint256& _myProTxHash, int _quorumIndex);

Expand Down
7 changes: 4 additions & 3 deletions src/llmq/dkgsessionhandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace llmq
CDKGSessionHandler::CDKGSessionHandler(CBLSWorker& _blsWorker, CChainState& chainstate, CConnman& _connman, CDeterministicMNManager& dmnman,
CDKGDebugManager& _dkgDebugManager, CDKGSessionManager& _dkgManager, CMasternodeMetaMan& mn_metaman,
CQuorumBlockProcessor& _quorumBlockProcessor, const CActiveMasternodeManager* const mn_activeman,
const CSporkManager& sporkman, const Consensus::LLMQParams& _params, int _quorumIndex) :
const CSporkManager& sporkman, const std::unique_ptr<PeerManager>& peerman, const Consensus::LLMQParams& _params, int _quorumIndex) :
blsWorker(_blsWorker),
m_chainstate(chainstate),
connman(_connman),
Expand All @@ -38,9 +38,10 @@ CDKGSessionHandler::CDKGSessionHandler(CBLSWorker& _blsWorker, CChainState& chai
quorumBlockProcessor(_quorumBlockProcessor),
m_mn_activeman(mn_activeman),
m_sporkman(sporkman),
m_peerman(peerman),
params(_params),
quorumIndex(_quorumIndex),
curSession(std::make_unique<CDKGSession>(_params, _blsWorker, _connman, dmnman, _dkgManager, _dkgDebugManager, m_mn_metaman, m_mn_activeman, sporkman)),
curSession(std::make_unique<CDKGSession>(_params, _blsWorker, _connman, dmnman, _dkgManager, _dkgDebugManager, m_mn_metaman, m_mn_activeman, sporkman, peerman)),
pendingContributions((size_t)_params.size * 2, MSG_QUORUM_CONTRIB), // we allow size*2 messages as we need to make sure we see bad behavior (double messages)
pendingComplaints((size_t)_params.size * 2, MSG_QUORUM_COMPLAINT),
pendingJustifications((size_t)_params.size * 2, MSG_QUORUM_JUSTIFICATION),
Expand Down Expand Up @@ -188,7 +189,7 @@ void CDKGSessionHandler::StopThread()

bool CDKGSessionHandler::InitNewQuorum(const CBlockIndex* pQuorumBaseBlockIndex)
{
curSession = std::make_unique<CDKGSession>(params, blsWorker, connman, m_dmnman, dkgManager, dkgDebugManager, m_mn_metaman, m_mn_activeman, m_sporkman);
curSession = std::make_unique<CDKGSession>(params, blsWorker, connman, m_dmnman, dkgManager, dkgDebugManager, m_mn_metaman, m_mn_activeman, m_sporkman, m_peerman);

if (!DeploymentDIP0003Enforced(pQuorumBaseBlockIndex->nHeight, Params().GetConsensus())) {
return false;
Expand Down
3 changes: 2 additions & 1 deletion src/llmq/dkgsessionhandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ class CDKGSessionHandler
CQuorumBlockProcessor& quorumBlockProcessor;
const CActiveMasternodeManager* const m_mn_activeman;
const CSporkManager& m_sporkman;
const std::unique_ptr<PeerManager>& m_peerman;
const Consensus::LLMQParams params;
const int quorumIndex;

Expand All @@ -151,7 +152,7 @@ class CDKGSessionHandler
CDKGSessionHandler(CBLSWorker& _blsWorker, CChainState& chainstate, CConnman& _connman, CDeterministicMNManager& dmnman,
CDKGDebugManager& _dkgDebugManager, CDKGSessionManager& _dkgManager, CMasternodeMetaMan& mn_metaman,
CQuorumBlockProcessor& _quorumBlockProcessor, const CActiveMasternodeManager* const mn_activeman,
const CSporkManager& sporkman, const Consensus::LLMQParams& _params, int _quorumIndex);
const CSporkManager& sporkman, const std::unique_ptr<PeerManager>& peerman, const Consensus::LLMQParams& _params, int _quorumIndex);
~CDKGSessionHandler() = default;

void UpdatedBlockTip(const CBlockIndex *pindexNew);
Expand Down
6 changes: 4 additions & 2 deletions src/llmq/dkgsessionmgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ static const std::string DB_ENC_CONTRIB = "qdkg_E";

CDKGSessionManager::CDKGSessionManager(CBLSWorker& _blsWorker, CChainState& chainstate, CConnman& _connman, CDeterministicMNManager& dmnman,
CDKGDebugManager& _dkgDebugManager, CMasternodeMetaMan& mn_metaman, CQuorumBlockProcessor& _quorumBlockProcessor,
const CActiveMasternodeManager* const mn_activeman, const CSporkManager& sporkman, bool unitTests, bool fWipe) :
const CActiveMasternodeManager* const mn_activeman, const CSporkManager& sporkman,
const std::unique_ptr<PeerManager>& peerman, bool unitTests, bool fWipe) :
db(std::make_unique<CDBWrapper>(unitTests ? "" : (GetDataDir() / "llmq/dkgdb"), 1 << 20, unitTests, fWipe)),
blsWorker(_blsWorker),
m_chainstate(chainstate),
Expand All @@ -49,7 +50,8 @@ CDKGSessionManager::CDKGSessionManager(CBLSWorker& _blsWorker, CChainState& chai
for (const auto i : irange::range(session_count)) {
dkgSessionHandlers.emplace(std::piecewise_construct,
std::forward_as_tuple(params.type, i),
std::forward_as_tuple(blsWorker, m_chainstate, connman, dmnman, dkgDebugManager, *this, mn_metaman, quorumBlockProcessor, mn_activeman, spork_manager, params, i));
std::forward_as_tuple(blsWorker, m_chainstate, connman, dmnman, dkgDebugManager, *this, mn_metaman,
quorumBlockProcessor, mn_activeman, spork_manager, peerman, params, i));
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/llmq/dkgsessionmgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ class CDKGSessionManager
public:
CDKGSessionManager(CBLSWorker& _blsWorker, CChainState& chainstate, CConnman& _connman, CDeterministicMNManager& dmnman,
CDKGDebugManager& _dkgDebugManager, CMasternodeMetaMan& mn_metaman, CQuorumBlockProcessor& _quorumBlockProcessor,
const CActiveMasternodeManager* const mn_activeman, const CSporkManager& sporkman, bool unitTests, bool fWipe);
const CActiveMasternodeManager* const mn_activeman, const CSporkManager& sporkman,
const std::unique_ptr<PeerManager>& peerman, bool unitTests, bool fWipe);
~CDKGSessionManager() = default;

void StartThreads();
Expand Down
9 changes: 3 additions & 6 deletions src/llmq/instantsend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1452,12 +1452,9 @@ void CInstantSendManager::AskNodesForLockedTx(const uint256& txid, const CConnma
if (nodesToAskFor.size() >= 4) {
return;
}
if (peerman.CanRelayAddrs(pnode->GetId())) {
LOCK(pnode->m_tx_relay->cs_tx_inventory);
if (pnode->m_tx_relay->filterInventoryKnown.contains(txid)) {
pnode->AddRef();
nodesToAskFor.emplace_back(pnode);
}
if (peerman.IsInvInFilter(pnode->GetId(), txid)) {
pnode->AddRef();
nodesToAskFor.emplace_back(pnode);
}
};

Expand Down
22 changes: 8 additions & 14 deletions src/llmq/signing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -540,8 +540,8 @@ void CRecoveredSigsDb::CleanupOldVotes(int64_t maxAge)
//////////////////

CSigningManager::CSigningManager(CConnman& _connman, const CActiveMasternodeManager* const mn_activeman, const CQuorumManager& _qman,
bool fMemory, bool fWipe) :
db(fMemory, fWipe), connman(_connman), m_mn_activeman(mn_activeman), qman(_qman)
const std::unique_ptr<PeerManager>& peerman, bool fMemory, bool fWipe) :
db(fMemory, fWipe), connman(_connman), m_mn_activeman(mn_activeman), qman(_qman), m_peerman(peerman)
{
}

Expand Down Expand Up @@ -572,18 +572,18 @@ bool CSigningManager::GetRecoveredSigForGetData(const uint256& hash, CRecoveredS
return true;
}

PeerMsgRet CSigningManager::ProcessMessage(const CNode& pfrom, gsl::not_null<PeerManager*> peerman, const std::string& msg_type, CDataStream& vRecv)
PeerMsgRet CSigningManager::ProcessMessage(const CNode& pfrom, const std::string& msg_type, CDataStream& vRecv)
{
if (msg_type == NetMsgType::QSIGREC) {
auto recoveredSig = std::make_shared<CRecoveredSig>();
vRecv >> *recoveredSig;

return ProcessMessageRecoveredSig(pfrom, peerman, recoveredSig);
return ProcessMessageRecoveredSig(pfrom, recoveredSig);
}
return {};
}

PeerMsgRet CSigningManager::ProcessMessageRecoveredSig(const CNode& pfrom, gsl::not_null<PeerManager*> peerman, const std::shared_ptr<const CRecoveredSig>& recoveredSig)
PeerMsgRet CSigningManager::ProcessMessageRecoveredSig(const CNode& pfrom, const std::shared_ptr<const CRecoveredSig>& recoveredSig)
{
{
LOCK(cs_main);
Expand Down Expand Up @@ -615,12 +615,6 @@ PeerMsgRet CSigningManager::ProcessMessageRecoveredSig(const CNode& pfrom, gsl::
return {};
}

if (m_peerman == nullptr) {
m_peerman = peerman;
}
// we should never use one CSigningManager with different PeerManager
assert(m_peerman == peerman);

pendingRecoveredSigs[pfrom.GetId()].emplace_back(recoveredSig);
return {};
}
Expand Down Expand Up @@ -776,7 +770,7 @@ bool CSigningManager::ProcessPendingRecoveredSigs()

if (batchVerifier.badSources.count(nodeId)) {
LogPrint(BCLog::LLMQ, "CSigningManager::%s -- invalid recSig from other node, banning peer=%d\n", __func__, nodeId);
m_peerman.load()->Misbehaving(nodeId, 100);
Assert(m_peerman)->Misbehaving(nodeId, 100);
continue;
}

Expand Down Expand Up @@ -840,9 +834,9 @@ void CSigningManager::ProcessRecoveredSig(const std::shared_ptr<const CRecovered

if (m_mn_activeman != nullptr) {
CInv inv(MSG_QUORUM_RECOVERED_SIG, recoveredSig->GetHash());
connman.ForEachNode([&](CNode* pnode) {
connman.ForEachNode([&](const CNode* pnode) {
if (pnode->fSendRecSigs) {
pnode->PushInventory(inv);
Assert(m_peerman)->PushInventory(pnode->GetId(), inv);
}
});
}
Expand Down
10 changes: 5 additions & 5 deletions src/llmq/signing.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,7 @@ class CSigningManager
CConnman& connman;
const CActiveMasternodeManager* const m_mn_activeman;
const CQuorumManager& qman;

std::atomic<PeerManager*> m_peerman{nullptr};
const std::unique_ptr<PeerManager>& m_peerman;

// Incoming and not verified yet
std::unordered_map<NodeId, std::list<std::shared_ptr<const CRecoveredSig>>> pendingRecoveredSigs GUARDED_BY(cs);
Expand All @@ -179,12 +178,13 @@ class CSigningManager
std::vector<CRecoveredSigsListener*> recoveredSigsListeners GUARDED_BY(cs);

public:
CSigningManager(CConnman& _connman, const CActiveMasternodeManager* const mn_activeman, const CQuorumManager& _qman, bool fMemory, bool fWipe);
CSigningManager(CConnman& _connman, const CActiveMasternodeManager* const mn_activeman, const CQuorumManager& _qman,
const std::unique_ptr<PeerManager>& peerman, bool fMemory, bool fWipe);

bool AlreadyHave(const CInv& inv) const;
bool GetRecoveredSigForGetData(const uint256& hash, CRecoveredSig& ret) const;

PeerMsgRet ProcessMessage(const CNode& pnode, gsl::not_null<PeerManager*> peerman, const std::string& msg_type, CDataStream& vRecv);
PeerMsgRet ProcessMessage(const CNode& pnode, const std::string& msg_type, CDataStream& vRecv);

// This is called when a recovered signature was was reconstructed from another P2P message and is known to be valid
// This is the case for example when a signature appears as part of InstantSend or ChainLocks
Expand All @@ -197,7 +197,7 @@ class CSigningManager
void TruncateRecoveredSig(Consensus::LLMQType llmqType, const uint256& id);

private:
PeerMsgRet ProcessMessageRecoveredSig(const CNode& pfrom, gsl::not_null<PeerManager*> peerman, const std::shared_ptr<const CRecoveredSig>& recoveredSig);
PeerMsgRet ProcessMessageRecoveredSig(const CNode& pfrom, const std::shared_ptr<const CRecoveredSig>& recoveredSig);
static bool PreVerifyRecoveredSig(const CQuorumManager& quorum_manager, const CRecoveredSig& recoveredSig, bool& retBan);

void CollectPendingRecoveredSigsToVerify(size_t maxUniqueSessions,
Expand Down
Loading

0 comments on commit 33098ae

Please sign in to comment.