Skip to content

Commit

Permalink
refactor: introduce WakeupPipe, move wakeup select pipe logic there
Browse files Browse the repository at this point in the history
  • Loading branch information
kwvg committed May 14, 2024
1 parent ed7d976 commit b8c3b48
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 94 deletions.
2 changes: 2 additions & 0 deletions src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ BITCOIN_CORE_H = \
util/ui_change_type.h \
util/url.h \
util/vector.h \
util/wpipe.h \
validation.h \
validationinterface.h \
versionbits.h \
Expand Down Expand Up @@ -797,6 +798,7 @@ libbitcoin_util_a_SOURCES = \
util/thread.cpp \
util/threadnames.cpp \
util/tokenpipe.cpp \
util/wpipe.cpp \
$(BITCOIN_CORE_H)

if USE_LIBEVENT
Expand Down
124 changes: 41 additions & 83 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <util/thread.h>
#include <util/time.h>
#include <util/translation.h>
#include <util/wpipe.h>
#include <validation.h> // for fDIP0001ActiveAtTip

#include <masternode/meta.h>
Expand Down Expand Up @@ -119,7 +120,7 @@ static const uint64_t SELECT_TIMEOUT_MILLISECONDS = 50;
// We are however still somewhat limited in how long we can sleep as there is periodic work (cleanup) to be done in
// the socket handler thread
static const uint64_t SELECT_TIMEOUT_MILLISECONDS = 500;
#endif
#endif /* USE_WAKEUP_PIPE */

const std::string NET_MESSAGE_COMMAND_OTHER = "*other*";

Expand Down Expand Up @@ -1284,7 +1285,9 @@ void CConnman::CreateNodeFromAcceptedSocket(SOCKET hSocket,
LogPrint(BCLog::NET, "EdgeTriggeredEvents::RegisterEvents() failed\n");
}
}
WakeSelect();
if (m_wakeup_pipe) {
m_wakeup_pipe->Write();
}
}

// We received a new connection, harvest entropy from the time (and our peer count)
Expand Down Expand Up @@ -1569,14 +1572,14 @@ bool CConnman::GenerateSelectSet(const std::vector<CNode*>& nodes,
}
}

#ifdef USE_WAKEUP_PIPE
// We add a pipe to the read set so that the select() call can be woken up from the outside
// This is done when data is added to send buffers (vSendMsg) or when new peers are added
// This is currently only implemented for POSIX compliant systems. This means that Windows will fall back to
// timing out after 50ms and then trying to send. This is ok as we assume that heavy-load daemons are usually
// run on Linux and friends.
recv_set.insert(wakeupPipe[0]);
#endif
if (m_wakeup_pipe) {
// We add a pipe to the read set so that the select() call can be woken up from the outside
// This is done when data is added to send buffers (vSendMsg) or when new peers are added
// This is currently only implemented for POSIX compliant systems. This means that Windows will fall back to
// timing out after 50ms and then trying to send. This is ok as we assume that heavy-load daemons are usually
// run on Linux and friends.
recv_set.insert(m_wakeup_pipe->m_pipe[0]);
}

