Skip to content

Commit

Permalink
fix: drop CConnman::mapNodesWithDataToSend, use transport data
Browse files Browse the repository at this point in the history
  • Loading branch information
UdjinM6 authored and kwvg committed Oct 15, 2024
1 parent d39d8a4 commit f9f8805
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 78 deletions.
109 changes: 34 additions & 75 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -661,13 +661,6 @@ void CNode::CloseSocketDisconnect(CConnman* connman)
connman->mapReceivableNodes.erase(GetId());
connman->mapSendableNodes.erase(GetId());
}
{
LOCK(connman->cs_mapNodesWithDataToSend);
if (connman->mapNodesWithDataToSend.erase(GetId()) != 0) {
// See comment in PushMessage
Release();
}
}

if (connman->m_edge_trig_events && !connman->m_edge_trig_events->UnregisterEvents(m_sock->Get())) {
LogPrint(BCLog::NET, "EdgeTriggeredEvents::UnregisterEvents() failed\n");
Expand Down Expand Up @@ -2521,17 +2514,11 @@ void CConnman::SocketHandler(CMasternodeSync& mn_sync)
LOCK2(m_nodes_mutex, cs_sendable_receivable_nodes);
if (!mapReceivableNodes.empty()) {
return true;
} else if (!mapSendableNodes.empty()) {
if (LOCK(cs_mapNodesWithDataToSend); !mapNodesWithDataToSend.empty()) {
// We must check if at least one of the nodes with pending messages is also
// sendable, as otherwise a single node would be able to make the network
// thread busy with polling.
for (auto& p : mapNodesWithDataToSend) {
if (mapSendableNodes.count(p.first)) {
return true;
break;
}
}
}
for (const auto& p : mapSendableNodes) {
const auto& [to_send, more, _msg_type] = p.second->m_transport->GetBytesToSend(p.second->nSendMsgSize != 0);
if (!to_send.empty()) {
return true;
}
}
return false;
Expand Down Expand Up @@ -2607,52 +2594,30 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
assert(jt.first->second == it->second);
it->second->fCanSendData = true;
}
}

// collect nodes that have a receivable socket
// also clean up mapReceivableNodes from nodes that were receivable in the last iteration but aren't anymore
{
LOCK(cs_sendable_receivable_nodes);

for (auto it = mapReceivableNodes.begin(); it != mapReceivableNodes.end(); ) {
if (!it->second->fHasRecvData) {
it = mapReceivableNodes.erase(it);
} else {
// Implement the following logic:
// * If there is data to send, try sending data. As this only
// happens when optimistic write failed, we choose to first drain the
// write buffer in this case before receiving more. This avoids
// needlessly queueing received data, if the remote peer is not themselves
// receiving data. This means properly utilizing TCP flow control signalling.
// * Otherwise, if there is space left in the receive buffer (!fPauseRecv), try
// receiving data (which should succeed as the socket signalled as receivable).
if (!it->second->fPauseRecv && !it->second->fDisconnect && it->second->nSendMsgSize == 0) {
it->second->AddRef();
vReceivableNodes.emplace(it->second);
}
++it;
}
}
ForEachNode(AllNodes, [&](CNode* pnode) {
const auto& [to_send, more, _msg_type] = pnode->m_transport->GetBytesToSend(pnode->nSendMsgSize != 0);
// Collect nodes that have a receivable socket, implement the following logic:
// * If there is data to send, try sending data. As this only
// happens when optimistic write failed, we choose to first drain the
// write buffer in this case before receiving more. This avoids
// needlessly queueing received data, if the remote peer is not themselves
// receiving data. This means properly utilizing TCP flow control signalling.
// * Otherwise, if there is space left in the receive buffer (!fPauseRecv), try
// receiving data (which should succeed as the socket signalled as receivable).
if (pnode->fHasRecvData && !pnode->fPauseRecv && !pnode->fDisconnect &&
(!pnode->m_transport->ReceivedMessageComplete() || to_send.empty())) {
pnode->AddRef();
vReceivableNodes.emplace(pnode);
}

// collect nodes that have data to send and have a socket with non-empty write buffers
// also clean up mapNodesWithDataToSend from nodes that had messages to send in the last iteration
// but don't have any in this iteration
LOCK(cs_mapNodesWithDataToSend);
for (auto it = mapNodesWithDataToSend.begin(); it != mapNodesWithDataToSend.end(); ) {
const auto& [to_send, more, _msg_type] = it->second->m_transport->GetBytesToSend(it->second->nSendMsgSize != 0);
if (to_send.empty() && !more) {
// See comment in PushMessage
it->second->Release();
it = mapNodesWithDataToSend.erase(it);
} else {
if (it->second->fCanSendData) {
it->second->AddRef();
vSendableNodes.emplace(it->second);
}
++it;
}
// Collect nodes that have data to send and have a socket with non-empty write buffers
if (pnode->fCanSendData && (!pnode->m_transport->ReceivedMessageComplete() || !to_send.empty())) {
pnode->AddRef();
vSendableNodes.emplace(pnode);
}
}
});

for (CNode* pnode : vSendableNodes) {
if (interruptNet) {
Expand Down Expand Up @@ -2724,6 +2689,15 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
++it;
}
}
// clean up mapReceivableNodes from nodes that were receivable in the last iteration but aren't anymore
for (auto it = mapReceivableNodes.begin(); it != mapReceivableNodes.end(); ) {
if (!it->second->fHasRecvData) {
LogPrint(BCLog::NET, "%s -- remove mapReceivableNodes, peer=%d\n", __func__, it->second->GetId());
it = mapReceivableNodes.erase(it);
} else {
++it;
}
}
}
}

