From d5b557fa62c570cc8d6b9a1ecd1254371f3dbf6c Mon Sep 17 00:00:00 2001 From: Peter Bushnell Date: Fri, 17 Jun 2022 08:59:42 +0100 Subject: [PATCH 01/13] Remove boost::chrono and boost::sleep --- build_msvc/defi_config.h | 6 ---- configure.ac | 51 ---------------------------------- src/bench/examples.cpp | 2 +- src/defi-cli.cpp | 2 +- src/defid.cpp | 2 +- src/httprpc.cpp | 2 +- src/init.cpp | 1 + src/random.cpp | 2 +- src/rpc/server.cpp | 2 +- src/scheduler.cpp | 42 ++++++++++++---------------- src/scheduler.h | 31 +++++++++++---------- src/test/checkqueue_tests.cpp | 2 +- src/test/reverselock_tests.cpp | 43 +++++++++++++++++++++++----- src/test/scheduler_tests.cpp | 35 ++++++++--------------- src/test/setup_common.cpp | 1 + src/test/util_tests.cpp | 4 +-- src/util/time.cpp | 23 +++------------ src/util/time.h | 4 +-- src/wallet/db.cpp | 4 +-- test/lint/lint-includes.sh | 1 - 20 files changed, 100 insertions(+), 160 deletions(-) diff --git a/build_msvc/defi_config.h b/build_msvc/defi_config.h index 89f27811fd..f67c87adcb 100644 --- a/build_msvc/defi_config.h +++ b/build_msvc/defi_config.h @@ -330,12 +330,6 @@ /* Define if the visibility attribute is supported. */ #define HAVE_VISIBILITY_ATTRIBUTE 1 -/* Define this symbol if boost sleep works */ -/* #undef HAVE_WORKING_BOOST_SLEEP */ - -/* Define this symbol if boost sleep_for works */ -#define HAVE_WORKING_BOOST_SLEEP_FOR 1 - /* Define to the sub-directory where libtool stores uninstalled libraries. */ #define LT_OBJDIR ".libs/" diff --git a/configure.ac b/configure.ac index f163c70b24..676649f4a8 100644 --- a/configure.ac +++ b/configure.ac @@ -1189,57 +1189,6 @@ AC_LINK_IFELSE([AC_LANG_PROGRAM([[ LIBS="$TEMP_LIBS" CPPFLAGS="$TEMP_CPPFLAGS" -dnl Boost >= 1.50 uses sleep_for rather than the now-deprecated sleep, however -dnl it was broken from 1.50 to 1.52 when backed by nanosleep. Use sleep_for if -dnl a working version is available, else fall back to sleep. sleep was removed -dnl after 1.56. -dnl If neither is available, abort. -TEMP_LIBS="$LIBS" -LIBS="$BOOST_LIBS $LIBS" -TEMP_CPPFLAGS="$CPPFLAGS" -CPPFLAGS="$CPPFLAGS $BOOST_CPPFLAGS" -AC_LINK_IFELSE([AC_LANG_PROGRAM([[ - #include - #include - ]],[[ - #if BOOST_VERSION >= 105000 && (!defined(BOOST_HAS_NANOSLEEP) || BOOST_VERSION >= 105200) - boost::this_thread::sleep_for(boost::chrono::milliseconds(0)); - #else - choke me - #endif - ]])], - [boost_sleep=yes; - AC_DEFINE(HAVE_WORKING_BOOST_SLEEP_FOR, 1, [Define this symbol if boost sleep_for works])], - [boost_sleep=no]) -LIBS="$TEMP_LIBS" -CPPFLAGS="$TEMP_CPPFLAGS" - -if test x$boost_sleep != xyes; then -TEMP_LIBS="$LIBS" -LIBS="$BOOST_LIBS $LIBS" -TEMP_CPPFLAGS="$CPPFLAGS" -CPPFLAGS="$CPPFLAGS $BOOST_CPPFLAGS" -AC_LINK_IFELSE([AC_LANG_PROGRAM([[ - #include - #include - #include - ]],[[ - #if BOOST_VERSION <= 105600 - boost::this_thread::sleep(boost::posix_time::milliseconds(0)); - #else - choke me - #endif - ]])], - [boost_sleep=yes; AC_DEFINE(HAVE_WORKING_BOOST_SLEEP, 1, [Define this symbol if boost sleep works])], - [boost_sleep=no]) -LIBS="$TEMP_LIBS" -CPPFLAGS="$TEMP_CPPFLAGS" -fi - -if test x$boost_sleep != xyes; then - AC_MSG_ERROR(No working boost sleep implementation found.) -fi - fi if test x$use_pkgconfig = xyes; then diff --git a/src/bench/examples.cpp b/src/bench/examples.cpp index e4d1708aff..f03eaf4173 100644 --- a/src/bench/examples.cpp +++ b/src/bench/examples.cpp @@ -10,7 +10,7 @@ static void Sleep100ms(benchmark::State& state) { while (state.KeepRunning()) { - MilliSleep(100); + UninterruptibleSleep(std::chrono::milliseconds{100}); } } diff --git a/src/defi-cli.cpp b/src/defi-cli.cpp index 57a8e43041..5e7a40fc8c 100644 --- a/src/defi-cli.cpp +++ b/src/defi-cli.cpp @@ -483,7 +483,7 @@ static int CommandLineRPC(int argc, char *argv[]) } catch (const CConnectionFailed&) { if (fWait) - MilliSleep(1000); + UninterruptibleSleep(std::chrono::milliseconds{1000}); else throw; } diff --git a/src/defid.cpp b/src/defid.cpp index 356253fbdf..c6931ebb4d 100644 --- a/src/defid.cpp +++ b/src/defid.cpp @@ -47,7 +47,7 @@ static void WaitForShutdown() { while (!ShutdownRequested()) { - MilliSleep(200); + UninterruptibleSleep(std::chrono::milliseconds{200}); } Interrupt(); } diff --git a/src/httprpc.cpp b/src/httprpc.cpp index 9fe0ce1e4d..52213d166b 100644 --- a/src/httprpc.cpp +++ b/src/httprpc.cpp @@ -197,7 +197,7 @@ static bool HTTPReq_JSONRPC(HTTPRequest* req, const std::string &) /* Deter brute-forcing If this results in a DoS the user really shouldn't have their RPC port exposed. */ - MilliSleep(250); + UninterruptibleSleep(std::chrono::milliseconds{250}); req->WriteHeader("WWW-Authenticate", WWW_AUTH_HEADER_DATA); req->WriteReply(HTTP_UNAUTHORIZED); diff --git a/src/init.cpp b/src/init.cpp index 0c922d905a..baf1e2d69c 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -236,6 +236,7 @@ void Shutdown(InitInterfaces& interfaces) // After everything has been shut down, but before things get flushed, stop the // CScheduler/checkqueue threadGroup + scheduler.stop(); threadGroup.interrupt_all(); threadGroup.join_all(); diff --git a/src/random.cpp b/src/random.cpp index 5f789d1101..e0f86c7829 100644 --- a/src/random.cpp +++ b/src/random.cpp @@ -542,7 +542,7 @@ static void SeedSleep(CSHA512& hasher, RNGState& rng) SeedTimestamp(hasher); // Sleep for 1ms - MilliSleep(1); + UninterruptibleSleep(std::chrono::milliseconds{1}); // High-precision timestamp after sleeping (as we commit to both the time before and after, this measures the delay) SeedTimestamp(hasher); diff --git a/src/rpc/server.cpp b/src/rpc/server.cpp index 6d237de480..094643b603 100644 --- a/src/rpc/server.cpp +++ b/src/rpc/server.cpp @@ -171,7 +171,7 @@ UniValue stop(const JSONRPCRequest& jsonRequest) // this reply will get back to the client. StartShutdown(); if (jsonRequest.params[0].isNum()) { - MilliSleep(jsonRequest.params[0].get_int()); + UninterruptibleSleep(std::chrono::milliseconds{jsonRequest.params[0].get_int()}); } return "Defi server stopping"; } diff --git a/src/scheduler.cpp b/src/scheduler.cpp index 8c8473bf14..fa84be8d28 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -20,18 +20,9 @@ CScheduler::~CScheduler() } -#if BOOST_VERSION < 105000 -static boost::system_time toPosixTime(const boost::chrono::system_clock::time_point& t) -{ - // Creating the posix_time using from_time_t loses sub-second precision. So rather than exporting the time_point to time_t, - // start with a posix_time at the epoch (0) and add the milliseconds that have passed since then. - return boost::posix_time::from_time_t(0) + boost::posix_time::milliseconds(boost::chrono::duration_cast(t.time_since_epoch()).count()); -} -#endif - void CScheduler::serviceQueue() { - boost::unique_lock lock(newTaskMutex); + WAIT_LOCK(newTaskMutex, lock); ++nThreadsServicingQueue; // newTaskMutex is locked throughout this loop EXCEPT @@ -40,7 +31,7 @@ void CScheduler::serviceQueue() while (!shouldStop()) { try { if (!shouldStop() && taskQueue.empty()) { - reverse_lock > rlock(lock); + reverse_lock > rlock(lock); // Use this chance to get more entropy RandAddSeedSleep(); } @@ -51,13 +42,14 @@ void CScheduler::serviceQueue() // Wait until either there is a new task, or until // the time of the first item on the queue: - // Some boost versions have a conflicting overload of wait_until that returns void. - // Explicitly use a template here to avoid hitting that overload. + while (!shouldStop() && !taskQueue.empty()) { - boost::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin()->first; - if (newTaskScheduled.wait_until<>(lock, timeToWaitFor) == boost::cv_status::timeout) + std::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin()->first; + if (newTaskScheduled.wait_until(lock, timeToWaitFor) == std::cv_status::timeout) { break; // Exit loop after timeout, it means we reached the time of the event + } } + // If there are multiple threads, the queue can empty while we're waiting (another // thread may service the task we were waiting on). if (shouldStop() || taskQueue.empty()) @@ -69,7 +61,7 @@ void CScheduler::serviceQueue() { // Unlock before calling f, so it can reschedule itself or another task // without deadlocking: - reverse_lock > rlock(lock); + reverse_lock > rlock(lock); f(); } } catch (...) { @@ -84,7 +76,7 @@ void CScheduler::serviceQueue() void CScheduler::stop(bool drain) { { - boost::unique_lock lock(newTaskMutex); + LOCK(newTaskMutex); if (drain) stopWhenEmpty = true; else @@ -93,10 +85,10 @@ void CScheduler::stop(bool drain) newTaskScheduled.notify_all(); } -void CScheduler::schedule(CScheduler::Function f, boost::chrono::system_clock::time_point t) +void CScheduler::schedule(CScheduler::Function f, std::chrono::system_clock::time_point t) { { - boost::unique_lock lock(newTaskMutex); + LOCK(newTaskMutex); taskQueue.insert(std::make_pair(t, f)); } newTaskScheduled.notify_one(); @@ -104,7 +96,7 @@ void CScheduler::schedule(CScheduler::Function f, boost::chrono::system_clock::t void CScheduler::scheduleFromNow(CScheduler::Function f, int64_t deltaMilliSeconds) { - schedule(f, boost::chrono::system_clock::now() + boost::chrono::milliseconds(deltaMilliSeconds)); + schedule(f, std::chrono::system_clock::now() + std::chrono::milliseconds(deltaMilliSeconds)); } static void Repeat(CScheduler* s, CScheduler::Function f, int64_t deltaMilliSeconds) @@ -118,10 +110,10 @@ void CScheduler::scheduleEvery(CScheduler::Function f, int64_t deltaMilliSeconds scheduleFromNow(std::bind(&Repeat, this, f, deltaMilliSeconds), deltaMilliSeconds); } -size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first, - boost::chrono::system_clock::time_point &last) const +size_t CScheduler::getQueueInfo(std::chrono::system_clock::time_point &first, + std::chrono::system_clock::time_point &last) const { - boost::unique_lock lock(newTaskMutex); + LOCK(newTaskMutex); size_t result = taskQueue.size(); if (!taskQueue.empty()) { first = taskQueue.begin()->first; @@ -131,7 +123,7 @@ size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first, } bool CScheduler::AreThreadsServicingQueue() const { - boost::unique_lock lock(newTaskMutex); + LOCK(newTaskMutex); return nThreadsServicingQueue; } @@ -145,7 +137,7 @@ void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() { if (m_are_callbacks_running) return; if (m_callbacks_pending.empty()) return; } - m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this)); + m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this), std::chrono::system_clock::now()); } void SingleThreadedSchedulerClient::ProcessQueue() { diff --git a/src/scheduler.h b/src/scheduler.h index 65cf1a78d8..3c84d79cdb 100644 --- a/src/scheduler.h +++ b/src/scheduler.h @@ -7,11 +7,12 @@ // // NOTE: -// boost::thread / boost::chrono should be ported to std::thread / std::chrono +// boost::thread should be ported to std::thread // when we support C++11. // -#include -#include +#include +#include +#include #include #include @@ -27,8 +28,8 @@ // s->scheduleFromNow(std::bind(Class::func, this, argument), 3); // boost::thread* t = new boost::thread(std::bind(CScheduler::serviceQueue, s)); // -// ... then at program shutdown, clean up the thread running serviceQueue: -// t->interrupt(); +// ... then at program shutdown, make sure to call stop() to clean up the thread(s) running serviceQueue: +// s->stop(); // t->join(); // delete t; // delete s; // Must be done after thread is interrupted/joined. @@ -43,7 +44,7 @@ class CScheduler typedef std::function Function; // Call func at/after time t - void schedule(Function f, boost::chrono::system_clock::time_point t=boost::chrono::system_clock::now()); + void schedule(Function f, std::chrono::system_clock::time_point t); // Convenience method: call f once deltaMilliSeconds from now void scheduleFromNow(Function f, int64_t deltaMilliSeconds); @@ -68,20 +69,20 @@ class CScheduler // Returns number of tasks waiting to be serviced, // and first and last task times - size_t getQueueInfo(boost::chrono::system_clock::time_point &first, - boost::chrono::system_clock::time_point &last) const; + size_t getQueueInfo(std::chrono::system_clock::time_point &first, + std::chrono::system_clock::time_point &last) const; // Returns true if there are threads actively running in serviceQueue() bool AreThreadsServicingQueue() const; private: - std::multimap taskQueue; - boost::condition_variable newTaskScheduled; - mutable boost::mutex newTaskMutex; - int nThreadsServicingQueue; - bool stopRequested; - bool stopWhenEmpty; - bool shouldStop() const { return stopRequested || (stopWhenEmpty && taskQueue.empty()); } + mutable Mutex newTaskMutex; + std::condition_variable newTaskScheduled; + std::multimap taskQueue GUARDED_BY(newTaskMutex); + int nThreadsServicingQueue GUARDED_BY(newTaskMutex); + bool stopRequested GUARDED_BY(newTaskMutex); + bool stopWhenEmpty GUARDED_BY(newTaskMutex); + bool shouldStop() const EXCLUSIVE_LOCKS_REQUIRED(newTaskMutex) { return stopRequested || (stopWhenEmpty && taskQueue.empty()); } }; /** diff --git a/src/test/checkqueue_tests.cpp b/src/test/checkqueue_tests.cpp index 2dbf469f05..1e56d387d1 100644 --- a/src/test/checkqueue_tests.cpp +++ b/src/test/checkqueue_tests.cpp @@ -393,7 +393,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueueControl_Locks) CCheckQueueControl control(queue.get()); // While sleeping, no other thread should execute to this point auto observed = ++nThreads; - MilliSleep(10); + UninterruptibleSleep(std::chrono::milliseconds{10}); fails += observed != nThreads; }); } diff --git a/src/test/reverselock_tests.cpp b/src/test/reverselock_tests.cpp index 5d424df275..212f5a747e 100644 --- a/src/test/reverselock_tests.cpp +++ b/src/test/reverselock_tests.cpp @@ -11,21 +11,50 @@ BOOST_FIXTURE_TEST_SUITE(reverselock_tests, BasicTestingSetup) BOOST_AUTO_TEST_CASE(reverselock_basics) { - boost::mutex mutex; - boost::unique_lock lock(mutex); + Mutex mutex; + std::unique_lock lock(mutex); BOOST_CHECK(lock.owns_lock()); { - reverse_lock > rlock(lock); + reverse_lock> rlock(lock); BOOST_CHECK(!lock.owns_lock()); } BOOST_CHECK(lock.owns_lock()); } +BOOST_AUTO_TEST_CASE(reverselock_multiple) +{ + Mutex mutex2; + Mutex mutex; + std::unique_lock lock2(mutex2); + std::unique_lock lock(mutex); + + // Make sure undoing two locks succeeds + { + reverse_lock> rlock(lock); + BOOST_CHECK(!lock.owns_lock()); + reverse_lock> rlock2(lock2); + BOOST_CHECK(!lock2.owns_lock()); + } + BOOST_CHECK(lock.owns_lock()); + BOOST_CHECK(lock2.owns_lock()); +} + BOOST_AUTO_TEST_CASE(reverselock_errors) { - boost::mutex mutex; - boost::unique_lock lock(mutex); + Mutex mutex2; + Mutex mutex; + std::unique_lock lock2(mutex2); + std::unique_lock lock(mutex); + +#ifdef DEBUG_LOCKORDER + // Make sure trying to reverse lock a previous lock fails + try { + reverse_lock> rlock2(lock2); + BOOST_CHECK(false); // REVERSE_LOCK(lock2) succeeded + } catch(...) { } + BOOST_CHECK(lock2.owns_lock()); +#endif // Make sure trying to reverse lock an unlocked lock fails lock.unlock(); @@ -34,7 +63,7 @@ BOOST_AUTO_TEST_CASE(reverselock_errors) bool failed = false; try { - reverse_lock > rlock(lock); + reverse_lock> rlock(lock); } catch(...) { failed = true; } @@ -49,7 +78,7 @@ BOOST_AUTO_TEST_CASE(reverselock_errors) lock.lock(); BOOST_CHECK(lock.owns_lock()); { - reverse_lock > rlock(lock); + reverse_lock> rlock(lock); BOOST_CHECK(!lock.owns_lock()); } diff --git a/src/test/scheduler_tests.cpp b/src/test/scheduler_tests.cpp index 9a18ad2b3a..8492ba1d46 100644 --- a/src/test/scheduler_tests.cpp +++ b/src/test/scheduler_tests.cpp @@ -4,6 +4,7 @@ #include #include +#include #include @@ -12,31 +13,19 @@ BOOST_AUTO_TEST_SUITE(scheduler_tests) -static void microTask(CScheduler& s, boost::mutex& mutex, int& counter, int delta, boost::chrono::system_clock::time_point rescheduleTime) +static void microTask(CScheduler& s, boost::mutex& mutex, int& counter, int delta, std::chrono::system_clock::time_point rescheduleTime) { { boost::unique_lock lock(mutex); counter += delta; } - boost::chrono::system_clock::time_point noTime = boost::chrono::system_clock::time_point::min(); + std::chrono::system_clock::time_point noTime = std::chrono::system_clock::time_point::min(); if (rescheduleTime != noTime) { CScheduler::Function f = std::bind(µTask, std::ref(s), std::ref(mutex), std::ref(counter), -delta + 1, noTime); s.schedule(f, rescheduleTime); } } -static void MicroSleep(uint64_t n) -{ -#if defined(HAVE_WORKING_BOOST_SLEEP_FOR) - boost::this_thread::sleep_for(boost::chrono::microseconds(n)); -#elif defined(HAVE_WORKING_BOOST_SLEEP) - boost::this_thread::sleep(boost::posix_time::microseconds(n)); -#else - //should never get here - #error missing boost sleep implementation -#endif -} - BOOST_AUTO_TEST_CASE(manythreads) { // Stress test: hundreds of microsecond-scheduled tasks, @@ -58,15 +47,15 @@ BOOST_AUTO_TEST_CASE(manythreads) auto randomMsec = [](FastRandomContext& rc) -> int { return -11 + (int)rc.randrange(1012); }; // [-11, 1000] auto randomDelta = [](FastRandomContext& rc) -> int { return -1000 + (int)rc.randrange(2001); }; // [-1000, 1000] - boost::chrono::system_clock::time_point start = boost::chrono::system_clock::now(); - boost::chrono::system_clock::time_point now = start; - boost::chrono::system_clock::time_point first, last; + std::chrono::system_clock::time_point start = std::chrono::system_clock::now(); + std::chrono::system_clock::time_point now = start; + std::chrono::system_clock::time_point first, last; size_t nTasks = microTasks.getQueueInfo(first, last); BOOST_CHECK(nTasks == 0); for (int i = 0; i < 100; ++i) { - boost::chrono::system_clock::time_point t = now + boost::chrono::microseconds(randomMsec(rng)); - boost::chrono::system_clock::time_point tReschedule = now + boost::chrono::microseconds(500 + randomMsec(rng)); + std::chrono::system_clock::time_point t = now + std::chrono::microseconds(randomMsec(rng)); + std::chrono::system_clock::time_point tReschedule = now + std::chrono::microseconds(500 + randomMsec(rng)); int whichCounter = zeroToNine(rng); CScheduler::Function f = std::bind(µTask, std::ref(microTasks), std::ref(counterMutex[whichCounter]), std::ref(counter[whichCounter]), @@ -83,15 +72,15 @@ BOOST_AUTO_TEST_CASE(manythreads) for (int i = 0; i < 5; i++) microThreads.create_thread(std::bind(&CScheduler::serviceQueue, µTasks)); - MicroSleep(600); - now = boost::chrono::system_clock::now(); + UninterruptibleSleep(std::chrono::microseconds{600}); + now = std::chrono::system_clock::now(); // More threads and more tasks: for (int i = 0; i < 5; i++) microThreads.create_thread(std::bind(&CScheduler::serviceQueue, µTasks)); for (int i = 0; i < 100; i++) { - boost::chrono::system_clock::time_point t = now + boost::chrono::microseconds(randomMsec(rng)); - boost::chrono::system_clock::time_point tReschedule = now + boost::chrono::microseconds(500 + randomMsec(rng)); + std::chrono::system_clock::time_point t = now + std::chrono::microseconds(randomMsec(rng)); + std::chrono::system_clock::time_point tReschedule = now + std::chrono::microseconds(500 + randomMsec(rng)); int whichCounter = zeroToNine(rng); CScheduler::Function f = std::bind(µTask, std::ref(microTasks), std::ref(counterMutex[whichCounter]), std::ref(counter[whichCounter]), diff --git a/src/test/setup_common.cpp b/src/test/setup_common.cpp index affeeeaf4b..ccf7211a63 100644 --- a/src/test/setup_common.cpp +++ b/src/test/setup_common.cpp @@ -143,6 +143,7 @@ TestingSetup::TestingSetup(const std::string& chainName) : BasicTestingSetup(cha TestingSetup::~TestingSetup() { + scheduler.stop(); threadGroup.interrupt_all(); threadGroup.join_all(); GetMainSignals().FlushBackgroundCallbacks(); diff --git a/src/test/util_tests.cpp b/src/test/util_tests.cpp index 279ac355b9..0383fa185d 100644 --- a/src/test/util_tests.cpp +++ b/src/test/util_tests.cpp @@ -1110,7 +1110,7 @@ BOOST_AUTO_TEST_CASE(util_time_GetTime) SetMockTime(111); // Check that mock time does not change after a sleep for (const auto& num_sleep : {0, 1}) { - MilliSleep(num_sleep); + UninterruptibleSleep(std::chrono::milliseconds{num_sleep}); BOOST_CHECK_EQUAL(111, GetTime()); // Deprecated time getter BOOST_CHECK_EQUAL(111, GetTime().count()); BOOST_CHECK_EQUAL(111000, GetTime().count()); @@ -1121,7 +1121,7 @@ BOOST_AUTO_TEST_CASE(util_time_GetTime) // Check that system time changes after a sleep const auto ms_0 = GetTime(); const auto us_0 = GetTime(); - MilliSleep(1); + UninterruptibleSleep(std::chrono::milliseconds{1}); BOOST_CHECK(ms_0 < GetTime()); BOOST_CHECK(us_0 < GetTime()); } diff --git a/src/util/time.cpp b/src/util/time.cpp index 8ebbec4c48..b8882df295 100644 --- a/src/util/time.cpp +++ b/src/util/time.cpp @@ -11,10 +11,13 @@ #include #include -#include #include +#include + #include +void UninterruptibleSleep(const std::chrono::microseconds& n) { std::this_thread::sleep_for(n); } + static std::atomic nMockTime(0); //!< For unit testing int64_t GetTime() @@ -72,24 +75,6 @@ int64_t GetSystemTimeInSeconds() return GetTimeMicros()/1000000; } -void MilliSleep(int64_t n) -{ - -/** - * Boost's sleep_for was uninterruptible when backed by nanosleep from 1.50 - * until fixed in 1.52. Use the deprecated sleep method for the broken case. - * See: https://svn.boost.org/trac/boost/ticket/7238 - */ -#if defined(HAVE_WORKING_BOOST_SLEEP_FOR) - boost::this_thread::sleep_for(boost::chrono::milliseconds(n)); -#elif defined(HAVE_WORKING_BOOST_SLEEP) - boost::this_thread::sleep(boost::posix_time::milliseconds(n)); -#else -//should never get here -#error missing boost sleep implementation -#endif -} - std::string FormatISO8601DateTime(int64_t nTime) { struct tm ts; time_t time_val = nTime; diff --git a/src/util/time.h b/src/util/time.h index 9c1da50e65..63adbef464 100644 --- a/src/util/time.h +++ b/src/util/time.h @@ -10,6 +10,8 @@ #include #include +void UninterruptibleSleep(const std::chrono::microseconds& n); + /** * DEPRECATED * Use either GetSystemTimeInSeconds (not mockable) or GetTime (mockable) @@ -28,8 +30,6 @@ void SetMockTime(int64_t nMockTimeIn); /** For testing */ int64_t GetMockTime(); -void MilliSleep(int64_t n); - /** Return system time (or mocked time, if set) */ template T GetTime(); diff --git a/src/wallet/db.cpp b/src/wallet/db.cpp index fd018564fe..42d8d5aa2e 100644 --- a/src/wallet/db.cpp +++ b/src/wallet/db.cpp @@ -759,7 +759,7 @@ bool BerkeleyBatch::Rewrite(BerkeleyDatabase& database, const char* pszSkip) return fSuccess; } } - MilliSleep(100); + UninterruptibleSleep(std::chrono::milliseconds{100}); } } @@ -891,7 +891,7 @@ bool BerkeleyDatabase::Backup(const std::string& strDest) } } } - MilliSleep(100); + UninterruptibleSleep(std::chrono::milliseconds{100}); } } diff --git a/test/lint/lint-includes.sh b/test/lint/lint-includes.sh index 99ad13f29a..9c3026c64c 100755 --- a/test/lint/lint-includes.sh +++ b/test/lint/lint-includes.sh @@ -56,7 +56,6 @@ EXPECTED_BOOST_INCLUDES=( boost/algorithm/string/replace.hpp boost/algorithm/string/split.hpp boost/asio.hpp - boost/chrono/chrono.hpp boost/circular_buffer.hpp boost/date_time/posix_time/posix_time.hpp boost/filesystem.hpp From 64eec2560817956c4d349360707d0bdf601b8ccf Mon Sep 17 00:00:00 2001 From: Peter Bushnell Date: Wed, 8 Sep 2021 15:45:21 +0100 Subject: [PATCH 02/13] Remove Boost mutex --- src/bench/checkqueue.cpp | 11 ++--- src/checkqueue.h | 88 +++++++++++++++++++++++----------- src/init.cpp | 31 +++++++----- src/test/checkqueue_tests.cpp | 58 ++++++---------------- src/test/setup_common.cpp | 7 +-- src/test/transaction_tests.cpp | 7 +-- src/validation.cpp | 17 ++++--- src/validation.h | 15 ++++-- test/lint/lint-includes.sh | 1 - 9 files changed, 122 insertions(+), 113 deletions(-) diff --git a/src/bench/checkqueue.cpp b/src/bench/checkqueue.cpp index 684b45f654..ac4551bd8d 100644 --- a/src/bench/checkqueue.cpp +++ b/src/bench/checkqueue.cpp @@ -6,10 +6,9 @@ #include #include #include -#include -#include #include +#include static const int MIN_CORES = 2; static const size_t BATCHES = 101; @@ -36,10 +35,7 @@ static void CCheckQueueSpeedPrevectorJob(benchmark::State& state) void swap(PrevectorJob& x){p.swap(x.p);}; }; CCheckQueue queue {QUEUE_BATCH_SIZE}; - boost::thread_group tg; - for (auto x = 0; x < std::max(MIN_CORES, GetNumCores()); ++x) { - tg.create_thread([&]{queue.Thread();}); - } + queue.StartWorkerThreads(GetNumCores() - 1); while (state.KeepRunning()) { // Make insecure_rand here so that each iteration is identical. FastRandomContext insecure_rand(true); @@ -55,7 +51,6 @@ static void CCheckQueueSpeedPrevectorJob(benchmark::State& state) // it is done explicitly here for clarity control.Wait(); } - tg.interrupt_all(); - tg.join_all(); + queue.StopWorkerThreads(); } BENCHMARK(CCheckQueueSpeedPrevectorJob, 1400); diff --git a/src/checkqueue.h b/src/checkqueue.h index 4cdc74e565..19599d556b 100644 --- a/src/checkqueue.h +++ b/src/checkqueue.h @@ -6,13 +6,12 @@ #define DEFI_CHECKQUEUE_H #include +#include +#include #include #include -#include -#include - template class CCheckQueueControl; @@ -31,67 +30,69 @@ class CCheckQueue { private: //! Mutex to protect the inner state - boost::mutex mutex; + Mutex m_mutex; //! Worker threads block on this when out of work - boost::condition_variable condWorker; + std::condition_variable m_worker_cv; //! Master thread blocks on this when out of work - boost::condition_variable condMaster; + std::condition_variable m_master_cv; //! The queue of elements to be processed. //! As the order of booleans doesn't matter, it is used as a LIFO (stack) - std::vector queue; + std::vector queue GUARDED_BY(m_mutex); //! The number of workers (including the master) that are idle. - int nIdle; + int nIdle GUARDED_BY(m_mutex){0}; //! The total number of workers (including the master). - int nTotal; + int nTotal GUARDED_BY(m_mutex){0}; //! The temporary evaluation result. - bool fAllOk; + bool fAllOk GUARDED_BY(m_mutex){true}; /** * Number of verifications that haven't completed yet. * This includes elements that are no longer queued, but still in the * worker's own batches. */ - unsigned int nTodo; + unsigned int nTodo GUARDED_BY(m_mutex){0}; //! The maximum number of elements to be processed in one batch - unsigned int nBatchSize; + const unsigned int nBatchSize; + + std::vector m_worker_threads; + bool m_request_stop GUARDED_BY(m_mutex){false}; /** Internal function that does bulk of the verification work. */ - bool Loop(bool fMaster = false) + bool Loop(bool fMaster) { - boost::condition_variable& cond = fMaster ? condMaster : condWorker; + std::condition_variable& cond = fMaster ? m_master_cv : m_worker_cv; std::vector vChecks; vChecks.reserve(nBatchSize); unsigned int nNow = 0; bool fOk = true; do { { - boost::unique_lock lock(mutex); + WAIT_LOCK(m_mutex, lock); // first do the clean-up of the previous loop run (allowing us to do it in the same critsect) if (nNow) { fAllOk &= fOk; nTodo -= nNow; if (nTodo == 0 && !fMaster) // We processed the last element; inform the master it can exit and return the result - condMaster.notify_one(); + m_master_cv.notify_one(); } else { // first iteration nTotal++; } // logically, the do loop starts here - while (queue.empty()) { + while (queue.empty() && !m_request_stop) { if (fMaster && nTodo == 0) { nTotal--; bool fRet = fAllOk; // reset the status for new work later - if (fMaster) - fAllOk = true; + fAllOk = true; // return the current status return fRet; } @@ -99,6 +100,10 @@ class CCheckQueue cond.wait(lock); // wait nIdle--; } + if (m_request_stop) { + return false; + } + // Decide how many work units to process now. // * Do not try to do everything at once, but aim for increasingly smaller batches so // all workers finish approximately simultaneously. @@ -125,15 +130,27 @@ class CCheckQueue public: //! Mutex to ensure only one concurrent CCheckQueueControl - boost::mutex ControlMutex; + Mutex m_control_mutex; //! Create a new check queue - explicit CCheckQueue(unsigned int nBatchSizeIn) : nIdle(0), nTotal(0), fAllOk(true), nTodo(0), nBatchSize(nBatchSizeIn) {} + explicit CCheckQueue(unsigned int nBatchSizeIn) : nBatchSize(nBatchSizeIn) {} - //! Worker thread - void Thread() + //! Create a pool of new worker threads. + void StartWorkerThreads(const int threads_num) { - Loop(); + { + LOCK(m_mutex); + nIdle = 0; + nTotal = 0; + fAllOk = true; + } + assert(m_worker_threads.empty()); + for (int n = 0; n < threads_num; ++n) { + m_worker_threads.emplace_back([this, n]() { + util::ThreadRename(strprintf("scriptch.%i", n)); + Loop(false /* worker thread */); + }); + } } //! Wait until execution finishes, and return whether all evaluations were successful. @@ -145,20 +162,33 @@ class CCheckQueue //! Add a batch of checks to the queue void Add(std::vector& vChecks) { - boost::unique_lock lock(mutex); + LOCK(m_mutex); for (T& check : vChecks) { queue.push_back(T()); check.swap(queue.back()); } nTodo += vChecks.size(); if (vChecks.size() == 1) - condWorker.notify_one(); + m_worker_cv.notify_one(); else if (vChecks.size() > 1) - condWorker.notify_all(); + m_worker_cv.notify_all(); + } + + //! Stop all of the worker threads. + void StopWorkerThreads() + { + WITH_LOCK(m_mutex, m_request_stop = true); + m_worker_cv.notify_all(); + for (std::thread& t : m_worker_threads) { + t.join(); + } + m_worker_threads.clear(); + WITH_LOCK(m_mutex, m_request_stop = false); } ~CCheckQueue() { + assert(m_worker_threads.empty()); } }; @@ -182,7 +212,7 @@ class CCheckQueueControl { // passed queue is supposed to be unused, or nullptr if (pqueue != nullptr) { - ENTER_CRITICAL_SECTION(pqueue->ControlMutex); + ENTER_CRITICAL_SECTION(pqueue->m_control_mutex); } } @@ -206,7 +236,7 @@ class CCheckQueueControl if (!fDone) Wait(); if (pqueue != nullptr) { - LEAVE_CRITICAL_SECTION(pqueue->ControlMutex); + LEAVE_CRITICAL_SECTION(pqueue->m_control_mutex); } } }; diff --git a/src/init.cpp b/src/init.cpp index baf1e2d69c..0c672b4373 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -239,6 +239,7 @@ void Shutdown(InitInterfaces& interfaces) scheduler.stop(); threadGroup.interrupt_all(); threadGroup.join_all(); + StopScriptCheckWorkerThreads(); // After the threads that potentially access these pointers have been stopped, // destruct and reset all to nullptr. @@ -1164,15 +1165,6 @@ bool AppInitParameterInteraction() incrementalRelayFee = CFeeRate(n); } - // -par=0 means autodetect, but nScriptCheckThreads==0 means no concurrency - nScriptCheckThreads = gArgs.GetArg("-par", DEFAULT_SCRIPTCHECK_THREADS); - if (nScriptCheckThreads <= 0) - nScriptCheckThreads += GetNumCores(); - if (nScriptCheckThreads <= 1) - nScriptCheckThreads = 0; - else if (nScriptCheckThreads > MAX_SCRIPTCHECK_THREADS) - nScriptCheckThreads = MAX_SCRIPTCHECK_THREADS; - // block pruning; get the amount of disk space (in MiB) to allot for block & undo files int64_t nPruneArg = gArgs.GetArg("-prune", 0); if (nPruneArg < 0) { @@ -1410,10 +1402,23 @@ bool AppInitMain(InitInterfaces& interfaces) InitSignatureCache(); InitScriptExecutionCache(); - LogPrintf("Using %u threads for script verification\n", nScriptCheckThreads); - if (nScriptCheckThreads) { - for (int i=0; i= 1) { + g_parallel_script_checks = true; + StartScriptCheckWorkerThreads(script_threads); } // Start the lightweight task scheduler thread diff --git a/src/test/checkqueue_tests.cpp b/src/test/checkqueue_tests.cpp index 1e56d387d1..54346698b3 100644 --- a/src/test/checkqueue_tests.cpp +++ b/src/test/checkqueue_tests.cpp @@ -4,7 +4,6 @@ #include #include -#include #include #include @@ -18,11 +17,10 @@ #include -// BasicTestingSetup not sufficient because nScriptCheckThreads is not set -// otherwise. BOOST_FIXTURE_TEST_SUITE(checkqueue_tests, TestingSetup) static const unsigned int QUEUE_BATCH_SIZE = 128; +static const int SCRIPT_CHECK_THREADS = 3; struct FakeCheck { bool operator()() @@ -147,10 +145,7 @@ typedef CCheckQueue FrozenCleanup_Queue; static void Correct_Queue_range(std::vector range) { auto small_queue = std::make_unique(QUEUE_BATCH_SIZE); - boost::thread_group tg; - for (auto x = 0; x < nScriptCheckThreads; ++x) { - tg.create_thread([&]{small_queue->Thread();}); - } + small_queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); // Make vChecks here to save on malloc (this test can be slow...) std::vector vChecks; for (const size_t i : range) { @@ -167,8 +162,7 @@ static void Correct_Queue_range(std::vector range) BOOST_REQUIRE_EQUAL(FakeCheckCheckCompletion::n_calls, i); } } - tg.interrupt_all(); - tg.join_all(); + small_queue->StopWorkerThreads(); } /** Test that 0 checks is correct @@ -211,11 +205,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Correct_Random) BOOST_AUTO_TEST_CASE(test_CheckQueue_Catches_Failure) { auto fail_queue = std::make_unique(QUEUE_BATCH_SIZE); - - boost::thread_group tg; - for (auto x = 0; x < nScriptCheckThreads; ++x) { - tg.create_thread([&]{fail_queue->Thread();}); - } + fail_queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); for (size_t i = 0; i < 1001; ++i) { CCheckQueueControl control(fail_queue.get()); @@ -236,18 +226,14 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Catches_Failure) BOOST_REQUIRE(success); } } - tg.interrupt_all(); - tg.join_all(); + fail_queue->StopWorkerThreads(); } // Test that a block validation which fails does not interfere with // future blocks, ie, the bad state is cleared. BOOST_AUTO_TEST_CASE(test_CheckQueue_Recovers_From_Failure) { auto fail_queue = std::make_unique(QUEUE_BATCH_SIZE); - boost::thread_group tg; - for (auto x = 0; x < nScriptCheckThreads; ++x) { - tg.create_thread([&]{fail_queue->Thread();}); - } + fail_queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); for (auto times = 0; times < 10; ++times) { for (const bool end_fails : {true, false}) { @@ -262,8 +248,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Recovers_From_Failure) BOOST_REQUIRE(r != end_fails); } } - tg.interrupt_all(); - tg.join_all(); + fail_queue->StopWorkerThreads(); } // Test that unique checks are actually all called individually, rather than @@ -272,11 +257,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Recovers_From_Failure) BOOST_AUTO_TEST_CASE(test_CheckQueue_UniqueCheck) { auto queue = std::make_unique(QUEUE_BATCH_SIZE); - boost::thread_group tg; - for (auto x = 0; x < nScriptCheckThreads; ++x) { - tg.create_thread([&]{queue->Thread();}); - - } + queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); size_t COUNT = 100000; size_t total = COUNT; @@ -295,8 +276,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_UniqueCheck) for (size_t i = 0; i < COUNT; ++i) r = r && UniqueCheck::results.count(i) == 1; BOOST_REQUIRE(r); - tg.interrupt_all(); - tg.join_all(); + queue->StopWorkerThreads(); } @@ -308,10 +288,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_UniqueCheck) BOOST_AUTO_TEST_CASE(test_CheckQueue_Memory) { auto queue = std::make_unique(QUEUE_BATCH_SIZE); - boost::thread_group tg; - for (auto x = 0; x < nScriptCheckThreads; ++x) { - tg.create_thread([&]{queue->Thread();}); - } + queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); for (size_t i = 0; i < 1000; ++i) { size_t total = i; { @@ -330,8 +307,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Memory) } BOOST_REQUIRE_EQUAL(MemoryCheck::fake_allocated_memory, 0U); } - tg.interrupt_all(); - tg.join_all(); + queue->StopWorkerThreads(); } // Test that a new verification cannot occur until all checks @@ -339,11 +315,8 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_Memory) BOOST_AUTO_TEST_CASE(test_CheckQueue_FrozenCleanup) { auto queue = std::make_unique(QUEUE_BATCH_SIZE); - boost::thread_group tg; bool fails = false; - for (auto x = 0; x < nScriptCheckThreads; ++x) { - tg.create_thread([&]{queue->Thread();}); - } + queue->StartWorkerThreads(SCRIPT_CHECK_THREADS); std::thread t0([&]() { CCheckQueueControl control(queue.get()); std::vector vChecks(1); @@ -362,7 +335,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_FrozenCleanup) } // Try to get control of the queue a bunch of times for (auto x = 0; x < 100 && !fails; ++x) { - fails = queue->ControlMutex.try_lock(); + fails = queue->m_control_mutex.try_lock(); } { // Unfreeze (we need lock n case of spurious wakeup) @@ -373,9 +346,8 @@ BOOST_AUTO_TEST_CASE(test_CheckQueue_FrozenCleanup) FrozenCleanupCheck::cv.notify_one(); // Wait for control to finish t0.join(); - tg.interrupt_all(); - tg.join_all(); BOOST_REQUIRE(!fails); + queue->StopWorkerThreads(); } @@ -426,7 +398,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueueControl_Locks) cv.wait(l, [&](){return has_lock;}); bool fails = false; for (auto x = 0; x < 100 && !fails; ++x) { - fails = queue->ControlMutex.try_lock(); + fails = queue->m_control_mutex.try_lock(); } has_tried = true; cv.notify_one(); diff --git a/src/test/setup_common.cpp b/src/test/setup_common.cpp index ccf7211a63..2066f7dc7e 100644 --- a/src/test/setup_common.cpp +++ b/src/test/setup_common.cpp @@ -133,9 +133,9 @@ TestingSetup::TestingSetup(const std::string& chainName) : BasicTestingSetup(cha throw std::runtime_error(strprintf("ActivateBestChain failed. (%s)", FormatStateMessage(state))); } - nScriptCheckThreads = 3; - for (int i = 0; i < nScriptCheckThreads - 1; i++) - threadGroup.create_thread([i]() { return ThreadScriptCheck(i); }); + constexpr int script_check_threads = 2; + StartScriptCheckWorkerThreads(script_check_threads); + g_parallel_script_checks = true; g_banman = std::make_unique(GetDataDir() / "banlist.dat", nullptr, DEFAULT_MISBEHAVING_BANTIME); g_connman = std::make_unique(0x1337, 0x1337); // Deterministic randomness for tests. @@ -146,6 +146,7 @@ TestingSetup::~TestingSetup() scheduler.stop(); threadGroup.interrupt_all(); threadGroup.join_all(); + StopScriptCheckWorkerThreads(); GetMainSignals().FlushBackgroundCallbacks(); GetMainSignals().UnregisterBackgroundSignalScheduler(); g_connman.reset(); diff --git a/src/test/transaction_tests.cpp b/src/test/transaction_tests.cpp index 039d6811fc..c1fda18ec9 100644 --- a/src/test/transaction_tests.cpp +++ b/src/test/transaction_tests.cpp @@ -464,12 +464,10 @@ BOOST_AUTO_TEST_CASE(test_big_witness_transaction) // check all inputs concurrently, with the cache PrecomputedTransactionData txdata(tx); - boost::thread_group threadGroup; CCheckQueue scriptcheckqueue(128); CCheckQueueControl control(&scriptcheckqueue); - for (int i=0; i<20; i++) - threadGroup.create_thread(std::bind(&CCheckQueue::Thread, std::ref(scriptcheckqueue))); + scriptcheckqueue.StartWorkerThreads(20); std::vector coins; for(uint32_t i = 0; i < mtx.vin.size(); i++) { @@ -492,8 +490,7 @@ BOOST_AUTO_TEST_CASE(test_big_witness_transaction) bool controlCheck = control.Wait(); assert(controlCheck); - threadGroup.interrupt_all(); - threadGroup.join_all(); + scriptcheckqueue.StopWorkerThreads(); } SignatureData CombineSignatures(const CMutableTransaction& input1, const CMutableTransaction& input2, const CTransactionRef tx) diff --git a/src/validation.cpp b/src/validation.cpp index cc7c089ff5..5733e63ee9 100644 --- a/src/validation.cpp +++ b/src/validation.cpp @@ -130,7 +130,7 @@ CBlockIndex *pindexBestHeader = nullptr; Mutex g_best_block_mutex; std::condition_variable g_best_block_cv; uint256 g_best_block; -int nScriptCheckThreads = 0; +bool g_parallel_script_checks{false}; std::atomic_bool fImporting(false); std::atomic_bool fReindex(false); bool fHavePruned = false; @@ -1936,9 +1936,14 @@ static bool WriteUndoDataForBlock(const CBlockUndo& blockundo, CValidationState& static CCheckQueue scriptcheckqueue(128); -void ThreadScriptCheck(int worker_num) { - util::ThreadRename(strprintf("scriptch.%i", worker_num)); - scriptcheckqueue.Thread(); +void StartScriptCheckWorkerThreads(int threads_num) +{ + scriptcheckqueue.StartWorkerThreads(threads_num); +} + +void StopScriptCheckWorkerThreads() +{ + scriptcheckqueue.StopWorkerThreads(); } VersionBitsCache versionbitscache GUARDED_BY(cs_main); @@ -2715,7 +2720,7 @@ bool CChainState::ConnectBlock(const CBlock& block, CValidationState& state, CBl CBlockUndo blockundo; - CCheckQueueControl control(fScriptChecks && nScriptCheckThreads ? &scriptcheckqueue : nullptr); + CCheckQueueControl control(fScriptChecks && g_parallel_script_checks ? &scriptcheckqueue : nullptr); std::vector> writeBurnEntries; std::vector prevheights; @@ -2784,7 +2789,7 @@ bool CChainState::ConnectBlock(const CBlock& block, CValidationState& state, CBl { std::vector vChecks; bool fCacheResults = fJustCheck; /* Don't cache results if we're actually connecting blocks (still consult the cache, though) */ - if (!CheckInputs(tx, state, view, fScriptChecks, flags, fCacheResults, fCacheResults, txdata[i], nScriptCheckThreads ? &vChecks : nullptr)) { + if (!CheckInputs(tx, state, view, fScriptChecks, flags, fCacheResults, fCacheResults, txdata[i], g_parallel_script_checks ? &vChecks : nullptr)) { if (state.GetReason() == ValidationInvalidReason::TX_NOT_STANDARD) { // CheckInputs may return NOT_STANDARD for extra flags we passed, // but we can't return that, as it's not defined for a block, so diff --git a/src/validation.h b/src/validation.h index 72c1c2bf6a..b059d11723 100644 --- a/src/validation.h +++ b/src/validation.h @@ -84,8 +84,8 @@ static const unsigned int BLOCKFILE_CHUNK_SIZE = 0x2000000; // 32 MiB /** The pre-allocation chunk size for rev?????.dat files (since 0.8) */ static const unsigned int UNDOFILE_CHUNK_SIZE = 0x200000; // 2 MiB -/** Maximum number of script-checking threads allowed */ -static const int MAX_SCRIPTCHECK_THREADS = 16; +/** Maximum number of dedicated script-checking threads allowed */ +static const int MAX_SCRIPTCHECK_THREADS = 15; /** -par default (number of script-checking threads, 0 = auto) */ static const int DEFAULT_SCRIPTCHECK_THREADS = 0; /** Number of blocks that can be requested at any given time from a single peer. */ @@ -155,9 +155,12 @@ typedef std::unordered_map BlockMap; extern Mutex g_best_block_mutex; extern std::condition_variable g_best_block_cv; extern uint256 g_best_block; +/** Whether there are dedicated script-checking threads running. + * False indicates all script checking is done on the main threadMessageHandler thread. + */ +extern bool g_parallel_script_checks; extern std::atomic_bool fImporting; extern std::atomic_bool fReindex; -extern int nScriptCheckThreads; extern bool fRequireStandard; extern bool fCheckBlockIndex; @@ -269,8 +272,10 @@ bool LoadBlockIndex(const CChainParams& chainparams) EXCLUSIVE_LOCKS_REQUIRED(cs bool LoadChainTip(const CChainParams& chainparams) EXCLUSIVE_LOCKS_REQUIRED(cs_main); /** Unload database information */ void UnloadBlockIndex(); -/** Run an instance of the script checking thread */ -void ThreadScriptCheck(int worker_num); +/** Run instances of script checking worker threads */ +void StartScriptCheckWorkerThreads(int threads_num); +/** Stop all of the script checking worker threads */ +void StopScriptCheckWorkerThreads(); /** Retrieve a transaction (from memory pool, or from disk, if possible) */ bool GetTransaction(const uint256& hash, CTransactionRef& tx, const Consensus::Params& params, uint256& hashBlock, const CBlockIndex* const blockIndex = nullptr); /** diff --git a/test/lint/lint-includes.sh b/test/lint/lint-includes.sh index 9c3026c64c..b72ee4a844 100755 --- a/test/lint/lint-includes.sh +++ b/test/lint/lint-includes.sh @@ -79,7 +79,6 @@ EXPECTED_BOOST_INCLUDES=( boost/test/unit_test.hpp boost/thread.hpp boost/thread/condition_variable.hpp - boost/thread/mutex.hpp boost/thread/thread.hpp ) From 878b876d174992bde3a35e8e57d6261d45e18622 Mon Sep 17 00:00:00 2001 From: Peter Bushnell Date: Thu, 9 Sep 2021 10:16:52 +0100 Subject: [PATCH 03/13] Remove Boost thread_group --- src/addrman.cpp | 3 ++- src/init.cpp | 32 ++++++++++++++------------------ src/scheduler.cpp | 13 ------------- src/scheduler.h | 27 +++++++++++++++++---------- src/script/sigcache.cpp | 4 +++- src/test/checkqueue_tests.cpp | 16 ++++++++++------ src/test/cuckoocache_tests.cpp | 2 ++ src/test/scheduler_tests.cpp | 34 +++++++++++++++++++++------------- src/test/setup_common.cpp | 4 +--- src/test/setup_common.h | 4 +--- src/util/system.h | 7 ------- test/lint/lint-includes.sh | 1 - 12 files changed, 71 insertions(+), 76 deletions(-) diff --git a/src/addrman.cpp b/src/addrman.cpp index 96252bd93f..e82e0dfc32 100644 --- a/src/addrman.cpp +++ b/src/addrman.cpp @@ -5,6 +5,7 @@ #include +#include #include #include @@ -59,7 +60,7 @@ double CAddrInfo::GetChance(int64_t nNow) const fChance *= 0.01; // deprioritize 66% after each failed attempt, but at most 1/28th to avoid the search taking forever or overly penalizing outages. - fChance *= pow(0.66, std::min(nAttempts, 8)); + fChance *= std::pow(0.66, std::min(nAttempts, 8)); return fChance; } diff --git a/src/init.cpp b/src/init.cpp index 0c672b4373..f8874ae59d 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -9,7 +9,6 @@ #include -#include #include #include #include @@ -22,7 +21,6 @@ #include #include #include -#include #include #include #include @@ -74,10 +72,7 @@ #include #endif -#include #include -#include -#include #if ENABLE_ZMQ #include @@ -160,7 +155,7 @@ NODISCARD static bool CreatePidFile() static std::unique_ptr globalVerifyHandle; -static boost::thread_group threadGroup; +std::vector threadGroup; static CScheduler scheduler; #if HAVE_SYSTEM @@ -235,10 +230,11 @@ void Shutdown(InitInterfaces& interfaces) StopTorControl(); // After everything has been shut down, but before things get flushed, stop the - // CScheduler/checkqueue threadGroup + // CScheduler/checkqueue threaGroup scheduler.stop(); - threadGroup.interrupt_all(); - threadGroup.join_all(); + for (auto& thread : threadGroup) { + if (thread.joinable()) thread.join(); + } StopScriptCheckWorkerThreads(); // After the threads that potentially access these pointers have been stopped, @@ -1423,7 +1419,7 @@ bool AppInitMain(InitInterfaces& interfaces) // Start the lightweight task scheduler thread CScheduler::Function serviceLoop = std::bind(&CScheduler::serviceQueue, &scheduler); - threadGroup.create_thread(std::bind(&TraceThread, "scheduler", serviceLoop)); + scheduler.m_service_thread = std::thread([&] { TraceThread("scheduler", [&] { scheduler.serviceQueue(); }); }); GetMainSignals().RegisterBackgroundSignalScheduler(scheduler); GetMainSignals().RegisterWithMempoolSignals(mempool); @@ -1973,7 +1969,9 @@ bool AppInitMain(InitInterfaces& interfaces) vImportFiles.push_back(strFile); } - threadGroup.create_thread(std::bind(&ThreadImport, vImportFiles)); + threadGroup.emplace_back(ThreadImport, vImportFiles); + + scheduler.m_service_thread = std::thread([&] { TraceThread("scheduler", [&] { scheduler.serviceQueue(); }); }); // Wait for genesis block to be processed { @@ -2180,13 +2178,11 @@ bool AppInitMain(InitInterfaces& interfaces) } // Mint proof-of-stake blocks in background - threadGroup.create_thread( - std::bind(TraceThread>, "CoinStaker", [=]() { - // Run ThreadStaker - pos::ThreadStaker threadStaker; - threadStaker(std::move(stakersParams), std::move(chainparams)); - } - )); + threadGroup.emplace_back(TraceThread>, "CoinStaker", [=]() { + // Run ThreadStaker + pos::ThreadStaker threadStaker; + threadStaker(stakersParams, chainparams); + }); } return true; diff --git a/src/scheduler.cpp b/src/scheduler.cpp index fa84be8d28..c6b118d726 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -31,7 +31,6 @@ void CScheduler::serviceQueue() while (!shouldStop()) { try { if (!shouldStop() && taskQueue.empty()) { - reverse_lock > rlock(lock); // Use this chance to get more entropy RandAddSeedSleep(); } @@ -73,18 +72,6 @@ void CScheduler::serviceQueue() newTaskScheduled.notify_one(); } -void CScheduler::stop(bool drain) -{ - { - LOCK(newTaskMutex); - if (drain) - stopWhenEmpty = true; - else - stopRequested = true; - } - newTaskScheduled.notify_all(); -} - void CScheduler::schedule(CScheduler::Function f, std::chrono::system_clock::time_point t) { { diff --git a/src/scheduler.h b/src/scheduler.h index 3c84d79cdb..8f2873fece 100644 --- a/src/scheduler.h +++ b/src/scheduler.h @@ -5,15 +5,11 @@ #ifndef DEFI_SCHEDULER_H #define DEFI_SCHEDULER_H -// -// NOTE: -// boost::thread should be ported to std::thread -// when we support C++11. -// #include #include #include #include +#include #include @@ -41,6 +37,8 @@ class CScheduler CScheduler(); ~CScheduler(); + std::thread m_service_thread; + typedef std::function Function; // Call func at/after time t @@ -59,13 +57,22 @@ class CScheduler // To keep things as simple as possible, there is no unschedule. // Services the queue 'forever'. Should be run in a thread, - // and interrupted using boost::interrupt_thread void serviceQueue(); - // Tell any threads running serviceQueue to stop as soon as they're - // done servicing whatever task they're currently servicing (drain=false) - // or when there is no work left to be done (drain=true) - void stop(bool drain=false); + /** Tell any threads running serviceQueue to stop as soon as the current task is done */ + void stop() + { + WITH_LOCK(newTaskMutex, stopRequested = true); + newTaskScheduled.notify_all(); + if (m_service_thread.joinable()) m_service_thread.join(); + } + /** Tell any threads running serviceQueue to stop when there is no work left to be done */ + void StopWhenDrained() + { + WITH_LOCK(newTaskMutex, stopWhenEmpty = true); + newTaskScheduled.notify_all(); + if (m_service_thread.joinable()) m_service_thread.join(); + } // Returns number of tasks waiting to be serviced, // and first and last task times diff --git a/src/script/sigcache.cpp b/src/script/sigcache.cpp index 3d42f99618..a8f9f196ec 100644 --- a/src/script/sigcache.cpp +++ b/src/script/sigcache.cpp @@ -11,7 +11,9 @@ #include #include -#include + +#include +#include namespace { /** diff --git a/src/test/checkqueue_tests.cpp b/src/test/checkqueue_tests.cpp index 54346698b3..b88cfaef10 100644 --- a/src/test/checkqueue_tests.cpp +++ b/src/test/checkqueue_tests.cpp @@ -356,11 +356,11 @@ BOOST_AUTO_TEST_CASE(test_CheckQueueControl_Locks) { auto queue = std::make_unique(QUEUE_BATCH_SIZE); { - boost::thread_group tg; + std::vector tg; std::atomic nThreads {0}; std::atomic fails {0}; for (size_t i = 0; i < 3; ++i) { - tg.create_thread( + tg.emplace_back( [&]{ CCheckQueueControl control(queue.get()); // While sleeping, no other thread should execute to this point @@ -369,11 +369,13 @@ BOOST_AUTO_TEST_CASE(test_CheckQueueControl_Locks) fails += observed != nThreads; }); } - tg.join_all(); + for (auto& thread: tg) { + if (thread.joinable()) thread.join(); + } BOOST_REQUIRE_EQUAL(fails, 0); } { - boost::thread_group tg; + std::vector tg; std::mutex m; std::condition_variable cv; bool has_lock{false}; @@ -382,7 +384,7 @@ BOOST_AUTO_TEST_CASE(test_CheckQueueControl_Locks) bool done_ack{false}; { std::unique_lock l(m); - tg.create_thread([&]{ + tg.emplace_back([&]{ CCheckQueueControl control(queue.get()); std::unique_lock ll(m); has_lock = true; @@ -408,7 +410,9 @@ BOOST_AUTO_TEST_CASE(test_CheckQueueControl_Locks) cv.notify_one(); BOOST_REQUIRE(!fails); } - tg.join_all(); + for (auto& thread: tg) { + if (thread.joinable()) thread.join(); + } } } BOOST_AUTO_TEST_SUITE_END() diff --git a/src/test/cuckoocache_tests.cpp b/src/test/cuckoocache_tests.cpp index 67236344de..b75c7b593a 100644 --- a/src/test/cuckoocache_tests.cpp +++ b/src/test/cuckoocache_tests.cpp @@ -2,6 +2,8 @@ // Distributed under the MIT software license, see the accompanying // file LICENSE or http://www.opensource.org/licenses/mit-license.php. #include +#include +#include #include #include