Skip to content

Commit

Permalink
refactor: move CConnman::(Un)registerEvents to ETE
Browse files Browse the repository at this point in the history
  • Loading branch information
kwvg committed May 14, 2024
1 parent 3a9f386 commit f50c710
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 79 deletions.
91 changes: 15 additions & 76 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,9 @@ void CNode::CloseSocketDisconnect(CConnman* connman)
}
}

connman->UnregisterEvents(this);
if (connman->m_edge_trig_events && !connman->m_edge_trig_events->UnregisterEvents(hSocket)) {
LogPrint(BCLog::NET, "EdgeTriggeredEvents::UnregisterEvents() failed\n");
}

LogPrint(BCLog::NET, "disconnecting peer=%d\n", id);
CloseSocket(hSocket);
Expand Down Expand Up @@ -1276,7 +1278,12 @@ void CConnman::CreateNodeFromAcceptedSocket(SOCKET hSocket,
LOCK(m_nodes_mutex);
m_nodes.push_back(pnode);
WITH_LOCK(cs_mapSocketToNode, mapSocketToNode.emplace(hSocket, pnode));
RegisterEvents(pnode);
if (m_edge_trig_events) {
LOCK(pnode->cs_hSocket);
if (!m_edge_trig_events->RegisterEvents(pnode->hSocket)) {
LogPrint(BCLog::NET, "EdgeTriggeredEvents::RegisterEvents() failed\n");
}
}
WakeSelect();
}

Expand Down Expand Up @@ -2980,7 +2987,12 @@ void CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
{
LOCK(m_nodes_mutex);
m_nodes.push_back(pnode);
RegisterEvents(pnode);
if (m_edge_trig_events) {
LOCK(pnode->cs_hSocket);
if (!m_edge_trig_events->RegisterEvents(pnode->hSocket)) {
LogPrint(BCLog::NET, "EdgeTriggeredEvents::RegisterEvents() failed\n");
}
}
WakeSelect();
}
}
Expand Down Expand Up @@ -4193,79 +4205,6 @@ uint64_t CConnman::CalculateKeyedNetGroup(const CAddress& ad) const
return GetDeterministicRandomizer(RANDOMIZER_ID_NETGROUP).Write(vchNetGroup.data(), vchNetGroup.size()).Finalize();
}

void CConnman::RegisterEvents(CNode *pnode)
{
#ifdef USE_KQUEUE
if (socketEventsMode == SocketEventsMode::KQueue) {
LOCK(pnode->cs_hSocket);
assert(pnode->hSocket != INVALID_SOCKET);

struct kevent events[2];
EV_SET(&events[0], pnode->hSocket, EVFILT_READ, EV_ADD, 0, 0, nullptr);
EV_SET(&events[1], pnode->hSocket, EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, nullptr);

int r = kevent(Assert(m_edge_trig_events)->m_fd, events, 2, nullptr, 0, nullptr);
if (r != 0) {
LogPrint(BCLog::NET, "%s -- kevent(%d, %d, %d, ...) failed. error: %s\n", __func__,
m_edge_trig_events->m_fd, EV_ADD, pnode->hSocket, NetworkErrorString(WSAGetLastError()));
}
}
#endif
#ifdef USE_EPOLL
if (socketEventsMode == SocketEventsMode::EPoll) {
LOCK(pnode->cs_hSocket);
assert(pnode->hSocket != INVALID_SOCKET);

epoll_event e;
// We're using edge-triggered mode, so it's important that we drain sockets even if no signals come in
e.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
e.data.fd = pnode->hSocket;

int r = epoll_ctl(Assert(m_edge_trig_events)->m_fd, EPOLL_CTL_ADD, pnode->hSocket, &e);
if (r != 0) {
LogPrint(BCLog::NET, "%s -- epoll_ctl(%d, %d, %d, ...) failed. error: %s\n", __func__,
m_edge_trig_events->m_fd, EPOLL_CTL_ADD, pnode->hSocket, NetworkErrorString(WSAGetLastError()));
}
}
#endif
}

void CConnman::UnregisterEvents(CNode *pnode)
{
#ifdef USE_KQUEUE
if (socketEventsMode == SocketEventsMode::KQueue) {
AssertLockHeld(pnode->cs_hSocket);
if (pnode->hSocket == INVALID_SOCKET) {
return;
}

struct kevent events[2];
EV_SET(&events[0], pnode->hSocket, EVFILT_READ, EV_DELETE, 0, 0, nullptr);
EV_SET(&events[1], pnode->hSocket, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr);

int r = kevent(Assert(m_edge_trig_events)->m_fd, events, 2, nullptr, 0, nullptr);
if (r != 0) {
LogPrint(BCLog::NET, "%s -- kevent(%d, %d, %d, ...) failed. error: %s\n", __func__,
m_edge_trig_events->m_fd, EV_DELETE, pnode->hSocket, NetworkErrorString(WSAGetLastError()));
}
}
#endif
#ifdef USE_EPOLL
if (socketEventsMode == SocketEventsMode::EPoll) {
AssertLockHeld(pnode->cs_hSocket);
if (pnode->hSocket == INVALID_SOCKET) {
return;
}

int r = epoll_ctl(Assert(m_edge_trig_events)->m_fd, EPOLL_CTL_DEL, pnode->hSocket, nullptr);
if (r != 0) {
LogPrint(BCLog::NET, "%s -- epoll_ctl(%d, %d, %d, ...) failed. error: %s\n", __func__,
m_edge_trig_events->m_fd, EPOLL_CTL_DEL, pnode->hSocket, NetworkErrorString(WSAGetLastError()));
}
}
#endif
}

