Skip to content

Commit

Permalink
Send/Receive multiple messages as part of one P2P message in CSigShar…
Browse files Browse the repository at this point in the history
…esManager (PIVX-Project#2729)

* Return bool in ProcessMessageXXX methods to indicate misbehaviour

* Send/Receive multiple messages as part of one P2P message in CSigSharesManager

Many messages, especially QSIGSHARESINV and QGETSIGSHARES, are very small
by nature (5-14 bytes for a 50 members LLMQ). The message headers are
24 bytes, meaning that we produce a lot of overhead for these small messages.
This sums up quite a bit when thousands of signing sessions are happening
in parallel.

This commit changes all related P2P messages to send a vector of messages
instead of a single message.

* Remove bogus lines

Included these by accident

* Unify handling of BanNode in ProcessMessageXXX methods

* Remove bogus check for fMasternodeMode

* Properly use == instead of misleading >= in SendMessages

* Put "didSend = true" near PushMessage
  • Loading branch information
codablock authored and panleone committed Oct 29, 2024
1 parent e73c238 commit 71092e0
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 51 deletions.
174 changes: 127 additions & 47 deletions src/llmq/quorums_signing_shares.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,34 +222,76 @@ void CSigSharesManager::ProcessMessage(CNode* pfrom, const std::string& strComma
}

if (strCommand == NetMsgType::QSIGSESANN) {
CSigSesAnn ann;
vRecv >> ann;
ProcessMessageSigSesAnn(pfrom, ann, connman);
std::vector<CSigSesAnn> msgs;
vRecv >> msgs;
if (msgs.size() > MAX_MSGS_CNT_QSIGSESANN) {
LogPrintf("llmq", "CSigSharesManager::%s -- too many announcements in QSIGSESANN message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QSIGSESANN, pfrom->GetId());
BanNode(pfrom->GetId());
return;
}
for (auto& ann : msgs) {
if (!ProcessMessageSigSesAnn(pfrom, ann, connman)) {
BanNode(pfrom->GetId());
return;
}
}
} else if (strCommand == NetMsgType::QSIGSHARESINV) {
CSigSharesInv inv;
vRecv >> inv;
ProcessMessageSigSharesInv(pfrom, inv, connman);
std::vector<CSigSharesInv> msgs;
vRecv >> msgs;
if (msgs.size() > MAX_MSGS_CNT_QSIGSHARESINV) {
LogPrintf("llmq", "CSigSharesManager::%s -- too many invs in QSIGSHARESINV message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QSIGSHARESINV, pfrom->GetId());
BanNode(pfrom->GetId());
return;
}
for (auto& inv : msgs) {
if (!ProcessMessageSigSharesInv(pfrom, inv, connman)) {
BanNode(pfrom->GetId());
return;
}
}
} else if (strCommand == NetMsgType::QGETSIGSHARES) {
CSigSharesInv inv;
vRecv >> inv;
ProcessMessageGetSigShares(pfrom, inv, connman);
std::vector<CSigSharesInv> msgs;
vRecv >> msgs;
if (msgs.size() > MAX_MSGS_CNT_QGETSIGSHARES) {
LogPrintf("llmq", "CSigSharesManager::%s -- too many invs in QGETSIGSHARES message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QGETSIGSHARES, pfrom->GetId());
BanNode(pfrom->GetId());
return;
}
for (auto& inv : msgs) {
if (!ProcessMessageGetSigShares(pfrom, inv, connman)) {
BanNode(pfrom->GetId());
return;
}
}
} else if (strCommand == NetMsgType::QBSIGSHARES) {
CBatchedSigShares batchedSigShares;
vRecv >> batchedSigShares;
ProcessMessageBatchedSigShares(pfrom, batchedSigShares, connman);
std::vector<CBatchedSigShares> msgs;
vRecv >> msgs;
size_t totalSigsCount = 0;
for (auto& bs : msgs) {
totalSigsCount += bs.sigShares.size();
}
if (totalSigsCount > MAX_MSGS_TOTAL_BATCHED_SIGS) {
LogPrintf("llmq", "CSigSharesManager::%s -- too many sigs in QBSIGSHARES message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_TOTAL_BATCHED_SIGS, pfrom->GetId());
BanNode(pfrom->GetId());
return;
}
for (auto& bs : msgs) {
if (!ProcessMessageBatchedSigShares(pfrom, bs, connman)) {
BanNode(pfrom->GetId());
return;
}
}
}
}

void CSigSharesManager::ProcessMessageSigSesAnn(CNode* pfrom, const CSigSesAnn& ann, CConnman& connman)
bool CSigSharesManager::ProcessMessageSigSesAnn(CNode* pfrom, const CSigSesAnn& ann, CConnman& connman)
{
auto llmqType = (Consensus::LLMQType)ann.llmqType;
if (!Params().GetConsensus().llmqs.count(llmqType)) {
BanNode(pfrom->GetId());
return;
return false;
}
if (ann.sessionId == (uint32_t)-1 || ann.quorumHash.IsNull() || ann.id.IsNull() || ann.msgHash.IsNull()) {
BanNode(pfrom->GetId());
return;
return false;
}

LogPrintf("llmq", "CSigSharesManager::%s -- ann={%s}, node=%d\n", __func__, ann.ToString(), pfrom->GetId());
Expand All @@ -259,7 +301,7 @@ void CSigSharesManager::ProcessMessageSigSesAnn(CNode* pfrom, const CSigSesAnn&
// TODO should we ban here?
LogPrintf("CSigSharesManager::%s -- quorum %s not found, node=%d\n", __func__,
ann.quorumHash.ToString(), pfrom->GetId());
return;
return true; // let's still try other announcements from the same message
}

auto signHash = llmq::utils::BuildSignHash(llmqType, ann.quorumHash, ann.id, ann.msgHash);
Expand All @@ -272,37 +314,34 @@ void CSigSharesManager::ProcessMessageSigSesAnn(CNode* pfrom, const CSigSesAnn&
session.recvSessionId = ann.sessionId;
session.quorum = quorum;
nodeState.sessionByRecvId.emplace(ann.sessionId, &session);

return true;
}

bool CSigSharesManager::VerifySigSharesInv(NodeId from, Consensus::LLMQType llmqType, const CSigSharesInv& inv)
{
if (!fMasterNode || activeMasternodeManager->GetProTx().IsNull()) {
return false;
}

size_t quorumSize = (size_t)Params().GetConsensus().llmqs.at(llmqType).size;

if (inv.inv.size() != quorumSize) {
BanNode(from);
return false;
}
return true;
}

void CSigSharesManager::ProcessMessageSigSharesInv(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman)
bool CSigSharesManager::ProcessMessageSigSharesInv(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman)
{
CSigSharesNodeState::SessionInfo sessionInfo;
if (!GetSessionInfoByRecvId(pfrom->GetId(), inv.sessionId, sessionInfo)) {
return;
return true;
}

if (!VerifySigSharesInv(pfrom->GetId(), sessionInfo.llmqType, inv)) {
return;
return false;
}

// TODO for PoSe, we should consider propagating shares even if we already have a recovered sig
if (quorumSigningManager->HasRecoveredSigForSession(sessionInfo.signHash)) {
return;
return true;
}

LogPrintf("llmq", "CSigSharesManager::%s -- signHash=%s, inv={%s}, node=%d\n", __func__,
Expand All @@ -312,26 +351,27 @@ void CSigSharesManager::ProcessMessageSigSharesInv(CNode* pfrom, const CSigShare
auto& nodeState = nodeStates[pfrom->GetId()];
auto session = nodeState.GetSessionByRecvId(inv.sessionId);
if (!session) {
return;
return true;
}
session->announced.Merge(inv);
session->knows.Merge(inv);
return true;
}

void CSigSharesManager::ProcessMessageGetSigShares(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman)
bool CSigSharesManager::ProcessMessageGetSigShares(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman)
{
CSigSharesNodeState::SessionInfo sessionInfo;
if (!GetSessionInfoByRecvId(pfrom->GetId(), inv.sessionId, sessionInfo)) {
return;
return true;
}

if (!VerifySigSharesInv(pfrom->GetId(), sessionInfo.llmqType, inv)) {
return;
return false;
}

// TODO for PoSe, we should consider propagating shares even if we already have a recovered sig
if (quorumSigningManager->HasRecoveredSigForSession(sessionInfo.signHash)) {
return;
return true;
}

LogPrintf("llmq", "CSigSharesManager::%s -- signHash=%s, inv={%s}, node=%d\n", __func__,
Expand All @@ -341,26 +381,23 @@ void CSigSharesManager::ProcessMessageGetSigShares(CNode* pfrom, const CSigShare
auto& nodeState = nodeStates[pfrom->GetId()];
auto session = nodeState.GetSessionByRecvId(inv.sessionId);
if (!session) {
return;
return true;
}
session->requested.Merge(inv);
session->knows.Merge(inv);
return true;
}

void CSigSharesManager::ProcessMessageBatchedSigShares(CNode* pfrom, const CBatchedSigShares& batchedSigShares, CConnman& connman)
bool CSigSharesManager::ProcessMessageBatchedSigShares(CNode* pfrom, const CBatchedSigShares& batchedSigShares, CConnman& connman)
{
CSigSharesNodeState::SessionInfo sessionInfo;
if (!GetSessionInfoByRecvId(pfrom->GetId(), batchedSigShares.sessionId, sessionInfo)) {
return;
return true;
}

bool ban = false;
if (!PreVerifyBatchedSigShares(pfrom->GetId(), sessionInfo, batchedSigShares, ban)) {
if (ban) {
BanNode(pfrom->GetId());
return;
}
return;
return ban;
}

std::vector<CSigShare> sigShares;
Expand Down Expand Up @@ -395,14 +432,15 @@ void CSigSharesManager::ProcessMessageBatchedSigShares(CNode* pfrom, const CBatc
sessionInfo.signHash.ToString(), batchedSigShares.sigShares.size(), sigShares.size(), batchedSigShares.ToInv(sessionInfo.llmqType).ToString(), pfrom->GetId());

if (sigShares.empty()) {
return;
return true;
}

LOCK(cs);
auto& nodeState = nodeStates[pfrom->GetId()];
for (auto& s : sigShares) {
nodeState.pendingIncomingSigShares.Add(s.GetKey(), s);
}
return true;
}

bool CSigSharesManager::PreVerifyBatchedSigShares(NodeId nodeId, const CSigSharesNodeState::SessionInfo& session, const CBatchedSigShares& batchedSigShares, bool& retBan)
Expand Down Expand Up @@ -1004,47 +1042,89 @@ bool CSigSharesManager::SendMessages()

auto it1 = sigSessionAnnouncements.find(pnode->GetId());
if (it1 != sigSessionAnnouncements.end()) {
std::vector<CSigSesAnn> msgs;
msgs.reserve(it1->second.size());
for (auto& sigSesAnn : it1->second) {
LogPrintf("llmq", "CSigSharesManager::SendMessages -- QSIGSESANN signHash=%s, sessionId=%d, node=%d\n",
llmq::utils::BuildSignHash(sigSesAnn).ToString(), sigSesAnn.sessionId, pnode->GetId());
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSESANN, sigSesAnn));
msgs.emplace_back(sigSesAnn);
if (msgs.size() == MAX_MSGS_CNT_QSIGSESANN) {
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSESANN, msgs));
msgs.clear();
didSend = true;
}
}
if (!msgs.empty()) {
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSESANN, msgs));
didSend = true;
}
}

auto it = sigSharesToRequest.find(pnode->GetId());
if (it != sigSharesToRequest.end()) {
std::vector<CSigSharesInv> msgs;
for (auto& p : it->second) {
assert(p.second.CountSet() != 0);
LogPrintf("llmq", "CSigSharesManager::SendMessages -- QGETSIGSHARES inv={%s}, node=%d\n",
p.second.ToString(), pnode->GetId());
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, p.second));
LogPrintf("llmq", "CSigSharesManager::SendMessages -- QGETSIGSHARES signHash=%s, inv={%s}, node=%d\n",
p.first.ToString(), p.second.ToString(), pnode->GetId());
msgs.emplace_back(std::move(p.second));
if (msgs.size() == MAX_MSGS_CNT_QGETSIGSHARES) {
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, msgs));
msgs.clear();
didSend = true;
}
}
if (!msgs.empty()) {
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, msgs));
didSend = true;
}
}

