From 71092e0f2542e7aa8d008e05f61751139d4a3238 Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Fri, 1 Mar 2019 08:21:09 +0100 Subject: [PATCH] Send/Receive multiple messages as part of one P2P message in CSigSharesManager (#2729) * Return bool in ProcessMessageXXX methods to indicate misbehaviour * Send/Receive multiple messages as part of one P2P message in CSigSharesManager Many messages, especially QSIGSHARESINV and QGETSIGSHARES, are very small by nature (5-14 bytes for a 50 members LLMQ). The message headers are 24 bytes, meaning that we produce a lot of overhead for these small messages. This sums up quite a bit when thousands of signing sessions are happening in parallel. This commit changes all related P2P messages to send a vector of messages instead of a single message. * Remove bogus lines Included these by accident * Unify handling of BanNode in ProcessMessageXXX methods * Remove bogus check for fMasternodeMode * Properly use == instead of misleading >= in SendMessages * Put "didSend = true" near PushMessage --- src/llmq/quorums_signing_shares.cpp | 174 ++++++++++++++++++++-------- src/llmq/quorums_signing_shares.h | 16 ++- 2 files changed, 139 insertions(+), 51 deletions(-) diff --git a/src/llmq/quorums_signing_shares.cpp b/src/llmq/quorums_signing_shares.cpp index 56d95e164793a..6404a556debca 100644 --- a/src/llmq/quorums_signing_shares.cpp +++ b/src/llmq/quorums_signing_shares.cpp @@ -222,34 +222,76 @@ void CSigSharesManager::ProcessMessage(CNode* pfrom, const std::string& strComma } if (strCommand == NetMsgType::QSIGSESANN) { - CSigSesAnn ann; - vRecv >> ann; - ProcessMessageSigSesAnn(pfrom, ann, connman); + std::vector msgs; + vRecv >> msgs; + if (msgs.size() > MAX_MSGS_CNT_QSIGSESANN) { + LogPrintf("llmq", "CSigSharesManager::%s -- too many announcements in QSIGSESANN message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QSIGSESANN, pfrom->GetId()); + BanNode(pfrom->GetId()); + return; + } + for (auto& ann : msgs) { + if (!ProcessMessageSigSesAnn(pfrom, ann, connman)) { + BanNode(pfrom->GetId()); + return; + } + } } else if (strCommand == NetMsgType::QSIGSHARESINV) { - CSigSharesInv inv; - vRecv >> inv; - ProcessMessageSigSharesInv(pfrom, inv, connman); + std::vector msgs; + vRecv >> msgs; + if (msgs.size() > MAX_MSGS_CNT_QSIGSHARESINV) { + LogPrintf("llmq", "CSigSharesManager::%s -- too many invs in QSIGSHARESINV message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QSIGSHARESINV, pfrom->GetId()); + BanNode(pfrom->GetId()); + return; + } + for (auto& inv : msgs) { + if (!ProcessMessageSigSharesInv(pfrom, inv, connman)) { + BanNode(pfrom->GetId()); + return; + } + } } else if (strCommand == NetMsgType::QGETSIGSHARES) { - CSigSharesInv inv; - vRecv >> inv; - ProcessMessageGetSigShares(pfrom, inv, connman); + std::vector msgs; + vRecv >> msgs; + if (msgs.size() > MAX_MSGS_CNT_QGETSIGSHARES) { + LogPrintf("llmq", "CSigSharesManager::%s -- too many invs in QGETSIGSHARES message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QGETSIGSHARES, pfrom->GetId()); + BanNode(pfrom->GetId()); + return; + } + for (auto& inv : msgs) { + if (!ProcessMessageGetSigShares(pfrom, inv, connman)) { + BanNode(pfrom->GetId()); + return; + } + } } else if (strCommand == NetMsgType::QBSIGSHARES) { - CBatchedSigShares batchedSigShares; - vRecv >> batchedSigShares; - ProcessMessageBatchedSigShares(pfrom, batchedSigShares, connman); + std::vector msgs; + vRecv >> msgs; + size_t totalSigsCount = 0; + for (auto& bs : msgs) { + totalSigsCount += bs.sigShares.size(); + } + if (totalSigsCount > MAX_MSGS_TOTAL_BATCHED_SIGS) { + LogPrintf("llmq", "CSigSharesManager::%s -- too many sigs in QBSIGSHARES message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_TOTAL_BATCHED_SIGS, pfrom->GetId()); + BanNode(pfrom->GetId()); + return; + } + for (auto& bs : msgs) { + if (!ProcessMessageBatchedSigShares(pfrom, bs, connman)) { + BanNode(pfrom->GetId()); + return; + } + } } } -void CSigSharesManager::ProcessMessageSigSesAnn(CNode* pfrom, const CSigSesAnn& ann, CConnman& connman) +bool CSigSharesManager::ProcessMessageSigSesAnn(CNode* pfrom, const CSigSesAnn& ann, CConnman& connman) { auto llmqType = (Consensus::LLMQType)ann.llmqType; if (!Params().GetConsensus().llmqs.count(llmqType)) { - BanNode(pfrom->GetId()); - return; + return false; } if (ann.sessionId == (uint32_t)-1 || ann.quorumHash.IsNull() || ann.id.IsNull() || ann.msgHash.IsNull()) { - BanNode(pfrom->GetId()); - return; + return false; } LogPrintf("llmq", "CSigSharesManager::%s -- ann={%s}, node=%d\n", __func__, ann.ToString(), pfrom->GetId()); @@ -259,7 +301,7 @@ void CSigSharesManager::ProcessMessageSigSesAnn(CNode* pfrom, const CSigSesAnn& // TODO should we ban here? LogPrintf("CSigSharesManager::%s -- quorum %s not found, node=%d\n", __func__, ann.quorumHash.ToString(), pfrom->GetId()); - return; + return true; // let's still try other announcements from the same message } auto signHash = llmq::utils::BuildSignHash(llmqType, ann.quorumHash, ann.id, ann.msgHash); @@ -272,37 +314,34 @@ void CSigSharesManager::ProcessMessageSigSesAnn(CNode* pfrom, const CSigSesAnn& session.recvSessionId = ann.sessionId; session.quorum = quorum; nodeState.sessionByRecvId.emplace(ann.sessionId, &session); + + return true; } bool CSigSharesManager::VerifySigSharesInv(NodeId from, Consensus::LLMQType llmqType, const CSigSharesInv& inv) { - if (!fMasterNode || activeMasternodeManager->GetProTx().IsNull()) { - return false; - } - size_t quorumSize = (size_t)Params().GetConsensus().llmqs.at(llmqType).size; if (inv.inv.size() != quorumSize) { - BanNode(from); return false; } return true; } -void CSigSharesManager::ProcessMessageSigSharesInv(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman) +bool CSigSharesManager::ProcessMessageSigSharesInv(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman) { CSigSharesNodeState::SessionInfo sessionInfo; if (!GetSessionInfoByRecvId(pfrom->GetId(), inv.sessionId, sessionInfo)) { - return; + return true; } if (!VerifySigSharesInv(pfrom->GetId(), sessionInfo.llmqType, inv)) { - return; + return false; } // TODO for PoSe, we should consider propagating shares even if we already have a recovered sig if (quorumSigningManager->HasRecoveredSigForSession(sessionInfo.signHash)) { - return; + return true; } LogPrintf("llmq", "CSigSharesManager::%s -- signHash=%s, inv={%s}, node=%d\n", __func__, @@ -312,26 +351,27 @@ void CSigSharesManager::ProcessMessageSigSharesInv(CNode* pfrom, const CSigShare auto& nodeState = nodeStates[pfrom->GetId()]; auto session = nodeState.GetSessionByRecvId(inv.sessionId); if (!session) { - return; + return true; } session->announced.Merge(inv); session->knows.Merge(inv); + return true; } -void CSigSharesManager::ProcessMessageGetSigShares(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman) +bool CSigSharesManager::ProcessMessageGetSigShares(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman) { CSigSharesNodeState::SessionInfo sessionInfo; if (!GetSessionInfoByRecvId(pfrom->GetId(), inv.sessionId, sessionInfo)) { - return; + return true; } if (!VerifySigSharesInv(pfrom->GetId(), sessionInfo.llmqType, inv)) { - return; + return false; } // TODO for PoSe, we should consider propagating shares even if we already have a recovered sig if (quorumSigningManager->HasRecoveredSigForSession(sessionInfo.signHash)) { - return; + return true; } LogPrintf("llmq", "CSigSharesManager::%s -- signHash=%s, inv={%s}, node=%d\n", __func__, @@ -341,26 +381,23 @@ void CSigSharesManager::ProcessMessageGetSigShares(CNode* pfrom, const CSigShare auto& nodeState = nodeStates[pfrom->GetId()]; auto session = nodeState.GetSessionByRecvId(inv.sessionId); if (!session) { - return; + return true; } session->requested.Merge(inv); session->knows.Merge(inv); + return true; } -void CSigSharesManager::ProcessMessageBatchedSigShares(CNode* pfrom, const CBatchedSigShares& batchedSigShares, CConnman& connman) +bool CSigSharesManager::ProcessMessageBatchedSigShares(CNode* pfrom, const CBatchedSigShares& batchedSigShares, CConnman& connman) { CSigSharesNodeState::SessionInfo sessionInfo; if (!GetSessionInfoByRecvId(pfrom->GetId(), batchedSigShares.sessionId, sessionInfo)) { - return; + return true; } bool ban = false; if (!PreVerifyBatchedSigShares(pfrom->GetId(), sessionInfo, batchedSigShares, ban)) { - if (ban) { - BanNode(pfrom->GetId()); - return; - } - return; + return ban; } std::vector sigShares; @@ -395,7 +432,7 @@ void CSigSharesManager::ProcessMessageBatchedSigShares(CNode* pfrom, const CBatc sessionInfo.signHash.ToString(), batchedSigShares.sigShares.size(), sigShares.size(), batchedSigShares.ToInv(sessionInfo.llmqType).ToString(), pfrom->GetId()); if (sigShares.empty()) { - return; + return true; } LOCK(cs); @@ -403,6 +440,7 @@ void CSigSharesManager::ProcessMessageBatchedSigShares(CNode* pfrom, const CBatc for (auto& s : sigShares) { nodeState.pendingIncomingSigShares.Add(s.GetKey(), s); } + return true; } bool CSigSharesManager::PreVerifyBatchedSigShares(NodeId nodeId, const CSigSharesNodeState::SessionInfo& session, const CBatchedSigShares& batchedSigShares, bool& retBan) @@ -1004,47 +1042,89 @@ bool CSigSharesManager::SendMessages() auto it1 = sigSessionAnnouncements.find(pnode->GetId()); if (it1 != sigSessionAnnouncements.end()) { + std::vector msgs; + msgs.reserve(it1->second.size()); for (auto& sigSesAnn : it1->second) { LogPrintf("llmq", "CSigSharesManager::SendMessages -- QSIGSESANN signHash=%s, sessionId=%d, node=%d\n", llmq::utils::BuildSignHash(sigSesAnn).ToString(), sigSesAnn.sessionId, pnode->GetId()); - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSESANN, sigSesAnn)); + msgs.emplace_back(sigSesAnn); + if (msgs.size() == MAX_MSGS_CNT_QSIGSESANN) { + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSESANN, msgs)); + msgs.clear(); + didSend = true; + } + } + if (!msgs.empty()) { + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSESANN, msgs)); didSend = true; } } auto it = sigSharesToRequest.find(pnode->GetId()); if (it != sigSharesToRequest.end()) { + std::vector msgs; for (auto& p : it->second) { assert(p.second.CountSet() != 0); - LogPrintf("llmq", "CSigSharesManager::SendMessages -- QGETSIGSHARES inv={%s}, node=%d\n", - p.second.ToString(), pnode->GetId()); - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, p.second)); + LogPrintf("llmq", "CSigSharesManager::SendMessages -- QGETSIGSHARES signHash=%s, inv={%s}, node=%d\n", + p.first.ToString(), p.second.ToString(), pnode->GetId()); + msgs.emplace_back(std::move(p.second)); + if (msgs.size() == MAX_MSGS_CNT_QGETSIGSHARES) { + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, msgs)); + msgs.clear(); + didSend = true; + } + } + if (!msgs.empty()) { + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, msgs)); didSend = true; } } auto jt = sigSharesToSend.find(pnode->GetId()); if (jt != sigSharesToSend.end()) { + size_t totalSigsCount = 0; + std::vector msgs; for (auto& p : jt->second) { assert(!p.second.sigShares.empty()); if (LogAcceptCategory(BCLog::LogFlags::LLMQ)) { LOCK(cs); auto session = nodeStates[pnode->GetId()].GetSessionBySignHash(p.first); + assert(session); LogPrintf("llmq", "CSigSharesManager::SendMessages -- QBSIGSHARES signHash=%s, inv={%s}, node=%d\n", p.first.ToString(), p.second.ToInv(session->llmqType).ToString(), pnode->GetId()); } - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, p.second)); + + if (totalSigsCount + p.second.sigShares.size() > MAX_MSGS_TOTAL_BATCHED_SIGS) { + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, msgs)); + msgs.clear(); + totalSigsCount = 0; + didSend = true; + } + totalSigsCount += p.second.sigShares.size(); + msgs.emplace_back(std::move(p.second)); + } + if (!msgs.empty()) { + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, std::move(msgs))); didSend = true; } } auto kt = sigSharesToAnnounce.find(pnode->GetId()); if (kt != sigSharesToAnnounce.end()) { + std::vector msgs; for (auto& p : kt->second) { assert(p.second.CountSet() != 0); LogPrintf("llmq", "CSigSharesManager::SendMessages -- QSIGSHARESINV signHash=%s, inv={%s}, node=%d\n", p.first.ToString(), p.second.ToString(), pnode->GetId()); - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, p.second)); + msgs.emplace_back(std::move(p.second)); + if (msgs.size() == MAX_MSGS_CNT_QSIGSHARESINV) { + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, msgs)); + msgs.clear(); + didSend = true; + } + } + if (!msgs.empty()) { + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, msgs)); didSend = true; } } diff --git a/src/llmq/quorums_signing_shares.h b/src/llmq/quorums_signing_shares.h index d83e6da8e9995..ed47f92b9193a 100644 --- a/src/llmq/quorums_signing_shares.h +++ b/src/llmq/quorums_signing_shares.h @@ -329,6 +329,13 @@ class CSigSharesManager : public CRecoveredSigsListener static const int64_t SESSION_TOTAL_TIMEOUT = 5 * 60 * 1000; static const int64_t SIG_SHARE_REQUEST_TIMEOUT = 5 * 1000; + // we try to keep total message size below 10k + const size_t MAX_MSGS_CNT_QSIGSESANN = 100; + const size_t MAX_MSGS_CNT_QGETSIGSHARES = 200; + const size_t MAX_MSGS_CNT_QSIGSHARESINV = 200; + // 400 is the maximum quorum size, so this is also the maximum number of sigs we need to support + const size_t MAX_MSGS_TOTAL_BATCHED_SIGS = 400; + private: RecursiveMutex cs; @@ -370,10 +377,11 @@ class CSigSharesManager : public CRecoveredSigsListener void HandleNewRecoveredSig(const CRecoveredSig& recoveredSig); private: - void ProcessMessageSigSesAnn(CNode* pfrom, const CSigSesAnn& ann, CConnman& connman); - void ProcessMessageSigSharesInv(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman); - void ProcessMessageGetSigShares(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman); - void ProcessMessageBatchedSigShares(CNode* pfrom, const CBatchedSigShares& batchedSigShares, CConnman& connman); + // all of these return false when the currently processed message should be aborted (as each message actually contains multiple messages) + bool ProcessMessageSigSesAnn(CNode* pfrom, const CSigSesAnn& ann, CConnman& connman); + bool ProcessMessageSigSharesInv(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman); + bool ProcessMessageGetSigShares(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman); + bool ProcessMessageBatchedSigShares(CNode* pfrom, const CBatchedSigShares& batchedSigShares, CConnman& connman); bool VerifySigSharesInv(NodeId from, Consensus::LLMQType llmqType, const CSigSharesInv& inv); bool PreVerifyBatchedSigShares(NodeId nodeId, const CSigSharesNodeState::SessionInfo& session, const CBatchedSigShares& batchedSigShares, bool& retBan);