return !recv_set.empty() || !send_set.empty() || !error_set.empty();
}
Expand All @@ -1594,9 +1597,8 @@ void CConnman::SocketEventsKqueue(std::set<SOCKET>& recv_set,
timeout.tv_sec = only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS / 1000;
timeout.tv_nsec = (only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS % 1000) * 1000 * 1000;

wakeupSelectNeeded = true;
int n = kevent(Assert(m_edge_trig_events)->m_fd, nullptr, 0, events, maxEvents, &timeout);
wakeupSelectNeeded = false;
int n{-1};
ToggleWakeupPipe([&](){n = kevent(Assert(m_edge_trig_events)->m_fd, nullptr, 0, events, maxEvents, &timeout);});
if (n == -1) {
LogPrintf("kevent wait error\n");
} else if (n > 0) {
Expand Down Expand Up @@ -1628,9 +1630,8 @@ void CConnman::SocketEventsEpoll(std::set<SOCKET>& recv_set,
const size_t maxEvents = 64;
epoll_event events[maxEvents];

wakeupSelectNeeded = true;
int n = epoll_wait(Assert(m_edge_trig_events)->m_fd, events, maxEvents, only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS);
wakeupSelectNeeded = false;
int n{-1};
ToggleWakeupPipe([&](){n = epoll_wait(Assert(m_edge_trig_events)->m_fd, events, maxEvents, only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS);});
for (int i = 0; i < n; i++) {
auto& e = events[i];
if((e.events & EPOLLERR) || (e.events & EPOLLHUP)) {
Expand Down Expand Up @@ -1685,9 +1686,8 @@ void CConnman::SocketEventsPoll(const std::vector<CNode*>& nodes,
vpollfds.push_back(std::move(it.second));
}

wakeupSelectNeeded = true;
int r = poll(vpollfds.data(), vpollfds.size(), only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS);
wakeupSelectNeeded = false;
int r{-1};
ToggleWakeupPipe([&](){r = poll(vpollfds.data(), vpollfds.size(), only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS);});
if (r < 0) {
return;
}
Expand Down Expand Up @@ -1744,9 +1744,8 @@ void CConnman::SocketEventsSelect(const std::vector<CNode*>& nodes,
hSocketMax = std::max(hSocketMax, hSocket);
}

wakeupSelectNeeded = true;
int nSelect = select(hSocketMax + 1, &fdsetRecv, &fdsetSend, &fdsetError, &timeout);
wakeupSelectNeeded = false;
int nSelect{-1};
ToggleWakeupPipe([&](){nSelect = select(hSocketMax + 1, &fdsetRecv, &fdsetSend, &fdsetError, &timeout);});
if (interruptNet)
return;

Expand Down Expand Up @@ -1849,18 +1848,10 @@ void CConnman::SocketHandler(CMasternodeSync& mn_sync)
// empty sets.
SocketEvents(snap.Nodes(), recv_set, send_set, error_set, only_poll);

#ifdef USE_WAKEUP_PIPE
// drain the wakeup pipe
if (recv_set.count(wakeupPipe[0])) {
char buf[128];
while (true) {
int r = read(wakeupPipe[0], buf, sizeof(buf));
if (r <= 0) {
break;
}
}
// Drain the wakeup pipe
if (m_wakeup_pipe && recv_set.count(m_wakeup_pipe->m_pipe[0])) {
m_wakeup_pipe->Drain();
}
#endif

// Service (send/receive) each of the already connected nodes.
SocketHandlerConnected(recv_set, send_set, error_set);
Expand Down Expand Up @@ -2138,22 +2129,6 @@ void CConnman::WakeMessageHandler()
condMsgProc.notify_one();
}

void CConnman::WakeSelect()
{
#ifdef USE_WAKEUP_PIPE
if (wakeupPipe[1] == -1) {
return;
}

char buf{0};
if (write(wakeupPipe[1], &buf, sizeof(buf)) != 1) {
LogPrint(BCLog::NET, "write to wakeupPipe failed\n");
}
#endif

wakeupSelectNeeded = false;
}

void CConnman::ThreadDNSAddressSeed()
{
FastRandomContext rng;
Expand Down Expand Up @@ -2993,7 +2968,9 @@ void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
LogPrint(BCLog::NET, "EdgeTriggeredEvents::RegisterEvents() failed\n");
}
}
WakeSelect();
if (m_wakeup_pipe) {
m_wakeup_pipe->Write();
}
}
}

Expand Down Expand Up @@ -3373,23 +3350,13 @@ bool CConnman::Start(CDeterministicMNManager& dmnman, CMasternodeMetaMan& mn_met
}

#ifdef USE_WAKEUP_PIPE
if (pipe(wakeupPipe) != 0) {
wakeupPipe[0] = wakeupPipe[1] = -1;
LogPrint(BCLog::NET, "pipe() for wakeupPipe failed\n");
} else {
int fFlags = fcntl(wakeupPipe[0], F_GETFL, 0);
if (fcntl(wakeupPipe[0], F_SETFL, fFlags | O_NONBLOCK) == -1) {
LogPrint(BCLog::NET, "fcntl for O_NONBLOCK on wakeupPipe failed\n");
}
fFlags = fcntl(wakeupPipe[1], F_GETFL, 0);
if (fcntl(wakeupPipe[1], F_SETFL, fFlags | O_NONBLOCK) == -1) {
LogPrint(BCLog::NET, "fcntl for O_NONBLOCK on wakeupPipe failed\n");
}
if (m_edge_trig_events && !m_edge_trig_events->RegisterPipe(wakeupPipe[0])) {
LogPrint(BCLog::NET, "EdgeTriggeredEvents::RegisterPipe() failed\n");
}
m_wakeup_pipe = std::make_unique<WakeupPipe>(m_edge_trig_events.get());
if (!m_wakeup_pipe->IsValid()) {
/* We log the error but do not halt initialization */
LogPrintf("Unable to initialize WakeupPipe instance\n");
m_wakeup_pipe.reset();
}
#endif
#endif /* USE_WAKEUP_PIPE */

// Send and receive from sockets, accept connections
threadSocketHandler = std::thread(&util::TraceThread, "net", [this, &mn_sync] { ThreadSocketHandler(mn_sync); });
Expand Down Expand Up @@ -3555,21 +3522,12 @@ void CConnman::StopNodes()
vhListenSocket.clear();
semOutbound.reset();
semAddnode.reset();

if (m_edge_trig_events) {
#ifdef USE_WAKEUP_PIPE
if (!m_edge_trig_events->UnregisterPipe(wakeupPipe[0])) {
LogPrintf("EdgeTriggeredEvents::UnregisterPipe() failed\n");
}
#endif
m_edge_trig_events.reset();
}

#ifdef USE_WAKEUP_PIPE
if (wakeupPipe[0] != -1) close(wakeupPipe[0]);
if (wakeupPipe[1] != -1) close(wakeupPipe[1]);
wakeupPipe[0] = wakeupPipe[1] = -1;
#endif
/**
* m_wakeup_pipe must be reset *before* m_edge_trig_events as it may
* attempt to call EdgeTriggeredEvents::UnregisterPipe() in its destructor
*/
m_wakeup_pipe.reset();
m_edge_trig_events.reset();
}

void CConnman::DeleteNode(CNode* pnode)
Expand Down Expand Up @@ -4082,8 +4040,8 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
}

// wake up select() call in case there was no pending data before (so it was not selecting this socket for sending)
if (!hasPendingData && wakeupSelectNeeded)
WakeSelect();
if (!hasPendingData && (m_wakeup_pipe && m_wakeup_pipe->m_need_wakeup.load()))
m_wakeup_pipe->Write();
}
}