Expand Down Expand Up @@ -4259,10 +4233,6 @@ void CConnman::StopNodes()
LOCK(cs_sendable_receivable_nodes);
mapReceivableNodes.clear();
}
{
LOCK(cs_mapNodesWithDataToSend);
mapNodesWithDataToSend.clear();
}
m_nodes_disconnected.clear();
vhListenSocket.clear();
semOutbound.reset();
Expand Down Expand Up @@ -4818,17 +4788,6 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
pnode->vSendMsg.push_back(std::move(msg));
pnode->nSendMsgSize = pnode->vSendMsg.size();

{
LOCK(cs_mapNodesWithDataToSend);
// we're not holding m_nodes_mutex here, so there is a chance of this node being disconnected shortly before
// we get here. Whoever called PushMessage still has a ref to CNode*, but will later Release() it, so we
// might end up having an entry in mapNodesWithDataToSend that is not in m_nodes anymore. We need to
// Add/Release refs when adding/erasing mapNodesWithDataToSend.
if (mapNodesWithDataToSend.emplace(pnode->GetId(), pnode).second) {
pnode->AddRef();
}
}

// If there was nothing to send before, and there is now (predicted by the "more" value
// returned by the GetBytesToSend call above), attempt "optimistic write":
// because the poll/select loop may pause for SELECT_TIMEOUT_MILLISECONDS before actually
Expand Down
3 changes: 0 additions & 3 deletions src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -1856,9 +1856,6 @@ friend class CNode;
Mutex cs_sendable_receivable_nodes;
std::unordered_map<NodeId, CNode*> mapReceivableNodes GUARDED_BY(cs_sendable_receivable_nodes);
std::unordered_map<NodeId, CNode*> mapSendableNodes GUARDED_BY(cs_sendable_receivable_nodes);
/** Protected by cs_mapNodesWithDataToSend */
std::unordered_map<NodeId, CNode*> mapNodesWithDataToSend GUARDED_BY(cs_mapNodesWithDataToSend);
mutable RecursiveMutex cs_mapNodesWithDataToSend;

std::thread threadDNSAddressSeed;
std::thread threadSocketHandler;
Expand Down

0 comments on commit f9f8805

Please sign in to comment.