auto jt = sigSharesToSend.find(pnode->GetId());
if (jt != sigSharesToSend.end()) {
size_t totalSigsCount = 0;
std::vector<CBatchedSigShares> msgs;
for (auto& p : jt->second) {
assert(!p.second.sigShares.empty());
if (LogAcceptCategory(BCLog::LogFlags::LLMQ)) {
LOCK(cs);
auto session = nodeStates[pnode->GetId()].GetSessionBySignHash(p.first);
assert(session);
LogPrintf("llmq", "CSigSharesManager::SendMessages -- QBSIGSHARES signHash=%s, inv={%s}, node=%d\n",
p.first.ToString(), p.second.ToInv(session->llmqType).ToString(), pnode->GetId());
}
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, p.second));

if (totalSigsCount + p.second.sigShares.size() > MAX_MSGS_TOTAL_BATCHED_SIGS) {
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, msgs));
msgs.clear();
totalSigsCount = 0;
didSend = true;
}
totalSigsCount += p.second.sigShares.size();
msgs.emplace_back(std::move(p.second));
}
if (!msgs.empty()) {
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, std::move(msgs)));
didSend = true;
}
}

auto kt = sigSharesToAnnounce.find(pnode->GetId());
if (kt != sigSharesToAnnounce.end()) {
std::vector<CSigSharesInv> msgs;
for (auto& p : kt->second) {
assert(p.second.CountSet() != 0);
LogPrintf("llmq", "CSigSharesManager::SendMessages -- QSIGSHARESINV signHash=%s, inv={%s}, node=%d\n",
p.first.ToString(), p.second.ToString(), pnode->GetId());
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, p.second));
msgs.emplace_back(std::move(p.second));
if (msgs.size() == MAX_MSGS_CNT_QSIGSHARESINV) {
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, msgs));
msgs.clear();
didSend = true;
}
}
if (!msgs.empty()) {
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, msgs));
didSend = true;
}
}
Expand Down
16 changes: 12 additions & 4 deletions src/llmq/quorums_signing_shares.h
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,13 @@ class CSigSharesManager : public CRecoveredSigsListener
static const int64_t SESSION_TOTAL_TIMEOUT = 5 * 60 * 1000;
static const int64_t SIG_SHARE_REQUEST_TIMEOUT = 5 * 1000;

