Skip to content

Commit

Permalink
net: make net interruptible
Browse files Browse the repository at this point in the history
Also now that net threads are interruptible, switch them to use std
threads/binds/mutexes/condvars.
  • Loading branch information
theuni authored and Fuzzbawls committed Aug 25, 2020
1 parent 814d0de commit e24b4cc
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 39 deletions.
5 changes: 3 additions & 2 deletions src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ void Interrupt()
InterruptRPC();
InterruptREST();
InterruptTorControl();
if (g_connman)
g_connman->Interrupt();
}

/** Preparing steps before shutting down or restarting the wallet */
Expand Down Expand Up @@ -224,7 +226,6 @@ void PrepareShutdown()
GenerateBitcoins(false, NULL, 0);
#endif
MapPort(false);
g_connman->Stop();
g_connman.reset();

DumpMasternodes();
Expand Down Expand Up @@ -1898,7 +1899,7 @@ bool AppInit2()
connOptions.nSendBufferMaxSize = 1000*GetArg("-maxsendbuffer", DEFAULT_MAXSENDBUFFER);
connOptions.nReceiveFloodSize = 1000*GetArg("-maxreceivebuffer", DEFAULT_MAXRECEIVEBUFFER);

if(!connman.Start(threadGroup, scheduler, strNodeError, connOptions))
if (!connman.Start(scheduler, strNodeError, connOptions))
return UIError(strNodeError);

#ifdef ENABLE_WALLET
Expand Down
108 changes: 73 additions & 35 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1048,7 +1048,7 @@ void CConnman::AcceptConnection(const ListenSocket& hListenSocket) {
void CConnman::ThreadSocketHandler()
{
unsigned int nPrevNodeCount = 0;
while (true) {
while (!interruptNet) {
//
// Disconnect nodes
//
Expand Down Expand Up @@ -1179,8 +1179,9 @@ void CConnman::ThreadSocketHandler()
}

int nSelect = select(have_fds ? hSocketMax + 1 : 0,
&fdsetRecv, &fdsetSend, &fdsetError, &timeout);
boost::this_thread::interruption_point();
&fdsetRecv, &fdsetSend, &fdsetError, &timeout);
if (interruptNet)
return;

if (nSelect == SOCKET_ERROR) {
if (have_fds) {
Expand All @@ -1191,7 +1192,8 @@ void CConnman::ThreadSocketHandler()
}
FD_ZERO(&fdsetSend);
FD_ZERO(&fdsetError);
MilliSleep(timeout.tv_usec / 1000);
if (!interruptNet.sleep_for(std::chrono::milliseconds(timeout.tv_usec/1000)))
return;
}

//
Expand All @@ -1214,7 +1216,8 @@ void CConnman::ThreadSocketHandler()
pnode->AddRef();
}
for (CNode* pnode : vNodesCopy) {
boost::this_thread::interruption_point();
if (interruptNet)
return;

//
// Receive
Expand All @@ -1233,7 +1236,7 @@ void CConnman::ThreadSocketHandler()
if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify))
pnode->CloseSocketDisconnect();
if(notify)
messageHandlerCondition.notify_one();
condMsgProc.notify_one();
pnode->nLastRecv = GetTime();
pnode->nRecvBytes += nBytes;
RecordBytesRecv(nBytes);
Expand Down Expand Up @@ -1424,7 +1427,8 @@ void CConnman::ThreadDNSAddressSeed()
// goal: only query DNS seeds if address need is acute
if ((addrman.size() > 0) &&
(!GetBoolArg("-forcednsseed", DEFAULT_FORCEDNSSEED))) {
MilliSleep(11 * 1000);
if (!interruptNet.sleep_for(std::chrono::seconds(11)))
return;

LOCK(cs_vNodes);
if (vNodes.size() >= 2) {
Expand Down Expand Up @@ -1525,10 +1529,12 @@ void CConnman::ThreadOpenConnections()
CAddress addr(CService(), NODE_NONE);
OpenNetworkConnection(addr, false, NULL, strAddr.c_str());
for (int i = 0; i < 10 && i < nLoop; i++) {
MilliSleep(500);
if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
return;
}
}
MilliSleep(500);
if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
return;
}
}

Expand All @@ -1537,13 +1543,15 @@ void CConnman::ThreadOpenConnections()

