diff --git a/src/bitcoin-cli.cpp b/src/bitcoin-cli.cpp index 4b2f6e8461fa8..9ad44928a000e 100644 --- a/src/bitcoin-cli.cpp +++ b/src/bitcoin-cli.cpp @@ -525,7 +525,7 @@ class NetinfoRequestHandler : public BaseRequestHandler const int8_t network_id{NetworkStringToId(network)}; if (network_id == UNKNOWN_NETWORK) continue; const bool is_outbound{!peer["inbound"].get_bool()}; - const bool is_block_relay{!peer["relaytxes"].get_bool()}; + const bool is_block_relay{peer["relaytxes"].isNull() ? false : !peer["relaytxes"].get_bool()}; ++m_counts.at(is_outbound).at(network_id); // in/out by network ++m_counts.at(is_outbound).at(m_networks.size()); // in/out overall ++m_counts.at(2).at(network_id); // total by network diff --git a/src/governance/governance.cpp b/src/governance/governance.cpp index 19348ba303b8f..023d0603e99f6 100644 --- a/src/governance/governance.cpp +++ b/src/governance/governance.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -141,9 +142,9 @@ PeerMsgRet CGovernanceManager::ProcessMessage(CNode& peer, CConnman& connman, Pe LogPrint(BCLog::GOBJECT, "MNGOVERNANCESYNC -- syncing governance objects to our peer %s\n", peer.GetLogString()); if (nProp == uint256()) { - return SyncObjects(peer, connman); + return SyncObjects(peer, peerman, connman); } else { - SyncSingleObjVotes(peer, nProp, filter, connman); + SyncSingleObjVotes(peer, peerman, nProp, filter, connman); } } @@ -858,7 +859,7 @@ bool CGovernanceManager::ConfirmInventoryRequest(const CInv& inv) return true; } -void CGovernanceManager::SyncSingleObjVotes(CNode& peer, const uint256& nProp, const CBloomFilter& filter, CConnman& connman) +void CGovernanceManager::SyncSingleObjVotes(CNode& peer, PeerManager& peerman, const uint256& nProp, const CBloomFilter& filter, CConnman& connman) { // do not provide any data until our node is synced if (!Assert(m_mn_sync)->IsSynced()) return; @@ -899,7 +900,7 @@ void CGovernanceManager::SyncSingleObjVotes(CNode& peer, const uint256& nProp, c if (filter.contains(nVoteHash) || !vote.IsValid(tip_mn_list, onlyVotingKeyAllowed)) { continue; } - peer.PushInventory(CInv(MSG_GOVERNANCE_OBJECT_VOTE, nVoteHash)); + peerman.PushInventory(peer.GetId(), CInv(MSG_GOVERNANCE_OBJECT_VOTE, nVoteHash)); ++nVoteCount; } @@ -908,7 +909,7 @@ void CGovernanceManager::SyncSingleObjVotes(CNode& peer, const uint256& nProp, c LogPrint(BCLog::GOBJECT, "CGovernanceManager::%s -- sent %d votes to peer=%d\n", __func__, nVoteCount, peer.GetId()); } -PeerMsgRet CGovernanceManager::SyncObjects(CNode& peer, CConnman& connman) const +PeerMsgRet CGovernanceManager::SyncObjects(CNode& peer, PeerManager& peerman, CConnman& connman) const { assert(m_netfulfilledman.IsValid()); @@ -959,7 +960,7 @@ PeerMsgRet CGovernanceManager::SyncObjects(CNode& peer, CConnman& connman) const // Push the inventory budget proposal message over to the other client LogPrint(BCLog::GOBJECT, "CGovernanceManager::%s -- syncing govobj: %s, peer=%d\n", __func__, strHash, peer.GetId()); - peer.PushInventory(CInv(MSG_GOVERNANCE_OBJECT, nHash)); + peerman.PushInventory(peer.GetId(), CInv(MSG_GOVERNANCE_OBJECT, nHash)); ++nObjCount; } diff --git a/src/governance/governance.h b/src/governance/governance.h index ddb859603e99a..0747e5dafaafd 100644 --- a/src/governance/governance.h +++ b/src/governance/governance.h @@ -292,8 +292,8 @@ class CGovernanceManager : public GovernanceStore */ bool ConfirmInventoryRequest(const CInv& inv); - void SyncSingleObjVotes(CNode& peer, const uint256& nProp, const CBloomFilter& filter, CConnman& connman); - PeerMsgRet SyncObjects(CNode& peer, CConnman& connman) const; + void SyncSingleObjVotes(CNode& peer, PeerManager& peerman, const uint256& nProp, const CBloomFilter& filter, CConnman& connman); + PeerMsgRet SyncObjects(CNode& peer, PeerManager& peerman, CConnman& connman) const; PeerMsgRet ProcessMessage(CNode& peer, CConnman& connman, PeerManager& peerman, std::string_view msg_type, CDataStream& vRecv); diff --git a/src/init.cpp b/src/init.cpp index f57dd4a5e0eea..d52ab19225c17 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -1256,6 +1256,11 @@ bool AppInitParameterInteraction(const ArgsManager& args) return InitError(Untranslated("Cannot set -bind or -whitebind together with -listen=0")); } + // if listen=0, then disallow listenonion=1 + if (!args.GetBoolArg("-listen", DEFAULT_LISTEN) && args.GetBoolArg("-listenonion", DEFAULT_LISTEN_ONION)) { + return InitError(Untranslated("Cannot set -listen=0 together with -listenonion=1")); + } + // Make sure enough file descriptors are available int nBind = std::max(nUserBind, size_t(1)); nUserMaxConnections = args.GetArg("-maxconnections", DEFAULT_MAX_PEER_CONNECTIONS); diff --git a/src/llmq/context.cpp b/src/llmq/context.cpp index 919c9dce8c61e..220cd2de11562 100644 --- a/src/llmq/context.cpp +++ b/src/llmq/context.cpp @@ -29,13 +29,13 @@ LLMQContext::LLMQContext(CChainState& chainstate, CConnman& connman, CDeterminis llmq::quorumBlockProcessor = std::make_unique(chainstate, dmnman, evo_db, peerman); return llmq::quorumBlockProcessor.get(); }()}, - qdkgsman{std::make_unique(*bls_worker, chainstate, connman, dmnman, *dkg_debugman, mn_metaman, *quorum_block_processor, mn_activeman, sporkman, unit_tests, wipe)}, + qdkgsman{std::make_unique(*bls_worker, chainstate, connman, dmnman, *dkg_debugman, mn_metaman, *quorum_block_processor, mn_activeman, sporkman, peerman, unit_tests, wipe)}, qman{[&]() -> llmq::CQuorumManager* const { assert(llmq::quorumManager == nullptr); llmq::quorumManager = std::make_unique(*bls_worker, chainstate, connman, dmnman, *qdkgsman, evo_db, *quorum_block_processor, mn_activeman, mn_sync, sporkman); return llmq::quorumManager.get(); }()}, - sigman{std::make_unique(connman, mn_activeman, *llmq::quorumManager, unit_tests, wipe)}, + sigman{std::make_unique(connman, mn_activeman, *llmq::quorumManager, peerman, unit_tests, wipe)}, shareman{std::make_unique(connman, *sigman, mn_activeman, *llmq::quorumManager, sporkman, peerman)}, clhandler{[&]() -> llmq::CChainLocksHandler* const { assert(llmq::chainLocksHandler == nullptr); diff --git a/src/llmq/dkgsession.cpp b/src/llmq/dkgsession.cpp index ead2adab8fdeb..bcbf91fe7e0a6 100644 --- a/src/llmq/dkgsession.cpp +++ b/src/llmq/dkgsession.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -1326,10 +1327,10 @@ void CDKGSession::RelayInvToParticipants(const CInv& inv) const myProTxHash.ToString().substr(0, 4), ss.str()); std::stringstream ss2; - connman.ForEachNode([&](CNode* pnode) { + connman.ForEachNode([&](const CNode* pnode) { if (pnode->qwatch || (!pnode->GetVerifiedProRegTxHash().IsNull() && (relayMembers.count(pnode->GetVerifiedProRegTxHash()) != 0))) { - pnode->PushInventory(inv); + Assert(m_peerman)->PushInventory(pnode->GetId(), inv); } if (pnode->GetVerifiedProRegTxHash().IsNull()) { diff --git a/src/llmq/dkgsession.h b/src/llmq/dkgsession.h index 5b19d6a0973da..a5b58d150012e 100644 --- a/src/llmq/dkgsession.h +++ b/src/llmq/dkgsession.h @@ -22,6 +22,7 @@ class CDeterministicMN; class CMasternodeMetaMan; class CSporkManager; class UniValue; +class PeerManager; using CDeterministicMNCPtr = std::shared_ptr; @@ -280,6 +281,7 @@ class CDKGSession CMasternodeMetaMan& m_mn_metaman; const CActiveMasternodeManager* const m_mn_activeman; const CSporkManager& m_sporkman; + const std::unique_ptr& m_peerman; const CBlockIndex* m_quorum_base_block_index{nullptr}; int quorumIndex{0}; @@ -322,9 +324,10 @@ class CDKGSession CDKGSession(const Consensus::LLMQParams& _params, CBLSWorker& _blsWorker, CConnman& _connman, CDeterministicMNManager& dmnman, CDKGSessionManager& _dkgManager, CDKGDebugManager& _dkgDebugManager, CMasternodeMetaMan& mn_metaman, const CActiveMasternodeManager* const mn_activeman, - const CSporkManager& sporkman) : + const CSporkManager& sporkman, const std::unique_ptr& peerman) : params(_params), blsWorker(_blsWorker), cache(_blsWorker), connman(_connman), m_dmnman(dmnman), dkgManager(_dkgManager), - dkgDebugManager(_dkgDebugManager), m_mn_metaman(mn_metaman), m_mn_activeman(mn_activeman), m_sporkman(sporkman) {} + dkgDebugManager(_dkgDebugManager), m_mn_metaman(mn_metaman), m_mn_activeman(mn_activeman), m_sporkman(sporkman), + m_peerman(peerman) {} bool Init(gsl::not_null pQuorumBaseBlockIndex, Span mns, const uint256& _myProTxHash, int _quorumIndex); diff --git a/src/llmq/dkgsessionhandler.cpp b/src/llmq/dkgsessionhandler.cpp index 7b48885bded87..4a95aefde2246 100644 --- a/src/llmq/dkgsessionhandler.cpp +++ b/src/llmq/dkgsessionhandler.cpp @@ -27,7 +27,7 @@ namespace llmq CDKGSessionHandler::CDKGSessionHandler(CBLSWorker& _blsWorker, CChainState& chainstate, CConnman& _connman, CDeterministicMNManager& dmnman, CDKGDebugManager& _dkgDebugManager, CDKGSessionManager& _dkgManager, CMasternodeMetaMan& mn_metaman, CQuorumBlockProcessor& _quorumBlockProcessor, const CActiveMasternodeManager* const mn_activeman, - const CSporkManager& sporkman, const Consensus::LLMQParams& _params, int _quorumIndex) : + const CSporkManager& sporkman, const std::unique_ptr& peerman, const Consensus::LLMQParams& _params, int _quorumIndex) : blsWorker(_blsWorker), m_chainstate(chainstate), connman(_connman), @@ -38,9 +38,10 @@ CDKGSessionHandler::CDKGSessionHandler(CBLSWorker& _blsWorker, CChainState& chai quorumBlockProcessor(_quorumBlockProcessor), m_mn_activeman(mn_activeman), m_sporkman(sporkman), + m_peerman(peerman), params(_params), quorumIndex(_quorumIndex), - curSession(std::make_unique(_params, _blsWorker, _connman, dmnman, _dkgManager, _dkgDebugManager, m_mn_metaman, m_mn_activeman, sporkman)), + curSession(std::make_unique(_params, _blsWorker, _connman, dmnman, _dkgManager, _dkgDebugManager, m_mn_metaman, m_mn_activeman, sporkman, peerman)), pendingContributions((size_t)_params.size * 2, MSG_QUORUM_CONTRIB), // we allow size*2 messages as we need to make sure we see bad behavior (double messages) pendingComplaints((size_t)_params.size * 2, MSG_QUORUM_COMPLAINT), pendingJustifications((size_t)_params.size * 2, MSG_QUORUM_JUSTIFICATION), @@ -188,7 +189,7 @@ void CDKGSessionHandler::StopThread() bool CDKGSessionHandler::InitNewQuorum(const CBlockIndex* pQuorumBaseBlockIndex) { - curSession = std::make_unique(params, blsWorker, connman, m_dmnman, dkgManager, dkgDebugManager, m_mn_metaman, m_mn_activeman, m_sporkman); + curSession = std::make_unique(params, blsWorker, connman, m_dmnman, dkgManager, dkgDebugManager, m_mn_metaman, m_mn_activeman, m_sporkman, m_peerman); if (!DeploymentDIP0003Enforced(pQuorumBaseBlockIndex->nHeight, Params().GetConsensus())) { return false; diff --git a/src/llmq/dkgsessionhandler.h b/src/llmq/dkgsessionhandler.h index fb509be8ec8b8..e7153bc1fcad6 100644 --- a/src/llmq/dkgsessionhandler.h +++ b/src/llmq/dkgsessionhandler.h @@ -130,6 +130,7 @@ class CDKGSessionHandler CQuorumBlockProcessor& quorumBlockProcessor; const CActiveMasternodeManager* const m_mn_activeman; const CSporkManager& m_sporkman; + const std::unique_ptr& m_peerman; const Consensus::LLMQParams params; const int quorumIndex; @@ -151,7 +152,7 @@ class CDKGSessionHandler CDKGSessionHandler(CBLSWorker& _blsWorker, CChainState& chainstate, CConnman& _connman, CDeterministicMNManager& dmnman, CDKGDebugManager& _dkgDebugManager, CDKGSessionManager& _dkgManager, CMasternodeMetaMan& mn_metaman, CQuorumBlockProcessor& _quorumBlockProcessor, const CActiveMasternodeManager* const mn_activeman, - const CSporkManager& sporkman, const Consensus::LLMQParams& _params, int _quorumIndex); + const CSporkManager& sporkman, const std::unique_ptr& peerman, const Consensus::LLMQParams& _params, int _quorumIndex); ~CDKGSessionHandler() = default; void UpdatedBlockTip(const CBlockIndex *pindexNew); diff --git a/src/llmq/dkgsessionmgr.cpp b/src/llmq/dkgsessionmgr.cpp index 9a618440736f1..ede710a0e9031 100644 --- a/src/llmq/dkgsessionmgr.cpp +++ b/src/llmq/dkgsessionmgr.cpp @@ -26,7 +26,8 @@ static const std::string DB_ENC_CONTRIB = "qdkg_E"; CDKGSessionManager::CDKGSessionManager(CBLSWorker& _blsWorker, CChainState& chainstate, CConnman& _connman, CDeterministicMNManager& dmnman, CDKGDebugManager& _dkgDebugManager, CMasternodeMetaMan& mn_metaman, CQuorumBlockProcessor& _quorumBlockProcessor, - const CActiveMasternodeManager* const mn_activeman, const CSporkManager& sporkman, bool unitTests, bool fWipe) : + const CActiveMasternodeManager* const mn_activeman, const CSporkManager& sporkman, + const std::unique_ptr& peerman, bool unitTests, bool fWipe) : db(std::make_unique(unitTests ? "" : (GetDataDir() / "llmq/dkgdb"), 1 << 20, unitTests, fWipe)), blsWorker(_blsWorker), m_chainstate(chainstate), @@ -49,7 +50,8 @@ CDKGSessionManager::CDKGSessionManager(CBLSWorker& _blsWorker, CChainState& chai for (const auto i : irange::range(session_count)) { dkgSessionHandlers.emplace(std::piecewise_construct, std::forward_as_tuple(params.type, i), - std::forward_as_tuple(blsWorker, m_chainstate, connman, dmnman, dkgDebugManager, *this, mn_metaman, quorumBlockProcessor, mn_activeman, spork_manager, params, i)); + std::forward_as_tuple(blsWorker, m_chainstate, connman, dmnman, dkgDebugManager, *this, mn_metaman, + quorumBlockProcessor, mn_activeman, spork_manager, peerman, params, i)); } } } diff --git a/src/llmq/dkgsessionmgr.h b/src/llmq/dkgsessionmgr.h index 6077b8373cfa6..92094063a8344 100644 --- a/src/llmq/dkgsessionmgr.h +++ b/src/llmq/dkgsessionmgr.h @@ -69,7 +69,8 @@ class CDKGSessionManager public: CDKGSessionManager(CBLSWorker& _blsWorker, CChainState& chainstate, CConnman& _connman, CDeterministicMNManager& dmnman, CDKGDebugManager& _dkgDebugManager, CMasternodeMetaMan& mn_metaman, CQuorumBlockProcessor& _quorumBlockProcessor, - const CActiveMasternodeManager* const mn_activeman, const CSporkManager& sporkman, bool unitTests, bool fWipe); + const CActiveMasternodeManager* const mn_activeman, const CSporkManager& sporkman, + const std::unique_ptr& peerman, bool unitTests, bool fWipe); ~CDKGSessionManager() = default; void StartThreads(); diff --git a/src/llmq/instantsend.cpp b/src/llmq/instantsend.cpp index 76d42113cb19b..3487d33f86025 100644 --- a/src/llmq/instantsend.cpp +++ b/src/llmq/instantsend.cpp @@ -1452,12 +1452,9 @@ void CInstantSendManager::AskNodesForLockedTx(const uint256& txid, const CConnma if (nodesToAskFor.size() >= 4) { return; } - if (peerman.CanRelayAddrs(pnode->GetId())) { - LOCK(pnode->m_tx_relay->cs_tx_inventory); - if (pnode->m_tx_relay->filterInventoryKnown.contains(txid)) { - pnode->AddRef(); - nodesToAskFor.emplace_back(pnode); - } + if (peerman.IsInvInFilter(pnode->GetId(), txid)) { + pnode->AddRef(); + nodesToAskFor.emplace_back(pnode); } }; diff --git a/src/llmq/signing.cpp b/src/llmq/signing.cpp index 4e4590c4de52e..f7e3a08881050 100644 --- a/src/llmq/signing.cpp +++ b/src/llmq/signing.cpp @@ -540,8 +540,8 @@ void CRecoveredSigsDb::CleanupOldVotes(int64_t maxAge) ////////////////// CSigningManager::CSigningManager(CConnman& _connman, const CActiveMasternodeManager* const mn_activeman, const CQuorumManager& _qman, - bool fMemory, bool fWipe) : - db(fMemory, fWipe), connman(_connman), m_mn_activeman(mn_activeman), qman(_qman) + const std::unique_ptr& peerman, bool fMemory, bool fWipe) : + db(fMemory, fWipe), connman(_connman), m_mn_activeman(mn_activeman), qman(_qman), m_peerman(peerman) { } @@ -572,18 +572,18 @@ bool CSigningManager::GetRecoveredSigForGetData(const uint256& hash, CRecoveredS return true; } -PeerMsgRet CSigningManager::ProcessMessage(const CNode& pfrom, gsl::not_null peerman, const std::string& msg_type, CDataStream& vRecv) +PeerMsgRet CSigningManager::ProcessMessage(const CNode& pfrom, const std::string& msg_type, CDataStream& vRecv) { if (msg_type == NetMsgType::QSIGREC) { auto recoveredSig = std::make_shared(); vRecv >> *recoveredSig; - return ProcessMessageRecoveredSig(pfrom, peerman, recoveredSig); + return ProcessMessageRecoveredSig(pfrom, recoveredSig); } return {}; } -PeerMsgRet CSigningManager::ProcessMessageRecoveredSig(const CNode& pfrom, gsl::not_null peerman, const std::shared_ptr& recoveredSig) +PeerMsgRet CSigningManager::ProcessMessageRecoveredSig(const CNode& pfrom, const std::shared_ptr& recoveredSig) { { LOCK(cs_main); @@ -615,12 +615,6 @@ PeerMsgRet CSigningManager::ProcessMessageRecoveredSig(const CNode& pfrom, gsl:: return {}; } - if (m_peerman == nullptr) { - m_peerman = peerman; - } - // we should never use one CSigningManager with different PeerManager - assert(m_peerman == peerman); - pendingRecoveredSigs[pfrom.GetId()].emplace_back(recoveredSig); return {}; } @@ -776,7 +770,7 @@ bool CSigningManager::ProcessPendingRecoveredSigs() if (batchVerifier.badSources.count(nodeId)) { LogPrint(BCLog::LLMQ, "CSigningManager::%s -- invalid recSig from other node, banning peer=%d\n", __func__, nodeId); - m_peerman.load()->Misbehaving(nodeId, 100); + Assert(m_peerman)->Misbehaving(nodeId, 100); continue; } @@ -840,9 +834,9 @@ void CSigningManager::ProcessRecoveredSig(const std::shared_ptrGetHash()); - connman.ForEachNode([&](CNode* pnode) { + connman.ForEachNode([&](const CNode* pnode) { if (pnode->fSendRecSigs) { - pnode->PushInventory(inv); + Assert(m_peerman)->PushInventory(pnode->GetId(), inv); } }); } diff --git a/src/llmq/signing.h b/src/llmq/signing.h index 92102dbe855b3..847b9685851b7 100644 --- a/src/llmq/signing.h +++ b/src/llmq/signing.h @@ -165,8 +165,7 @@ class CSigningManager CConnman& connman; const CActiveMasternodeManager* const m_mn_activeman; const CQuorumManager& qman; - - std::atomic m_peerman{nullptr}; + const std::unique_ptr& m_peerman; // Incoming and not verified yet std::unordered_map>> pendingRecoveredSigs GUARDED_BY(cs); @@ -179,12 +178,13 @@ class CSigningManager std::vector recoveredSigsListeners GUARDED_BY(cs); public: - CSigningManager(CConnman& _connman, const CActiveMasternodeManager* const mn_activeman, const CQuorumManager& _qman, bool fMemory, bool fWipe); + CSigningManager(CConnman& _connman, const CActiveMasternodeManager* const mn_activeman, const CQuorumManager& _qman, + const std::unique_ptr& peerman, bool fMemory, bool fWipe); bool AlreadyHave(const CInv& inv) const; bool GetRecoveredSigForGetData(const uint256& hash, CRecoveredSig& ret) const; - PeerMsgRet ProcessMessage(const CNode& pnode, gsl::not_null peerman, const std::string& msg_type, CDataStream& vRecv); + PeerMsgRet ProcessMessage(const CNode& pnode, const std::string& msg_type, CDataStream& vRecv); // This is called when a recovered signature was was reconstructed from another P2P message and is known to be valid // This is the case for example when a signature appears as part of InstantSend or ChainLocks @@ -197,7 +197,7 @@ class CSigningManager void TruncateRecoveredSig(Consensus::LLMQType llmqType, const uint256& id); private: - PeerMsgRet ProcessMessageRecoveredSig(const CNode& pfrom, gsl::not_null peerman, const std::shared_ptr& recoveredSig); + PeerMsgRet ProcessMessageRecoveredSig(const CNode& pfrom, const std::shared_ptr& recoveredSig); static bool PreVerifyRecoveredSig(const CQuorumManager& quorum_manager, const CRecoveredSig& recoveredSig, bool& retBan); void CollectPendingRecoveredSigsToVerify(size_t maxUniqueSessions, diff --git a/src/net.cpp b/src/net.cpp index 499fd05f91818..e4680255e282f 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -654,14 +654,8 @@ void CNode::copyStats(CNodeStats &stats, const std::vector &m_asmap) X(addrBind); stats.m_network = ConnectedThroughNetwork(); stats.m_mapped_as = addr.GetMappedAS(m_asmap); - if (!IsBlockOnlyConn()) { - LOCK(m_tx_relay->cs_filter); - stats.fRelayTxes = m_tx_relay->fRelayTxes; - } else { - stats.fRelayTxes = false; - } - X(nLastSend); - X(nLastRecv); + X(m_last_send); + X(m_last_recv); X(nLastTXTime); X(nLastBlockTime); X(nTimeConnected); @@ -710,7 +704,7 @@ bool CNode::ReceiveMsgBytes(Span msg_bytes, bool& complete) // TODO: use mocktime here after bitcoin#19499 is backported const auto time = std::chrono::microseconds(GetTimeMicros()); LOCK(cs_vRecv); - nLastRecv = std::chrono::duration_cast(time).count(); + m_last_recv = std::chrono::duration_cast(time); nRecvBytes += msg_bytes.size(); while (msg_bytes.size() > 0) { // absorb network data @@ -881,7 +875,7 @@ size_t CConnman::SocketSendData(CNode& node) nBytes = send(node.hSocket, reinterpret_cast(data.data()) + node.nSendOffset, data.size() - node.nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT); } if (nBytes > 0) { - node.nLastSend = GetTimeSeconds(); + node.m_last_send = GetTime(); node.nSendBytes += nBytes; node.nSendOffset += nBytes; nSentSize += nBytes; @@ -957,7 +951,7 @@ static bool CompareNodeTXTime(const NodeEvictionCandidate &a, const NodeEviction { // There is a fall-through here because it is common for a node to have more than a few peers that have not yet relayed txn. if (a.nLastTXTime != b.nLastTXTime) return a.nLastTXTime < b.nLastTXTime; - if (a.fRelayTxes != b.fRelayTxes) return b.fRelayTxes; + if (a.m_relay_txs != b.m_relay_txs) return b.m_relay_txs; if (a.fBloomFilter != b.fBloomFilter) return a.fBloomFilter; return a.nTimeConnected > b.nTimeConnected; } @@ -965,7 +959,7 @@ static bool CompareNodeTXTime(const NodeEvictionCandidate &a, const NodeEviction // Pick out the potential block-relay only peers, and sort them by last block time. static bool CompareNodeBlockRelayOnlyTime(const NodeEvictionCandidate &a, const NodeEvictionCandidate &b) { - if (a.fRelayTxes != b.fRelayTxes) return a.fRelayTxes; + if (a.m_relay_txs != b.m_relay_txs) return a.m_relay_txs; if (a.nLastBlockTime != b.nLastBlockTime) return a.nLastBlockTime < b.nLastBlockTime; if (a.fRelevantServices != b.fRelevantServices) return b.fRelevantServices; return a.nTimeConnected > b.nTimeConnected; @@ -1034,7 +1028,7 @@ void ProtectEvictionCandidatesByRatio(std::vector& vEvict // Protect up to 8 non-tx-relay peers that have sent us novel blocks. const size_t erase_size = std::min(size_t(8), vEvictionCandidates.size()); EraseLastKElements(vEvictionCandidates, CompareNodeBlockRelayOnlyTime, erase_size, - [](const NodeEvictionCandidate& n) { return !n.fRelayTxes && n.fRelevantServices; }); + [](const NodeEvictionCandidate& n) { return !n.m_relay_txs && n.fRelevantServices; }); // Protect 4 nodes that most recently sent us novel blocks. // An attacker cannot manipulate this metric without performing useful work. @@ -1120,18 +1114,11 @@ bool CConnman::AttemptToEvictConnection() } } - bool peer_relay_txes = false; - bool peer_filter_not_null = false; - if (!node->IsBlockOnlyConn()) { - LOCK(node->m_tx_relay->cs_filter); - peer_relay_txes = node->m_tx_relay->fRelayTxes; - peer_filter_not_null = node->m_tx_relay->pfilter != nullptr; - } NodeEvictionCandidate candidate = {node->GetId(), node->nTimeConnected, node->m_min_ping_time, node->nLastBlockTime, node->nLastTXTime, HasAllDesirableServiceFlags(node->nServices), - peer_relay_txes, peer_filter_not_null, node->nKeyedNetGroup, - node->m_prefer_evict, node->addr.IsLocal(), + node->m_relays_txs.load(), node->m_bloom_filter_loaded.load(), + node->nKeyedNetGroup, node->m_prefer_evict, node->addr.IsLocal(), node->m_inbound_onion}; vEvictionCandidates.push_back(candidate); } @@ -1524,31 +1511,33 @@ void CConnman::CalculateNumConnectionsChangedStats() statsClient.gauge("peers.torConnections", torNodes, 1.0f); } -bool CConnman::ShouldRunInactivityChecks(const CNode& node, int64_t now) const +bool CConnman::ShouldRunInactivityChecks(const CNode& node, std::chrono::seconds now) const { - return node.nTimeConnected + m_peer_connect_timeout < now; + return std::chrono::seconds{node.nTimeConnected} + m_peer_connect_timeout < now; } bool CConnman::InactivityCheck(const CNode& node) const { - // Use non-mockable system time (otherwise these timers will pop when we - // use setmocktime in the tests). - int64_t now = GetTimeSeconds(); + // Tests that see disconnects after using mocktime can start nodes with a + // large timeout. For example, -peertimeout=999999999. + const auto now{GetTime()}; + const auto last_send{node.m_last_send.load()}; + const auto last_recv{node.m_last_recv.load()}; if (!ShouldRunInactivityChecks(node, now)) return false; - if (node.nLastRecv == 0 || node.nLastSend == 0) { - LogPrint(BCLog::NET, "socket no message in first %i seconds, %d %d peer=%d\n", m_peer_connect_timeout, node.nLastRecv != 0, node.nLastSend != 0, node.GetId()); + if (last_recv.count() == 0 || last_send.count() == 0) { + LogPrint(BCLog::NET, "socket no message in first %i seconds, %d %d peer=%d\n", count_seconds(m_peer_connect_timeout), last_recv.count() != 0, last_send.count() != 0, node.GetId()); return true; } - if (now > node.nLastSend + TIMEOUT_INTERVAL) { - LogPrint(BCLog::NET, "socket sending timeout: %is peer=%d\n", now - node.nLastSend, node.GetId()); + if (now > last_send + TIMEOUT_INTERVAL) { + LogPrint(BCLog::NET, "socket sending timeout: %is peer=%d\n", count_seconds(now - last_send), node.GetId()); return true; } - if (now > node.nLastRecv + TIMEOUT_INTERVAL) { - LogPrint(BCLog::NET, "socket receive timeout: %is peer=%d\n", now - node.nLastRecv, node.GetId()); + if (now > last_recv + TIMEOUT_INTERVAL) { + LogPrint(BCLog::NET, "socket receive timeout: %is peer=%d\n", count_seconds(now - last_recv), node.GetId()); return true; } @@ -4266,7 +4255,11 @@ void CConnman::UnregisterEvents(CNode *pnode) } #endif } -void CaptureMessage(const CAddress& addr, const std::string& msg_type, const Span& data, bool is_incoming) + +void CaptureMessageToFile(const CAddress& addr, + const std::string& msg_type, + Span data, + bool is_incoming) { // Note: This function captures the message at the time of processing, // not at socket receive/send time. @@ -4293,3 +4286,9 @@ void CaptureMessage(const CAddress& addr, const std::string& msg_type, const Spa ser_writedata32(f, size); f.write(AsBytes(data)); } + +std::function data, + bool is_incoming)> + CaptureMessage = CaptureMessageToFile; diff --git a/src/net.h b/src/net.h index 72a329fceee57..59057603253cb 100644 --- a/src/net.h +++ b/src/net.h @@ -36,6 +36,7 @@ #include #include #include +#include #include #include #include @@ -63,7 +64,7 @@ static const bool DEFAULT_WHITELISTRELAY = true; static const bool DEFAULT_WHITELISTFORCERELAY = false; /** Time after which to disconnect, after waiting for a ping response (or inactivity). */ -static const int TIMEOUT_INTERVAL = 20 * 60; +static constexpr std::chrono::minutes TIMEOUT_INTERVAL{20}; /** Time to wait since nTimeConnected before disconnecting a probe node. **/ static const int PROBE_WAIT_INTERVAL = 5; /** Minimum time between warnings printed to log. */ @@ -274,9 +275,8 @@ class CNodeStats public: NodeId nodeid; ServiceFlags nServices; - bool fRelayTxes; - int64_t nLastSend; - int64_t nLastRecv; + std::chrono::seconds m_last_send; + std::chrono::seconds m_last_recv; int64_t nLastTXTime; int64_t nLastBlockTime; int64_t nTimeConnected; @@ -459,8 +459,8 @@ class CNode uint64_t nRecvBytes GUARDED_BY(cs_vRecv){0}; - std::atomic nLastSend{0}; - std::atomic nLastRecv{0}; + std::atomic m_last_send{0s}; + std::atomic m_last_recv{0s}; //! Unix epoch time at peer connection, in seconds. const int64_t nTimeConnected; std::atomic nTimeOffset{0}; @@ -580,34 +580,16 @@ class CNode assert(false); } - struct TxRelay { - mutable RecursiveMutex cs_filter; - // We use fRelayTxes for two purposes - - // a) it allows us to not relay tx invs before receiving the peer's version message - // b) the peer may tell us in its version message that we should not relay tx invs - // unless it loads a bloom filter. - bool fRelayTxes GUARDED_BY(cs_filter){false}; - std::unique_ptr pfilter PT_GUARDED_BY(cs_filter) GUARDED_BY(cs_filter){nullptr}; - - mutable RecursiveMutex cs_tx_inventory; - // inventory based relay - CRollingBloomFilter filterInventoryKnown GUARDED_BY(cs_tx_inventory){50000, 0.000001}; - // Set of transaction ids we still have to announce. - // They are sorted by the mempool before relay, so the order is not important. - std::set setInventoryTxToSend GUARDED_BY(cs_tx_inventory); - // List of non-tx/non-block inventory items - std::vector vInventoryOtherToSend GUARDED_BY(cs_tx_inventory); - // Used for BIP35 mempool sending, also protected by cs_tx_inventory - bool fSendMempool GUARDED_BY(cs_tx_inventory){false}; - // Last time a "MEMPOOL" request was serviced. - std::atomic m_last_mempool_req{0s}; - std::chrono::microseconds nNextInvSend{0}; - }; +public: + /** Whether we should relay transactions to this peer (their version + * message did not include fRelay=false and this is not a block-relay-only + * connection). This only changes from false to true. It will never change + * back to false. Used only in inbound eviction logic. */ + std::atomic_bool m_relays_txs{false}; - // in bitcoin: m_tx_relay == nullptr if we're not relaying transactions with this peer - // in dash: m_tx_relay should never be nullptr, we don't relay transactions if - // `IsBlockOnlyConn() == true` is instead - std::unique_ptr m_tx_relay{std::make_unique()}; + /** Whether this peer has loaded a bloom filter. Used only in inbound + * eviction logic. */ + std::atomic_bool m_bloom_filter_loaded{false}; /** UNIX epoch time of the last block received from this peer that we had * not yet seen (e.g. not already received from another peer), that passed @@ -694,33 +676,6 @@ class CNode nRefCount--; } - void AddKnownInventory(const uint256& hash) - { - LOCK(m_tx_relay->cs_tx_inventory); - m_tx_relay->filterInventoryKnown.insert(hash); - } - - void PushInventory(const CInv& inv) - { - ASSERT_IF_DEBUG(inv.type != MSG_BLOCK); - if (inv.type == MSG_BLOCK) { - LogPrintf("%s -- WARNING: using PushInventory for BLOCK inv, peer=%d\n", __func__, id); - return; - } - - LOCK(m_tx_relay->cs_tx_inventory); - if (m_tx_relay->filterInventoryKnown.contains(inv.hash)) { - LogPrint(BCLog::NET, "%s -- skipping known inv: %s peer=%d\n", __func__, inv.ToString(), id); - return; - } - LogPrint(BCLog::NET, "%s -- adding new inv: %s peer=%d\n", __func__, inv.ToString(), id); - if (inv.type == MSG_TX || inv.type == MSG_DSTX) { - m_tx_relay->setInventoryTxToSend.insert(inv.hash); - return; - } - m_tx_relay->vInventoryOtherToSend.push_back(inv); - } - void CloseSocketDisconnect(CConnman* connman); void copyStats(CNodeStats &stats, const std::vector &m_asmap); @@ -926,7 +881,7 @@ friend class CNode; m_msgproc = connOptions.m_msgproc; nSendBufferMaxSize = connOptions.nSendBufferMaxSize; nReceiveFloodSize = connOptions.nReceiveFloodSize; - m_peer_connect_timeout = connOptions.m_peer_connect_timeout; + m_peer_connect_timeout = std::chrono::seconds{connOptions.m_peer_connect_timeout}; { LOCK(cs_totalBytesSent); nMaxOutboundLimit = connOptions.nMaxOutboundLimit; @@ -1235,7 +1190,7 @@ friend class CNode; void SetAsmap(std::vector asmap) { addrman.m_asmap = std::move(asmap); } /** Return true if we should disconnect the peer for failing an inactivity check. */ - bool ShouldRunInactivityChecks(const CNode& node, int64_t secs_now) const; + bool ShouldRunInactivityChecks(const CNode& node, std::chrono::seconds now) const; private: struct ListenSocket { @@ -1351,7 +1306,7 @@ friend class CNode; uint64_t nMaxOutboundLimit GUARDED_BY(cs_totalBytesSent); // P2P timeout in seconds - int64_t m_peer_connect_timeout; + std::chrono::seconds m_peer_connect_timeout; // Whitelisted ranges. Any node connecting from these is automatically // whitelisted (as well as those connecting to whitelisted binds). @@ -1528,7 +1483,17 @@ friend class CNode; std::chrono::microseconds PoissonNextSend(std::chrono::microseconds now, std::chrono::seconds average_interval); /** Dump binary message to file, with timestamp */ -void CaptureMessage(const CAddress& addr, const std::string& msg_type, const Span& data, bool is_incoming); +void CaptureMessageToFile(const CAddress& addr, + const std::string& msg_type, + Span data, + bool is_incoming); + +/** Defaults to `CaptureMessageToFile()`, but can be overridden by unit tests. */ +extern std::function data, + bool is_incoming)> + CaptureMessage; struct NodeEvictionCandidate { @@ -1538,7 +1503,7 @@ struct NodeEvictionCandidate int64_t nLastBlockTime; int64_t nLastTXTime; bool fRelevantServices; - bool fRelayTxes; + bool m_relay_txs; bool fBloomFilter; uint64_t nKeyedNetGroup; bool prefer_evict; diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 77e551eb2893b..7f79ab4892282 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -262,6 +262,35 @@ struct Peer { /** Whether a ping has been requested by the user */ std::atomic m_ping_queued{false}; + struct TxRelay { + mutable RecursiveMutex m_bloom_filter_mutex; + // We use m_relay_txs for two purposes - + // a) it allows us to not relay tx invs before receiving the peer's version message + // b) the peer may tell us in its version message that we should not relay tx invs + // unless it loads a bloom filter. + bool m_relay_txs GUARDED_BY(m_bloom_filter_mutex){false}; + std::unique_ptr m_bloom_filter PT_GUARDED_BY(m_bloom_filter_mutex) GUARDED_BY(m_bloom_filter_mutex){nullptr}; + + mutable RecursiveMutex m_tx_inventory_mutex; + // inventory based relay + CRollingBloomFilter m_tx_inventory_known_filter GUARDED_BY(m_tx_inventory_mutex){50000, 0.000001}; + // Set of transaction ids we still have to announce. + // They are sorted by the mempool before relay, so the order is not important. + std::set m_tx_inventory_to_send GUARDED_BY(m_tx_inventory_mutex); + // List of non-tx/non-block inventory items + std::vector vInventoryOtherToSend GUARDED_BY(m_tx_inventory_mutex); + // Used for BIP35 mempool sending, also protected by m_tx_inventory_mutex + bool m_send_mempool GUARDED_BY(m_tx_inventory_mutex){false}; + // Last time a "MEMPOOL" request was serviced. + std::atomic m_last_mempool_req{0s}; + std::chrono::microseconds m_next_inv_send_time{0}; + }; + + // in bitcoin: m_tx_relay == nullptr if we're not relaying transactions with this peer + // in dash: m_tx_relay should never be nullptr, we don't relay transactions if + // `IsBlockOnlyConn() == true` is instead + std::unique_ptr m_tx_relay{std::make_unique()}; + /** A vector of addresses to send to the peer, limited to MAX_ADDR_TO_SEND. */ std::vector m_addrs_to_send; /** Probabilistic filter to track recent addr messages relayed with this @@ -289,6 +318,8 @@ struct Peer { * This field must correlate with whether m_addr_known has been * initialized.*/ std::atomic_bool m_addr_relay_enabled{false}; + /** Whether a Peer can only be relayed blocks */ + const bool m_block_relay_only{false}; /** Whether a getaddr request to this peer is outstanding. */ bool m_getaddr_sent{false}; /** Guards address sending timers. */ @@ -320,8 +351,10 @@ struct Peer { /** Work queue of items requested by this peer **/ std::deque m_getdata_requests GUARDED_BY(m_getdata_requests_mutex); - explicit Peer(NodeId id) + explicit Peer(NodeId id, bool block_relay_only) : m_id(id) + , m_tx_relay(std::make_unique()) + , m_block_relay_only{block_relay_only} {} }; @@ -358,6 +391,7 @@ class PeerManagerImpl final : public PeerManager bool GetNodeStateStats(NodeId nodeid, CNodeStateStats& stats) const override; bool IgnoresIncomingTxs() override { return m_ignore_incoming_txs; } void SendPings() override; + void PushInventory(NodeId nodeid, const CInv& inv) override; void RelayInv(CInv &inv, const int minProtoVersion) override; void RelayInvFiltered(CInv &inv, const CTransaction &relatedTx, const int minProtoVersion) override; void RelayInvFiltered(CInv &inv, const uint256 &relatedTxHash, const int minProtoVersion) override; @@ -367,7 +401,7 @@ class PeerManagerImpl final : public PeerManager void ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv, const std::chrono::microseconds time_received, const std::atomic& interruptMsgProc) override; bool IsBanned(NodeId pnode) override EXCLUSIVE_LOCKS_REQUIRED(cs_main); - bool CanRelayAddrs(NodeId pnode) const override; + bool IsInvInFilter(NodeId nodeid, const uint256& hash) const override; private: /** Helper to process result of external handlers of message */ @@ -430,7 +464,7 @@ class PeerManagerImpl final : public PeerManager void SendBlockTransactions(CNode& pfrom, const CBlock& block, const BlockTransactionsRequest& req); /** Send a version message to a peer */ - void PushNodeVersion(CNode& pnode, int64_t nTime); + void PushNodeVersion(CNode& pnode, const Peer& peer); /** Send a ping message every PING_INTERVAL or if requested via RPC. May * mark the peer to be disconnected if a ping has timed out. @@ -902,6 +936,39 @@ static void PushAddress(Peer& peer, const CAddress& addr, FastRandomContext& ins } } +static void AddKnownInv(Peer& peer, const uint256& hash) +{ + // Dash always initializes m_tx_relay + assert(peer.m_tx_relay != nullptr); + + LOCK(peer.m_tx_relay->m_tx_inventory_mutex); + peer.m_tx_relay->m_tx_inventory_known_filter.insert(hash); +} + +static void PushInv(Peer& peer, const CInv& inv) +{ + // Dash always initializes m_tx_relay + assert(peer.m_tx_relay != nullptr); + + ASSERT_IF_DEBUG(inv.type != MSG_BLOCK); + if (inv.type == MSG_BLOCK) { + LogPrintf("%s -- WARNING: using PushInv for BLOCK inv, peer=%d\n", __func__, peer.m_id); + return; + } + + LOCK(peer.m_tx_relay->m_tx_inventory_mutex); + if (peer.m_tx_relay->m_tx_inventory_known_filter.contains(inv.hash)) { + LogPrint(BCLog::NET, "%s -- skipping known inv: %s peer=%d\n", __func__, inv.ToString(), peer.m_id); + return; + } + LogPrint(BCLog::NET, "%s -- adding new inv: %s peer=%d\n", __func__, inv.ToString(), peer.m_id); + if (inv.type == MSG_TX || inv.type == MSG_DSTX) { + peer.m_tx_relay->m_tx_inventory_to_send.insert(inv.hash); + return; + } + peer.m_tx_relay->vInventoryOtherToSend.push_back(inv); +} + static void UpdatePreferredDownload(const CNode& node, CNodeState* state) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { nPreferredDownload -= state->fPreferredDownload; @@ -1148,7 +1215,7 @@ void PeerManagerImpl::FindNextBlocksToDownload(NodeId nodeid, unsigned int count } } // namespace -void PeerManagerImpl::PushNodeVersion(CNode& pnode, int64_t nTime) +void PeerManagerImpl::PushNodeVersion(CNode& pnode, const Peer& peer) { const auto& params = Params(); @@ -1156,6 +1223,7 @@ void PeerManagerImpl::PushNodeVersion(CNode& pnode, int64_t nTime) // services we were offering when the CNode object was created for this // peer. ServiceFlags nLocalNodeServices = pnode.GetLocalServices(); + const int64_t nTime{count_seconds(GetTime())}; uint64_t nonce = pnode.GetLocalNonce(); const int nNodeStartingHeight{m_best_height}; NodeId nodeid = pnode.GetId(); @@ -1342,13 +1410,13 @@ void PeerManagerImpl::InitializeNode(CNode *pnode) { LOCK(cs_main); mapNodeState.emplace_hint(mapNodeState.end(), std::piecewise_construct, std::forward_as_tuple(nodeid), std::forward_as_tuple(pnode->IsInboundConn())); } + PeerRef peer = std::make_shared(nodeid, /* block_relay_only = */ pnode->IsBlockOnlyConn()); { - PeerRef peer = std::make_shared(nodeid); LOCK(m_peer_mutex); - m_peer_map.emplace_hint(m_peer_map.end(), nodeid, std::move(peer)); + m_peer_map.emplace_hint(m_peer_map.end(), nodeid, peer); } if (!pnode->IsInboundConn()) { - PushNodeVersion(*pnode, GetTime()); + PushNodeVersion(*pnode, *peer); } } @@ -1473,6 +1541,12 @@ bool PeerManagerImpl::GetNodeStateStats(NodeId nodeid, CNodeStateStats& stats) c ping_wait = GetTime() - peer->m_ping_start.load(); } + if (!peer->m_block_relay_only) { + stats.m_relay_txs = WITH_LOCK(peer->m_tx_relay->m_bloom_filter_mutex, return peer->m_tx_relay->m_relay_txs); + } else { + stats.m_relay_txs = false; + } + stats.m_ping_wait = ping_wait; stats.m_addr_processed = peer->m_addr_processed.load(); stats.m_addr_rate_limited = peer->m_addr_rate_limited.load(); @@ -1661,14 +1735,6 @@ bool PeerManagerImpl::IsBanned(NodeId pnode) return false; } -bool PeerManagerImpl::CanRelayAddrs(NodeId pnode) const -{ - PeerRef peer = GetPeerRef(pnode); - if (peer == nullptr) - return false; - return peer->m_addr_relay_enabled; -} - bool PeerManagerImpl::MaybePunishNodeForBlock(NodeId nodeid, const BlockValidationState& state, bool via_compact_block, const std::string& message) { @@ -2112,59 +2178,95 @@ void PeerManagerImpl::SendPings() for(auto& it : m_peer_map) it.second->m_ping_queued = true; } -void PeerManagerImpl::RelayInv(CInv &inv, const int minProtoVersion) { +bool PeerManagerImpl::IsInvInFilter(NodeId nodeid, const uint256& hash) const +{ + PeerRef peer = GetPeerRef(nodeid); + if (peer == nullptr) + return false; + if (peer->m_block_relay_only) + return false; + LOCK(peer->m_tx_relay->m_tx_inventory_mutex); + return peer->m_tx_relay->m_tx_inventory_known_filter.contains(hash); +} + +void PeerManagerImpl::PushInventory(NodeId nodeid, const CInv& inv) +{ + // TODO: Get rid of this function at some point + PeerRef peer = GetPeerRef(nodeid); + if (peer == nullptr) + return; + PushInv(*peer, inv); +} + +void PeerManagerImpl::RelayInv(CInv &inv, const int minProtoVersion) +{ + // TODO: Migrate to iteration through m_peer_map m_connman.ForEachNode([&](CNode* pnode) { if (pnode->nVersion < minProtoVersion || !pnode->CanRelay()) return; - pnode->PushInventory(inv); + + PeerRef peer = GetPeerRef(pnode->GetId()); + if (peer == nullptr) return; + PushInv(*peer, inv); }); } void PeerManagerImpl::RelayInvFiltered(CInv &inv, const CTransaction& relatedTx, const int minProtoVersion) { + // TODO: Migrate to iteration through m_peer_map m_connman.ForEachNode([&](CNode* pnode) { if (pnode->nVersion < minProtoVersion || !pnode->CanRelay() || pnode->IsBlockOnlyConn()) { return; } + + PeerRef peer = GetPeerRef(pnode->GetId()); + if (peer == nullptr) return; { - LOCK(pnode->m_tx_relay->cs_filter); - if (!pnode->m_tx_relay->fRelayTxes) { + LOCK(peer->m_tx_relay->m_bloom_filter_mutex); + if (!peer->m_tx_relay->m_relay_txs) { return; } - if (pnode->m_tx_relay->pfilter && !pnode->m_tx_relay->pfilter->IsRelevantAndUpdate(relatedTx)) { + if (peer->m_tx_relay->m_bloom_filter && !peer->m_tx_relay->m_bloom_filter->IsRelevantAndUpdate(relatedTx)) { return; } } - pnode->PushInventory(inv); + PushInv(*peer, inv); }); } void PeerManagerImpl::RelayInvFiltered(CInv &inv, const uint256& relatedTxHash, const int minProtoVersion) { + // TODO: Migrate to iteration through m_peer_map m_connman.ForEachNode([&](CNode* pnode) { if (pnode->nVersion < minProtoVersion || !pnode->CanRelay() || pnode->IsBlockOnlyConn()) { return; } + + PeerRef peer = GetPeerRef(pnode->GetId()); + if (peer == nullptr) return; { - LOCK(pnode->m_tx_relay->cs_filter); - if (!pnode->m_tx_relay->fRelayTxes) { + LOCK(peer->m_tx_relay->m_bloom_filter_mutex); + if (!peer->m_tx_relay->m_relay_txs) { return; } - if (pnode->m_tx_relay->pfilter && !pnode->m_tx_relay->pfilter->contains(relatedTxHash)) { + if (peer->m_tx_relay->m_bloom_filter && !peer->m_tx_relay->m_bloom_filter->contains(relatedTxHash)) { return; } } - pnode->PushInventory(inv); + PushInv(*peer, inv); }); } void PeerManagerImpl::RelayTransaction(const uint256& txid) { - CInv inv(m_cj_ctx->dstxman->GetDSTX(txid) ? MSG_DSTX : MSG_TX, txid); - m_connman.ForEachNode([&inv](CNode* pnode) - { - pnode->PushInventory(inv); - }); + LOCK(m_peer_mutex); + for(auto& it : m_peer_map) { + Peer& peer = *it.second; + if (!peer.m_tx_relay) continue; + + const CInv inv{m_cj_ctx->dstxman->GetDSTX(txid) ? MSG_DSTX : MSG_TX, txid}; + PushInv(peer, inv); + }; } void PeerManagerImpl::RelayAddress(NodeId originator, @@ -2298,10 +2400,10 @@ void PeerManagerImpl::ProcessGetBlockData(CNode& pfrom, Peer& peer, const CInv& bool sendMerkleBlock = false; CMerkleBlock merkleBlock; if (!pfrom.IsBlockOnlyConn()) { - LOCK(pfrom.m_tx_relay->cs_filter); - if (pfrom.m_tx_relay->pfilter) { + LOCK(peer.m_tx_relay->m_bloom_filter_mutex); + if (peer.m_tx_relay->m_bloom_filter) { sendMerkleBlock = true; - merkleBlock = CMerkleBlock(*pblock, *pfrom.m_tx_relay->pfilter); + merkleBlock = CMerkleBlock(*pblock, *peer.m_tx_relay->m_bloom_filter); } } if (sendMerkleBlock) { @@ -2400,7 +2502,7 @@ void PeerManagerImpl::ProcessGetData(CNode& pfrom, Peer& peer, const std::atomic const std::chrono::seconds now = GetTime(); // Get last mempool request time - const std::chrono::seconds mempool_req = !pfrom.IsBlockOnlyConn() ? pfrom.m_tx_relay->m_last_mempool_req.load() + const std::chrono::seconds mempool_req = !pfrom.IsBlockOnlyConn() ? peer.m_tx_relay->m_last_mempool_req.load() : std::chrono::seconds::min(); // Process as many TX items from the front of the getdata queue as @@ -2462,7 +2564,7 @@ void PeerManagerImpl::ProcessGetData(CNode& pfrom, Peer& peer, const std::atomic for (const uint256& parent_txid : parent_ids_to_add) { // Relaying a transaction with a recent but unconfirmed parent. - if (WITH_LOCK(pfrom.m_tx_relay->cs_tx_inventory, return !pfrom.m_tx_relay->filterInventoryKnown.contains(parent_txid))) { + if (WITH_LOCK(peer.m_tx_relay->m_tx_inventory_mutex, return !peer.m_tx_relay->m_tx_inventory_known_filter.contains(parent_txid))) { LOCK(cs_main); State(pfrom.GetId())->m_recently_announced_invs.insert(parent_txid); } @@ -3220,8 +3322,9 @@ void PeerManagerImpl::ProcessMessage( } // Be shy and don't send version until we hear - if (pfrom.IsInboundConn()) - PushNodeVersion(pfrom, GetAdjustedTime()); + if (pfrom.IsInboundConn()) { + PushNodeVersion(pfrom, *peer); + } if (Params().NetworkIDString() == CBaseChainParams::DEVNET) { if (cleanSubVer.find(strprintf("devnet.%s", gArgs.GetDevNetName())) == std::string::npos) { @@ -3266,8 +3369,11 @@ void PeerManagerImpl::ProcessMessage( pfrom.m_limited_node = (!(nServices & NODE_NETWORK) && (nServices & NODE_NETWORK_LIMITED)); if (!pfrom.IsBlockOnlyConn()) { - LOCK(pfrom.m_tx_relay->cs_filter); - pfrom.m_tx_relay->fRelayTxes = fRelay; // set to true after we get the first filter* message + { + LOCK(peer->m_tx_relay->m_bloom_filter_mutex); + peer->m_tx_relay->m_relay_txs = fRelay; // set to true after we get the first filter* message + } + if (fRelay) pfrom.m_relays_txs = true; } // Potentially mark this peer as a preferred download peer. @@ -3656,7 +3762,7 @@ void PeerManagerImpl::ProcessMessage( MSG_SPORK }; - pfrom.AddKnownInventory(inv.hash); + AddKnownInv(*peer, inv.hash); if (fBlocksOnly && NetMessageViolatesBlocksOnly(inv.GetCommand())) { LogPrint(BCLog::NET, "%s (%s) inv sent in violation of protocol, disconnecting peer=%d\n", inv.GetCommand(), inv.hash.ToString(), pfrom.GetId()); pfrom.fDisconnect = true; @@ -3932,7 +4038,7 @@ void PeerManagerImpl::ProcessMessage( const CTransaction& tx = *ptx; const uint256& txid = ptx->GetHash(); - pfrom.AddKnownInventory(txid); + AddKnownInv(*peer, txid); CInv inv(nInvType, tx.GetHash()); { @@ -4026,11 +4132,11 @@ void PeerManagerImpl::ProcessMessage( for (const uint256& parent_txid : unique_parents) { CInv _inv(MSG_TX, parent_txid); - pfrom.AddKnownInventory(_inv.hash); + AddKnownInv(*peer, _inv.hash); if (!AlreadyHave(_inv)) RequestObject(State(pfrom.GetId()), _inv, current_time, is_masternode); // We don't know if the previous tx was a regular or a mixing one, try both CInv _inv2(MSG_DSTX, parent_txid); - pfrom.AddKnownInventory(_inv2.hash); + AddKnownInv(*peer, _inv2.hash); if (!AlreadyHave(_inv2)) RequestObject(State(pfrom.GetId()), _inv2, current_time, is_masternode); } AddOrphanTx(ptx, pfrom.GetId()); @@ -4488,8 +4594,8 @@ void PeerManagerImpl::ProcessMessage( } if (!pfrom.IsBlockOnlyConn()) { - LOCK(pfrom.m_tx_relay->cs_tx_inventory); - pfrom.m_tx_relay->fSendMempool = true; + LOCK(peer->m_tx_relay->m_tx_inventory_mutex); + peer->m_tx_relay->m_send_mempool = true; } return; } @@ -4583,9 +4689,13 @@ void PeerManagerImpl::ProcessMessage( } else if (!pfrom.IsBlockOnlyConn()) { - LOCK(pfrom.m_tx_relay->cs_filter); - pfrom.m_tx_relay->pfilter.reset(new CBloomFilter(filter)); - pfrom.m_tx_relay->fRelayTxes = true; + { + LOCK(peer->m_tx_relay->m_bloom_filter_mutex); + peer->m_tx_relay->m_bloom_filter.reset(new CBloomFilter(filter)); + peer->m_tx_relay->m_relay_txs = true; + } + pfrom.m_bloom_filter_loaded = true; + pfrom.m_relays_txs = true; } return; } @@ -4605,9 +4715,9 @@ void PeerManagerImpl::ProcessMessage( if (vData.size() > MAX_SCRIPT_ELEMENT_SIZE) { bad = true; } else if (!pfrom.IsBlockOnlyConn()) { - LOCK(pfrom.m_tx_relay->cs_filter); - if (pfrom.m_tx_relay->pfilter) { - pfrom.m_tx_relay->pfilter->insert(vData); + LOCK(peer->m_tx_relay->m_bloom_filter_mutex); + if (peer->m_tx_relay->m_bloom_filter) { + peer->m_tx_relay->m_bloom_filter->insert(vData); } else { bad = true; } @@ -4627,9 +4737,14 @@ void PeerManagerImpl::ProcessMessage( if (pfrom.IsBlockOnlyConn()) { return; } - LOCK(pfrom.m_tx_relay->cs_filter); - pfrom.m_tx_relay->pfilter = nullptr; - pfrom.m_tx_relay->fRelayTxes = true; + + { + LOCK(peer->m_tx_relay->m_bloom_filter_mutex); + peer->m_tx_relay->m_bloom_filter = nullptr; + peer->m_tx_relay->m_relay_txs = true; + } + pfrom.m_bloom_filter_loaded = false; + pfrom.m_relays_txs = true; return; } @@ -4747,7 +4862,7 @@ void PeerManagerImpl::ProcessMessage( ProcessPeerMsgRet(m_llmq_ctx->qdkgsman->ProcessMessage(pfrom, this, is_masternode, msg_type, vRecv), pfrom); ProcessPeerMsgRet(m_llmq_ctx->qman->ProcessMessage(pfrom, msg_type, vRecv), pfrom); m_llmq_ctx->shareman->ProcessMessage(pfrom, m_sporkman, msg_type, vRecv); - ProcessPeerMsgRet(m_llmq_ctx->sigman->ProcessMessage(pfrom, this, msg_type, vRecv), pfrom); + ProcessPeerMsgRet(m_llmq_ctx->sigman->ProcessMessage(pfrom, msg_type, vRecv), pfrom); ProcessPeerMsgRet(m_llmq_ctx->clhandler->ProcessMessage(pfrom, msg_type, vRecv), pfrom); ProcessPeerMsgRet(m_llmq_ctx->isman->ProcessMessage(pfrom, msg_type, vRecv), pfrom); return; @@ -5063,9 +5178,10 @@ void PeerManagerImpl::CheckForStaleTipAndEvictPeers() void PeerManagerImpl::MaybeSendPing(CNode& node_to, Peer& peer, std::chrono::microseconds now) { - if (m_connman.ShouldRunInactivityChecks(node_to, std::chrono::duration_cast(now).count()) && + if (m_connman.ShouldRunInactivityChecks(node_to, std::chrono::duration_cast(now)) && peer.m_ping_nonce_sent && - now > peer.m_ping_start.load() + std::chrono::seconds{TIMEOUT_INTERVAL}) { + now > peer.m_ping_start.load() + TIMEOUT_INTERVAL) + { // The ping timeout is using mocktime. To disable the check during // testing, increase -peertimeout. LogPrint(BCLog::NET, "ping timeout: %fs peer=%d\n", 0.000001 * count_microseconds(now - peer.m_ping_start.load()), peer.m_id); @@ -5431,8 +5547,8 @@ bool PeerManagerImpl::SendMessages(CNode* pto) size_t reserve = INVENTORY_BROADCAST_MAX_PER_1MB_BLOCK * MaxBlockSize() / 1000000; if (!pto->IsBlockOnlyConn()) { - LOCK(pto->m_tx_relay->cs_tx_inventory); - reserve = std::min(pto->m_tx_relay->setInventoryTxToSend.size(), reserve); + LOCK(peer->m_tx_relay->m_tx_inventory_mutex); + reserve = std::min(peer->m_tx_relay->m_tx_inventory_to_send.size(), reserve); } reserve = std::max(reserve, peer->m_blocks_for_inv_relay.size()); reserve = std::min(reserve, MAX_INV_SZ); @@ -5448,9 +5564,9 @@ bool PeerManagerImpl::SendMessages(CNode* pto) } peer->m_blocks_for_inv_relay.clear(); - auto queueAndMaybePushInv = [this, pto, &vInv, &msgMaker](const CInv& invIn) { - AssertLockHeld(pto->m_tx_relay->cs_tx_inventory); - pto->m_tx_relay->filterInventoryKnown.insert(invIn.hash); + auto queueAndMaybePushInv = [this, pto, peer, &vInv, &msgMaker](const CInv& invIn) { + AssertLockHeld(peer->m_tx_relay->m_tx_inventory_mutex); + peer->m_tx_relay->m_tx_inventory_known_filter.insert(invIn.hash); LogPrint(BCLog::NET, "SendMessages -- queued inv: %s index=%d peer=%d\n", invIn.ToString(), vInv.size(), pto->GetId()); // Responses to MEMPOOL requests bypass the m_recently_announced_invs filter. vInv.push_back(invIn); @@ -5462,41 +5578,41 @@ bool PeerManagerImpl::SendMessages(CNode* pto) }; if (!pto->IsBlockOnlyConn()) { - LOCK(pto->m_tx_relay->cs_tx_inventory); + LOCK(peer->m_tx_relay->m_tx_inventory_mutex); // Check whether periodic sends should happen // Note: If this node is running in a Masternode mode, it makes no sense to delay outgoing txes // because we never produce any txes ourselves i.e. no privacy is lost in this case. bool fSendTrickle = pto->HasPermission(NetPermissionFlags::NoBan) || is_masternode; - if (pto->m_tx_relay->nNextInvSend < current_time) { + if (peer->m_tx_relay->m_next_inv_send_time < current_time) { fSendTrickle = true; if (pto->IsInboundConn()) { - pto->m_tx_relay->nNextInvSend = m_connman.PoissonNextSendInbound(current_time, INBOUND_INVENTORY_BROADCAST_INTERVAL); + peer->m_tx_relay->m_next_inv_send_time = m_connman.PoissonNextSendInbound(current_time, INBOUND_INVENTORY_BROADCAST_INTERVAL); } else { // Use half the delay for Masternode outbound peers, as there is less privacy concern for them. - pto->m_tx_relay->nNextInvSend = pto->GetVerifiedProRegTxHash().IsNull() ? - PoissonNextSend(current_time, OUTBOUND_INVENTORY_BROADCAST_INTERVAL) : - PoissonNextSend(current_time, OUTBOUND_INVENTORY_BROADCAST_INTERVAL / 2); + peer->m_tx_relay->m_next_inv_send_time = pto->GetVerifiedProRegTxHash().IsNull() ? + PoissonNextSend(current_time, OUTBOUND_INVENTORY_BROADCAST_INTERVAL) : + PoissonNextSend(current_time, OUTBOUND_INVENTORY_BROADCAST_INTERVAL / 2); } } // Time to send but the peer has requested we not relay transactions. if (fSendTrickle) { - LOCK(pto->m_tx_relay->cs_filter); - if (!pto->m_tx_relay->fRelayTxes) pto->m_tx_relay->setInventoryTxToSend.clear(); + LOCK(peer->m_tx_relay->m_bloom_filter_mutex); + if (!peer->m_tx_relay->m_relay_txs) peer->m_tx_relay->m_tx_inventory_to_send.clear(); } // Respond to BIP35 mempool requests - if (fSendTrickle && pto->m_tx_relay->fSendMempool) { + if (fSendTrickle && peer->m_tx_relay->m_send_mempool) { auto vtxinfo = m_mempool.infoAll(); - pto->m_tx_relay->fSendMempool = false; + peer->m_tx_relay->m_send_mempool = false; - LOCK(pto->m_tx_relay->cs_filter); + LOCK(peer->m_tx_relay->m_bloom_filter_mutex); // Send invs for txes and corresponding IS-locks for (const auto& txinfo : vtxinfo) { const uint256& hash = txinfo.tx->GetHash(); - pto->m_tx_relay->setInventoryTxToSend.erase(hash); - if (pto->m_tx_relay->pfilter && !pto->m_tx_relay->pfilter->IsRelevantAndUpdate(*txinfo.tx)) continue; + peer->m_tx_relay->m_tx_inventory_to_send.erase(hash); + if (peer->m_tx_relay->m_bloom_filter && !peer->m_tx_relay->m_bloom_filter->IsRelevantAndUpdate(*txinfo.tx)) continue; int nInvType = m_cj_ctx->dstxman->GetDSTX(hash) ? MSG_DSTX : MSG_TX; queueAndMaybePushInv(CInv(nInvType, hash)); @@ -5513,17 +5629,17 @@ bool PeerManagerImpl::SendMessages(CNode* pto) uint256 chainlockHash = ::SerializeHash(clsig); queueAndMaybePushInv(CInv(MSG_CLSIG, chainlockHash)); } - pto->m_tx_relay->m_last_mempool_req = std::chrono::duration_cast(current_time); + peer->m_tx_relay->m_last_mempool_req = std::chrono::duration_cast(current_time); } // Determine transactions to relay if (fSendTrickle) { - LOCK(pto->m_tx_relay->cs_filter); + LOCK(peer->m_tx_relay->m_bloom_filter_mutex); // Produce a vector with all candidates for sending std::vector::iterator> vInvTx; - vInvTx.reserve(pto->m_tx_relay->setInventoryTxToSend.size()); - for (std::set::iterator it = pto->m_tx_relay->setInventoryTxToSend.begin(); it != pto->m_tx_relay->setInventoryTxToSend.end(); it++) { + vInvTx.reserve(peer->m_tx_relay->m_tx_inventory_to_send.size()); + for (std::set::iterator it = peer->m_tx_relay->m_tx_inventory_to_send.begin(); it != peer->m_tx_relay->m_tx_inventory_to_send.end(); it++) { vInvTx.push_back(it); } // Topologically and fee-rate sort the inventory we send for privacy and priority reasons. @@ -5540,9 +5656,9 @@ bool PeerManagerImpl::SendMessages(CNode* pto) vInvTx.pop_back(); uint256 hash = *it; // Remove it from the to-be-sent set - pto->m_tx_relay->setInventoryTxToSend.erase(it); + peer->m_tx_relay->m_tx_inventory_to_send.erase(it); // Check if not in the filter already - if (pto->m_tx_relay->filterInventoryKnown.contains(hash)) { + if (peer->m_tx_relay->m_tx_inventory_known_filter.contains(hash)) { continue; } // Not in the mempool anymore? don't bother sending it. @@ -5550,7 +5666,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto) if (!txinfo.tx) { continue; } - if (pto->m_tx_relay->pfilter && !pto->m_tx_relay->pfilter->IsRelevantAndUpdate(*txinfo.tx)) continue; + if (peer->m_tx_relay->m_bloom_filter && !peer->m_tx_relay->m_bloom_filter->IsRelevantAndUpdate(*txinfo.tx)) continue; // Send State(pto->GetId())->m_recently_announced_invs.insert(hash); nRelayedTransactions++; @@ -5575,15 +5691,15 @@ bool PeerManagerImpl::SendMessages(CNode* pto) { // Send non-tx/non-block inventory items - LOCK2(pto->m_tx_relay->cs_tx_inventory, pto->m_tx_relay->cs_filter); + LOCK2(peer->m_tx_relay->m_tx_inventory_mutex, peer->m_tx_relay->m_bloom_filter_mutex); - bool fSendIS = pto->m_tx_relay->fRelayTxes && !pto->IsBlockRelayOnly(); + bool fSendIS = peer->m_tx_relay->m_relay_txs && !pto->IsBlockRelayOnly(); - for (const auto& inv : pto->m_tx_relay->vInventoryOtherToSend) { - if (!pto->m_tx_relay->fRelayTxes && NetMessageViolatesBlocksOnly(inv.GetCommand())) { + for (const auto& inv : peer->m_tx_relay->vInventoryOtherToSend) { + if (!peer->m_tx_relay->m_relay_txs && NetMessageViolatesBlocksOnly(inv.GetCommand())) { continue; } - if (pto->m_tx_relay->filterInventoryKnown.contains(inv.hash)) { + if (peer->m_tx_relay->m_tx_inventory_known_filter.contains(inv.hash)) { continue; } if (!fSendIS && inv.type == MSG_ISDLOCK) { @@ -5591,7 +5707,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto) } queueAndMaybePushInv(inv); } - pto->m_tx_relay->vInventoryOtherToSend.clear(); + peer->m_tx_relay->vInventoryOtherToSend.clear(); } } if (!vInv.empty()) diff --git a/src/net_processing.h b/src/net_processing.h index f5826dd40d21f..47bccd83bf8f9 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -47,6 +47,7 @@ struct CNodeStateStats { int m_starting_height = -1; std::chrono::microseconds m_ping_wait; std::vector vHeightInFlight; + bool m_relay_txs; uint64_t m_addr_processed = 0; uint64_t m_addr_rate_limited = 0; bool m_addr_relay_enabled{false}; @@ -74,6 +75,12 @@ class PeerManager : public CValidationInterface, public NetEventsInterface /** Send ping message to all peers */ virtual void SendPings() = 0; + /** Is an inventory in the known inventory filter. Used by InstantSend. */ + virtual bool IsInvInFilter(NodeId nodeid, const uint256& hash) const = 0; + + /** Broadcast inventory message to a specific peer. */ + virtual void PushInventory(NodeId nodeid, const CInv& inv) = 0; + /** Relay inventories to all peers */ virtual void RelayInv(CInv &inv, const int minProtoVersion = MIN_PEER_PROTO_VERSION) = 0; virtual void RelayInvFiltered(CInv &inv, const CTransaction &relatedTx, @@ -110,9 +117,6 @@ class PeerManager : public CValidationInterface, public NetEventsInterface virtual bool IsBanned(NodeId pnode) = 0; - /* Can we send addr messages to a peer. Used by InstantSend. */ - virtual bool CanRelayAddrs(NodeId pnode) const = 0; - /** Whether we've completed initial sync yet, for determining when to turn * on extra block-relay-only peers. */ bool m_initial_sync_finished{false}; diff --git a/src/qt/forms/debugwindow.ui b/src/qt/forms/debugwindow.ui index 90f6d8eb34777..c4e9239b53a11 100644 --- a/src/qt/forms/debugwindow.ui +++ b/src/qt/forms/debugwindow.ui @@ -1270,10 +1270,10 @@ - Elapsed time since a novel transaction accepted into our mempool was received from this peer. + Elapsed time since a novel transaction accepted into our mempool was received from this peer. - Last Tx + Last Transaction @@ -1507,6 +1507,84 @@ + + + Whether we relay addresses to this peer. + + + Address Relay + + + + + + + IBeamCursor + + + N/A + + + Qt::PlainText + + + Qt::LinksAccessibleByMouse|Qt::TextSelectableByKeyboard|Qt::TextSelectableByMouse + + + + + + + Total number of addresses processed, excluding those dropped due to rate-limiting. + + + Addresses Processed + + + + + + + IBeamCursor + + + N/A + + + Qt::PlainText + + + Qt::LinksAccessibleByMouse|Qt::TextSelectableByKeyboard|Qt::TextSelectableByMouse + + + + + + + Total number of addresses dropped due to rate-limiting. + + + Addresses Rate-Limited + + + + + + + IBeamCursor + + + N/A + + + Qt::PlainText + + + Qt::LinksAccessibleByMouse|Qt::TextSelectableByKeyboard|Qt::TextSelectableByMouse + + + + Qt::Vertical diff --git a/src/qt/guiutil.cpp b/src/qt/guiutil.cpp index 8f79a7de3e721..f33a02244d876 100644 --- a/src/qt/guiutil.cpp +++ b/src/qt/guiutil.cpp @@ -23,6 +23,7 @@ #include