Expand Down
23 changes: 12 additions & 11 deletions src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <util/check.h>
#include <util/edge.h>
#include <util/system.h>
#include <util/wpipe.h>
#include <consensus/params.h>

#include <atomic>
Expand All @@ -45,10 +46,6 @@
#include <queue>
#include <vector>

#ifndef WIN32
#define USE_WAKEUP_PIPE
#endif

class CConnman;
class CDeterministicMNList;
class CDeterministicMNManager;
Expand Down Expand Up @@ -1168,7 +1165,6 @@ friend class CNode;
unsigned int GetReceiveFloodSize() const;

void WakeMessageHandler() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc);
void WakeSelect() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc);

/** Attempts to obfuscate tx time through exponentially distributed emitting.
Works assuming that a single interval is used.
Expand Down Expand Up @@ -1505,14 +1501,19 @@ friend class CNode;
*/
std::unique_ptr<i2p::sam::Session> m_i2p_sam_session;

#ifdef USE_WAKEUP_PIPE
/** a pipe which is added to select() calls to wakeup before the timeout */
int wakeupPipe[2]{-1,-1};
#endif
std::atomic<bool> wakeupSelectNeeded{false};

SocketEventsMode socketEventsMode;
std::unique_ptr<EdgeTriggeredEvents> m_edge_trig_events{nullptr};
std::unique_ptr<WakeupPipe> m_wakeup_pipe{nullptr};

template <typename Callable>
void ToggleWakeupPipe(Callable&& func)
{
if (m_wakeup_pipe) {
m_wakeup_pipe->Toggle(func);
} else {
func();
}
}

Mutex cs_sendable_receivable_nodes;
std::unordered_map<NodeId, CNode*> mapReceivableNodes GUARDED_BY(cs_sendable_receivable_nodes);
Expand Down
79 changes: 79 additions & 0 deletions src/util/wpipe.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright (c) 2020-2024 The Dash Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

#include <util/wpipe.h>

#include <logging.h>
#include <util/edge.h>

WakeupPipe::WakeupPipe(EdgeTriggeredEvents* edge_trig_events)
: m_edge_trig_events{edge_trig_events}
{
#ifdef USE_WAKEUP_PIPE
if (pipe(m_pipe.data()) != 0) {
LogPrintf("Unable to initialize WakeupPipe, pipe() for m_pipe failed\n");
return;
}
for (size_t idx = 0; idx < m_pipe.size(); idx++) {
int flags = fcntl(m_pipe[idx], F_GETFL, 0);
if (fcntl(m_pipe[idx], F_SETFL, flags | O_NONBLOCK) == -1) {
LogPrintf("Unable to initialize WakeupPipe, fcntl for O_NONBLOCK on m_pipe[%d] failed\n", idx);
return;
}
}
if (edge_trig_events && !edge_trig_events->RegisterPipe(m_pipe[0])) {
LogPrintf("Unable to initialize WakeupPipe, EdgeTriggeredEvents::RegisterPipe() failed\n");
return;
}
m_valid = true;
#else
LogPrintf("Attempting to initialize WakeupPipe without support compiled in!\n");
#endif /* USE_WAKEUP_PIPE */
}

WakeupPipe::~WakeupPipe()
{
if (m_valid) {
#ifdef USE_WAKEUP_PIPE
if (m_edge_trig_events && !m_edge_trig_events->UnregisterPipe(m_pipe[0])) {
LogPrintf("Destroying WakeupPipe instance, EdgeTriggeredEvents::UnregisterPipe() failed\n");
}
close(m_pipe[0]);
close(m_pipe[1]);
#else
assert(false);
#endif /* USE_WAKEUP_PIPE */
}
}

void WakeupPipe::Drain() const
{
#ifdef USE_WAKEUP_PIPE
assert(m_valid && m_pipe[0] != -1);

int ret{0};
std::array<uint8_t, 128> buf;
do {
ret = read(m_pipe[0], buf.data(), buf.size());
} while (ret > 0);
#else
assert(false);
#endif /* USE_WAKEUP_PIPE */
}

void WakeupPipe::Write()
{
#ifdef USE_WAKEUP_PIPE
assert(m_valid && m_pipe[1] != -1);

std::array<uint8_t, 1> buf;
if (write(m_pipe[1], buf.data(), buf.size()) != 1) {
LogPrintf("Write to m_pipe[1] failed\n");
}

m_need_wakeup = false;
#else
assert(false);
#endif /* USE_WAKEUP_PIPE */
}
Loading

0 comments on commit b8c3b48

Please sign in to comment.