// Minimum time before next feeler connection (in microseconds).
int64_t nNextFeeler = PoissonNextSend(nStart * 1000 * 1000, FEELER_INTERVAL);
while (true) {
while (!interruptNet) {
ProcessOneShot();

MilliSleep(500);
if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
return;

CSemaphoreGrant grant(*semOutbound);
boost::this_thread::interruption_point();
if (interruptNet)
return;

// Add seed nodes if DNS seeds are all down (an infrastructure attack?).
if (addrman.size() == 0 && (GetTime() - nStart > 60)) {
Expand Down Expand Up @@ -1601,7 +1609,7 @@ void CConnman::ThreadOpenConnections()

int64_t nANow = GetAdjustedTime();
int nTries = 0;
while (true) {
while (!interruptNet) {
CAddrInfo addr = addrman.Select(fFeeler);

// if we selected an invalid address, restart
Expand Down Expand Up @@ -1638,7 +1646,8 @@ void CConnman::ThreadOpenConnections()
if (fFeeler) {
// Add small amount of random noise before connection to avoid synchronization.
int randsleep = GetRandInt(FEELER_SLEEP_WINDOW * 1000);
MilliSleep(randsleep);
if (!interruptNet.sleep_for(std::chrono::milliseconds(randsleep)))
return;
LogPrint(BCLog::NET, "Making feeler connection to %s\n", addrConnect.ToString());
}

Expand Down Expand Up @@ -1715,11 +1724,12 @@ void CConnman::ThreadOpenAddedConnections()
// OpenNetworkConnection can detect existing connections to that IP/port.
CService service(LookupNumeric(info.strAddedNode.c_str(), Params().GetDefaultPort()));
OpenNetworkConnection(CAddress(service, NODE_NONE), false, &grant, info.strAddedNode.c_str(), false);
MilliSleep(500);
if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
return;
}
}

MilliSleep(120000); // Retry every 2 minutes
if (!interruptNet.sleep_for(std::chrono::minutes(2)))
return;
}
}

Expand All @@ -1729,7 +1739,9 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
//
// Initiate outbound network connection
//
boost::this_thread::interruption_point();
if (interruptNet) {
return false;
}
if (!pszDest) {
if (IsLocal(addrConnect) ||
FindNode((CNetAddr)addrConnect) || IsBanned(addrConnect) ||
Expand All @@ -1753,13 +1765,9 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
return true;
}


void CConnman::ThreadMessageHandler()
{
boost::mutex condition_mutex;
boost::unique_lock<boost::mutex> lock(condition_mutex);

while (true) {
while (!flagInterruptMsgProc) {
std::vector<CNode*> vNodesCopy;
{
LOCK(cs_vNodes);
Expand Down Expand Up @@ -1789,14 +1797,16 @@ void CConnman::ThreadMessageHandler()
}
}
}
boost::this_thread::interruption_point();
if (flagInterruptMsgProc)
return;

// Send messages
{
LOCK(pnode->cs_sendProcessing);
GetNodeSignals().SendMessages(pnode, *this);
}
boost::this_thread::interruption_point();
if (flagInterruptMsgProc)
return;
}


Expand All @@ -1806,8 +1816,10 @@ void CConnman::ThreadMessageHandler()
pnode->Release();
}

if (fSleep)
messageHandlerCondition.timed_wait(lock, boost::posix_time::microsec_clock::universal_time() + boost::posix_time::milliseconds(100));
if (fSleep) {
std::unique_lock<std::mutex> lock(mutexMsgProc);
condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100));
}
}
}

Expand Down Expand Up @@ -1954,14 +1966,15 @@ CConnman::CConnman()
nMaxOutbound = 0;
nBestHeight = 0;
clientInterface = NULL;
flagInterruptMsgProc = false;
}

NodeId CConnman::GetNewNodeId()
{
return nLastNodeId.fetch_add(1, std::memory_order_relaxed);
}

bool CConnman::Start(boost::thread_group& threadGroup, CScheduler& scheduler, std::string& strNodeError, Options connOptions)
bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options connOptions)
{
nTotalBytesRecv = 0;
nTotalBytesSent = 0;
Expand Down Expand Up @@ -2032,24 +2045,26 @@ bool CConnman::Start(boost::thread_group& threadGroup, CScheduler& scheduler, st
//
// Start threads
//
interruptNet.reset();
flagInterruptMsgProc = false;

// Send and receive from sockets, accept connections
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "net", boost::function<void()>(boost::bind(&CConnman::ThreadSocketHandler, this))));
threadSocketHandler = std::thread(&TraceThread<std::function<void()> >, "net", std::function<void()>(std::bind(&CConnman::ThreadSocketHandler, this)));

if (!GetBoolArg("-dnsseed", true))
LogPrintf("DNS seeding disabled\n");
else
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "dnsseed", boost::function<void()>(boost::bind(&CConnman::ThreadDNSAddressSeed, this))));
threadDNSAddressSeed = std::thread(&TraceThread<std::function<void()> >, "dnsseed", std::function<void()>(std::bind(&CConnman::ThreadDNSAddressSeed, this)));