void CaptureMessageToFile(const CAddress& addr,
const std::string& msg_type,
Span<const unsigned char> data,
Expand Down
3 changes: 0 additions & 3 deletions src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -1371,9 +1371,6 @@ friend class CNode;
// Whether the node should be passed out in ForEach* callbacks
static bool NodeFullyConnected(const CNode* pnode);

void RegisterEvents(CNode* pnode);
void UnregisterEvents(CNode* pnode);

// Network usage totals
mutable Mutex m_total_bytes_sent_mutex;
std::atomic<uint64_t> nTotalBytesRecv{0};
Expand Down
77 changes: 77 additions & 0 deletions src/util/edge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,80 @@ bool EdgeTriggeredEvents::RemoveSocket(SOCKET socket) const
}
return true;
}

bool EdgeTriggeredEvents::RegisterEvents(SOCKET socket) const
{
assert(m_valid && socket != INVALID_SOCKET);

if (m_mode == SocketEventsMode::EPoll) {
#ifdef USE_EPOLL
epoll_event e;
// We're using edge-triggered mode, so it's important that we drain sockets even if no signals come in
e.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
e.data.fd = socket;

if (epoll_ctl(m_fd, EPOLL_CTL_ADD, socket, &e) != 0) {
LogPrintf("Failed to register events for socket -- epoll_ctl(%d, %d, %d, ...) returned error: %s\n",
m_fd, EPOLL_CTL_ADD, socket, NetworkErrorString(WSAGetLastError()));
return false;
}
#else
assert(false);
#endif /* USE_EPOLL */
} else if (m_mode == SocketEventsMode::KQueue) {
#ifdef USE_KQUEUE
struct kevent events[2];
EV_SET(&events[0], socket, EVFILT_READ, EV_ADD, 0, 0, nullptr);
EV_SET(&events[1], socket, EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, nullptr);

if (kevent(m_fd, events, 2, nullptr, 0, nullptr) != 0) {
LogPrintf("Failed to register events for socket -- kevent(%d, %d, %d, ...) returned error: %s\n",
m_fd, EV_ADD, socket, NetworkErrorString(WSAGetLastError()));
return false;
}
#else
assert(false);
#endif /* USE_KQUEUE */
} else {
assert(false);
}
return true;
}

bool EdgeTriggeredEvents::UnregisterEvents(SOCKET socket) const
{
assert(m_valid);

if (socket == INVALID_SOCKET) {
LogPrintf("Cannot unregister events for invalid socket\n");
return false;
}

if (m_mode == SocketEventsMode::EPoll) {
#ifdef USE_EPOLL
if (epoll_ctl(m_fd, EPOLL_CTL_DEL, socket, nullptr) != 0) {
LogPrintf("Failed to unregister events for socket -- epoll_ctl(%d, %d, %d, ...) returned error: %s\n",
m_fd, EPOLL_CTL_DEL, socket, NetworkErrorString(WSAGetLastError()));
return false;
}
#else
assert(false);
#endif /* USE_EPOLL */
} else if (m_mode == SocketEventsMode::KQueue) {
#ifdef USE_KQUEUE
struct kevent events[2];
EV_SET(&events[0], socket, EVFILT_READ, EV_DELETE, 0, 0, nullptr);
EV_SET(&events[1], socket, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr);
if (kevent(m_fd, events, 2, nullptr, 0, nullptr) != 0) {
LogPrintf("Failed to unregister events for socket -- kevent(%d, %d, %d, ...) returned error: %s\n",
m_fd, EV_DELETE, socket, NetworkErrorString(WSAGetLastError()));
return false;
}
#else
assert(false);
#endif /* USE_KQUEUE */
} else {
assert(false);
}
return true;
}
5 changes: 5 additions & 0 deletions src/util/edge.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ class EdgeTriggeredEvents

bool IsValid() const { return m_valid; }

/* Register events for socket */
bool RegisterEvents(SOCKET socket) const;
/* Unregister events for socket */
bool UnregisterEvents(SOCKET socket) const;

public:
/* File descriptor used to interact with events mode */
int m_fd{-1};
Expand Down

0 comments on commit f50c710

Please sign in to comment.