// we try to keep total message size below 10k
const size_t MAX_MSGS_CNT_QSIGSESANN = 100;
const size_t MAX_MSGS_CNT_QGETSIGSHARES = 200;
const size_t MAX_MSGS_CNT_QSIGSHARESINV = 200;
// 400 is the maximum quorum size, so this is also the maximum number of sigs we need to support
const size_t MAX_MSGS_TOTAL_BATCHED_SIGS = 400;

private:
RecursiveMutex cs;

Expand Down Expand Up @@ -370,10 +377,11 @@ class CSigSharesManager : public CRecoveredSigsListener
void HandleNewRecoveredSig(const CRecoveredSig& recoveredSig);

private:
void ProcessMessageSigSesAnn(CNode* pfrom, const CSigSesAnn& ann, CConnman& connman);
void ProcessMessageSigSharesInv(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman);
void ProcessMessageGetSigShares(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman);
void ProcessMessageBatchedSigShares(CNode* pfrom, const CBatchedSigShares& batchedSigShares, CConnman& connman);
// all of these return false when the currently processed message should be aborted (as each message actually contains multiple messages)
bool ProcessMessageSigSesAnn(CNode* pfrom, const CSigSesAnn& ann, CConnman& connman);
bool ProcessMessageSigSharesInv(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman);
bool ProcessMessageGetSigShares(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman);
bool ProcessMessageBatchedSigShares(CNode* pfrom, const CBatchedSigShares& batchedSigShares, CConnman& connman);

bool VerifySigSharesInv(NodeId from, Consensus::LLMQType llmqType, const CSigSharesInv& inv);
bool PreVerifyBatchedSigShares(NodeId nodeId, const CSigSharesNodeState::SessionInfo& session, const CBatchedSigShares& batchedSigShares, bool& retBan);
Expand Down

0 comments on commit 71092e0

Please sign in to comment.