// Initiate outbound connections from -addnode
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "addcon", boost::function<void()>(boost::bind(&CConnman::ThreadOpenAddedConnections, this))));
threadOpenAddedConnections = std::thread(&TraceThread<std::function<void()> >, "addcon", std::function<void()>(std::bind(&CConnman::ThreadOpenAddedConnections, this)));

// Initiate outbound connections unless connect=0
if (!mapArgs.count("-connect") || mapMultiArgs["-connect"].size() != 1 || mapMultiArgs["-connect"][0] != "0")
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "opencon", boost::function<void()>(boost::bind(&CConnman::ThreadOpenConnections, this))));
threadOpenConnections = std::thread(&TraceThread<std::function<void()> >, "opencon", std::function<void()>(std::bind(&CConnman::ThreadOpenConnections, this)));

// Process messages
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "msghand", boost::function<void()>(boost::bind(&CConnman::ThreadMessageHandler, this))));
threadMessageHandler = std::thread(&TraceThread<std::function<void()> >, "msghand", std::function<void()>(std::bind(&CConnman::ThreadMessageHandler, this)));

// Dump network addresses
scheduler.scheduleEvery(boost::bind(&CConnman::DumpData, this), DUMP_ADDRESSES_INTERVAL);
Expand Down Expand Up @@ -2080,12 +2095,33 @@ void CExplicitNetCleanup::callCleanup()
delete tmp; // Stroustrup's gonna kill me for that
}

void CConnman::Stop()
void CConnman::Interrupt()
{
LogPrintf("%s\n",__func__);
{
std::lock_guard<std::mutex> lock(mutexMsgProc);
flagInterruptMsgProc = true;
}
condMsgProc.notify_all();

interruptNet();

if (semOutbound)
for (int i=0; i<(nMaxOutbound + nMaxFeeler); i++)
semOutbound->post();
}

void CConnman::Stop()
{
if (threadMessageHandler.joinable())
threadMessageHandler.join();
if (threadOpenConnections.joinable())
threadOpenConnections.join();
if (threadOpenAddedConnections.joinable())
threadOpenAddedConnections.join();
if (threadDNSAddressSeed.joinable())
threadDNSAddressSeed.join();
if (threadSocketHandler.joinable())
threadSocketHandler.join();

if (fAddressesInitialized)
{
Expand Down Expand Up @@ -2131,6 +2167,8 @@ void CConnman::DeleteNode(CNode* pnode)

CConnman::~CConnman()
{
Interrupt();
Stop();
}

size_t CConnman::GetAddressCount() const
Expand Down
19 changes: 17 additions & 2 deletions src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
#include "sync.h"
#include "uint256.h"
#include "utilstrencodings.h"
#include "threadinterrupt.h"

#include <atomic>
#include <deque>
#include <stdint.h>
#include <thread>
#include <memory>
#include <condition_variable>

#ifndef WIN32
#include <arpa/inet.h>
Expand Down Expand Up @@ -127,8 +130,9 @@ class CConnman
};
CConnman();
~CConnman();
bool Start(boost::thread_group& threadGroup, CScheduler& scheduler, std::string& strNodeError, Options options);
bool Start(CScheduler& scheduler, std::string& strNodeError, Options options);
void Stop();
void Interrupt();
bool BindListenPort(const CService &bindAddr, std::string& strError, bool fWhitelisted = false);
bool OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant* grantOutbound = NULL, const char* strDest = NULL, bool fOneShot = false, bool fFeeler = false);
bool CheckIncomingNonce(uint64_t nonce);
Expand Down Expand Up @@ -357,7 +361,6 @@ class CConnman
std::list<CNode*> vNodesDisconnected;
mutable RecursiveMutex cs_vNodes;
std::atomic<NodeId> nLastNodeId;
boost::condition_variable messageHandlerCondition;

/** Services this instance offers */
ServiceFlags nLocalServices;
Expand All @@ -371,6 +374,18 @@ class CConnman
int nMaxFeeler;
std::atomic<int> nBestHeight;
CClientUIInterface* clientInterface;

std::condition_variable condMsgProc;
std::mutex mutexMsgProc;
std::atomic<bool> flagInterruptMsgProc;

CThreadInterrupt interruptNet;

std::thread threadDNSAddressSeed;
std::thread threadSocketHandler;
std::thread threadOpenAddedConnections;
std::thread threadOpenConnections;
std::thread threadMessageHandler;
};
extern std::unique_ptr<CConnman> g_connman;
void Discover(boost::thread_group& threadGroup);
Expand Down

0 comments on commit e24b4cc

Please sign in to comment.