From 3afd9cf113e327f21da1093a4269f691a0e6acd5 Mon Sep 17 00:00:00 2001 From: dutor <440396+dutor@users.noreply.github.com> Date: Fri, 31 Aug 2018 02:45:52 +0800 Subject: [PATCH 1/2] [Feature] Added some concurrent utilities, GenericThreadPool, etc. --- .gitignore | 2 + CMakeLists.txt | 2 + common/CMakeLists.txt | 3 +- common/concurrent/CMakeLists.txt | 7 + common/concurrent/sync/Barrier.cpp | 37 +++ common/concurrent/sync/Barrier.h | 54 +++++ common/concurrent/sync/Latch.cpp | 50 ++++ common/concurrent/sync/Latch.h | 67 ++++++ common/concurrent/test/BarrierTest.cpp | 96 ++++++++ common/concurrent/test/CMakeLists.txt | 9 + .../concurrent/test/GenericThreadPoolTest.cpp | 152 ++++++++++++ common/concurrent/test/GenericWorkerTest.cpp | 152 ++++++++++++ common/concurrent/test/LatchTest.cpp | 110 +++++++++ common/concurrent/test/ThreadLocalPtrTest.cpp | 113 +++++++++ common/concurrent/test/ThreadTest.cpp | 35 +++ .../concurrent/thread/GenericThreadPool.cpp | 57 +++++ common/concurrent/thread/GenericThreadPool.h | 126 ++++++++++ common/concurrent/thread/GenericWorker.cpp | 163 +++++++++++++ common/concurrent/thread/GenericWorker.h | 199 ++++++++++++++++ common/concurrent/thread/NamedThread.cpp | 35 +++ common/concurrent/thread/NamedThread.h | 92 ++++++++ common/concurrent/thread/ThreadLocalPtr.h | 220 ++++++++++++++++++ common/cpp/helpers.h | 28 +++ common/cpp/macros.h | 34 +++ 24 files changed, 1842 insertions(+), 1 deletion(-) create mode 100644 common/concurrent/CMakeLists.txt create mode 100644 common/concurrent/sync/Barrier.cpp create mode 100644 common/concurrent/sync/Barrier.h create mode 100644 common/concurrent/sync/Latch.cpp create mode 100644 common/concurrent/sync/Latch.h create mode 100644 common/concurrent/test/BarrierTest.cpp create mode 100644 common/concurrent/test/CMakeLists.txt create mode 100644 common/concurrent/test/GenericThreadPoolTest.cpp create mode 100644 common/concurrent/test/GenericWorkerTest.cpp create mode 100644 common/concurrent/test/LatchTest.cpp create mode 100644 common/concurrent/test/ThreadLocalPtrTest.cpp create mode 100644 common/concurrent/test/ThreadTest.cpp create mode 100644 common/concurrent/thread/GenericThreadPool.cpp create mode 100644 common/concurrent/thread/GenericThreadPool.h create mode 100644 common/concurrent/thread/GenericWorker.cpp create mode 100644 common/concurrent/thread/GenericWorker.h create mode 100644 common/concurrent/thread/NamedThread.cpp create mode 100644 common/concurrent/thread/NamedThread.h create mode 100644 common/concurrent/thread/ThreadLocalPtr.h create mode 100644 common/cpp/helpers.h create mode 100644 common/cpp/macros.h diff --git a/.gitignore b/.gitignore index ea42f1f2b74..6b4fb8d9de7 100644 --- a/.gitignore +++ b/.gitignore @@ -8,5 +8,7 @@ common/base/Base.h.gch CMakeCache.txt Makefile cmake_install.cmake +CTestTestfile.cmake CMakeFiles/ +Testing/ diff --git a/CMakeLists.txt b/CMakeLists.txt index aa1f831433c..43f0710c942 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -6,6 +6,8 @@ set(CMAKE_SKIP_RPATH TRUE) set(CMAKE_VERBOSE_MAKEFILE TRUE) +enable_testing() + if (!CMAKE_CXX_COMPILER) message(FATAL_ERROR "No C++ compiler found") endif() diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 041b698a147..5e405cd796b 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -1,6 +1,6 @@ add_custom_target( common ALL - DEPENDS base_obj time proc network fs + DEPENDS base_obj time proc network fs concurrent ) add_subdirectory(base) @@ -8,4 +8,5 @@ add_subdirectory(time) add_subdirectory(proc) add_subdirectory(network) add_subdirectory(fs) +add_subdirectory(concurrent) diff --git a/common/concurrent/CMakeLists.txt b/common/concurrent/CMakeLists.txt new file mode 100644 index 00000000000..f723d739d5a --- /dev/null +++ b/common/concurrent/CMakeLists.txt @@ -0,0 +1,7 @@ +add_library(concurrent_obj OBJECT thread/NamedThread.cpp + thread/GenericWorker.cpp + thread/GenericThreadPool.cpp + sync/Barrier.cpp + sync/Latch.cpp +) +add_subdirectory(test) diff --git a/common/concurrent/sync/Barrier.cpp b/common/concurrent/sync/Barrier.cpp new file mode 100644 index 00000000000..4beb19fbac2 --- /dev/null +++ b/common/concurrent/sync/Barrier.cpp @@ -0,0 +1,37 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ +#include "concurrent/sync/Barrier.h" + + +namespace vesoft { +namespace concurrent { + +Barrier::Barrier(size_t counter, std::function completion) { + if (counter == 0) { + throw std::invalid_argument("Zero barrier counter"); + } + completion_ = std::move(completion); + counter_ = counter; + ages_ = counter_; +} + +void Barrier::wait() { + std::unique_lock unique(lock_); + if (--ages_ == 0) { + ages_ = counter_; + ++generation_; + if (completion_ != nullptr) { + completion_(); + } + cond_.notify_all(); + } else { + auto current = generation_; + cond_.wait(unique, [=] () { return current != generation_; }); + } +} + +} // namespace concurrent +} // namespace vesoft diff --git a/common/concurrent/sync/Barrier.h b/common/concurrent/sync/Barrier.h new file mode 100644 index 00000000000..e90b78dc2de --- /dev/null +++ b/common/concurrent/sync/Barrier.h @@ -0,0 +1,54 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ +#ifndef COMMON_CONCURRENT_SYNC_BARRIER_H_ +#define COMMON_CONCURRENT_SYNC_BARRIER_H_ +#include +#include +#include + +/** + * Like `Latch', `Barrier' is a synchronization object, except that + * `Barrier' is reusable. + * Besides, `Barrier' features with an optional callable object, + * which would be invoked at the completion phase, i.e. synchronization point, + * by the last participating thread entering `wait'. + */ + +namespace vesoft { +namespace concurrent { + +class Barrier { +public: + explicit Barrier(size_t counter, std::function completion = nullptr); + Barrier() = delete; + ~Barrier() = default; + Barrier(const Barrier&) = delete; + Barrier(Barrier&&) = delete; + Barrier& operator=(const Barrier&) = delete; + Barrier& operator=(Barrier&&) = delete; + /** + * Decrements the internal counter. + * If the counter reaches zero, the completion callback would be invoked if present, + * then all preceding blocked threads would be woken up, with the internal counter + * reset to the original value. + * Otherwise, the calling thread would be blocked to wait for + * other participants' arrival. + */ + void wait(); + +private: + std::function completion_{nullptr}; + size_t counter_{0}; + size_t ages_{0}; + size_t generation_{0}; + std::mutex lock_; + std::condition_variable cond_; +}; + +} // namespace concurrent +} // namespace vesoft + +#endif // COMMON_CONCURRENT_SYNC_BARRIER_H_ diff --git a/common/concurrent/sync/Latch.cpp b/common/concurrent/sync/Latch.cpp new file mode 100644 index 00000000000..930b7844765 --- /dev/null +++ b/common/concurrent/sync/Latch.cpp @@ -0,0 +1,50 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ +#include "concurrent/sync/Latch.h" + +namespace vesoft { +namespace concurrent { + +Latch::Latch(size_t counter) { + if (counter == 0) { + throw std::invalid_argument("Zero Latch counter"); + } + counter_ = counter; +} + +void Latch::down() { + std::unique_lock unique(lock_); + if (counter_ == 0) { + throw std::runtime_error("Count down on zero Latch"); + } + if (--counter_ == 0) { + cond_.notify_all(); + } +} + +void Latch::downWait() { + std::unique_lock unique(lock_); + if (counter_ == 0) { + throw std::runtime_error("Count down on zero Latch"); + } + if (--counter_ == 0) { + cond_.notify_all(); + return; + } + cond_.wait(unique, [this] () { return counter_ == 0; }); +} + +void Latch::wait() { + std::unique_lock unique(lock_); + cond_.wait(unique, [this] () { return counter_ == 0; }); +} + +bool Latch::isReady() { + return counter_ == 0; +} + +} // namespace concurrent +} // namespace vesoft diff --git a/common/concurrent/sync/Latch.h b/common/concurrent/sync/Latch.h new file mode 100644 index 00000000000..266b2ba22a5 --- /dev/null +++ b/common/concurrent/sync/Latch.h @@ -0,0 +1,67 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ +#ifndef COMMON_CONCURRENT_SYNC_LATCH_H_ +#define COMMON_CONCURRENT_SYNC_LATCH_H_ +#include +#include +/** + * Latch is an one-shot synchronization object. + * It provides synchronization point for multiple threads. + * See shared/concurrent/test/LatchTest.cpp for use scenarios. + */ + +namespace vesoft { +namespace concurrent { + +class Latch { +public: + /** + * @counter: initial counter, + * typically number of participating threads. + * Throws `std::invalid_argument' if counter is zero. + */ + explicit Latch(size_t counter); + Latch(const Latch&) = delete; + Latch(Latch&&) = delete; + ~Latch() = default; + Latch& operator=(const Latch&) = delete; + Latch& operator=(Latch&&) = delete; + /** + * Decrements the internal counter by one. + * If the counter reaches 0, all blocking(in `wait') + * threads will be given the green light. + * Throws `std::runtime_error' if counter already zeroed. + */ + void down(); + /** + * Decrements the internal counter by one. + * If the counter reaches 0, returns immediately, + * and all blocking threads will be woken up. + * Otherwise, the calling thread blocks until being woken up. + * Throws `std::runtime_error' if counter already zeroed. + */ + void downWait(); + /** + * Blocks if internal counter not zero. + * Otherwise, returns immediately. + */ + void wait(); + /** + * Returns true if internal counter already zeroed. + * Otherwise, returns false. + */ + bool isReady(); + +private: + volatile size_t counter_{0}; + std::mutex lock_; + std::condition_variable cond_; +}; + +} // namespace concurrent +} // namespace vesoft + +#endif // COMMON_CONCURRENT_SYNC_LATCH_H_ diff --git a/common/concurrent/test/BarrierTest.cpp b/common/concurrent/test/BarrierTest.cpp new file mode 100644 index 00000000000..92d41ac6ccb --- /dev/null +++ b/common/concurrent/test/BarrierTest.cpp @@ -0,0 +1,96 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ +#include +#include +#include +#include +#include +#include "concurrent/sync/Barrier.h" +#include "concurrent/thread/GenericThreadPool.h" + +namespace vesoft { +namespace concurrent { + +TEST(BarrierTest, BasicTest) { + // test for invalid initial counter + { + ASSERT_THROW({Barrier barrier(0UL);}, std::invalid_argument); + } + // test for single-thread normal case + { + Barrier barrier(1UL); + barrier.wait(); + ASSERT_TRUE(true); + } + // test for multiple-thread normal case + { + Barrier barrier(2UL); + std::atomic counter{0}; + auto cb = [&] () { + barrier.wait(); + ++counter; + }; + std::thread thread(cb); + usleep(1000); + ASSERT_EQ(0UL, counter.load()); + barrier.wait(); + thread.join(); + ASSERT_EQ(1UL, counter.load()); + } + // test for multiple-thread completion + { + std::atomic counter{0}; + auto completion = [&] () { + ++counter; + ++counter; + }; + Barrier barrier(2UL, completion); + + auto cb = [&] () { + barrier.wait(); + ++counter; + }; + + std::thread thread(cb); + usleep(1000); + ASSERT_EQ(0UL, counter.load()); + barrier.wait(); + ASSERT_GE(counter.load(), 2UL); + thread.join(); + ASSERT_EQ(3UL, counter.load()); + } +} + +TEST(BarrierTest, ConsecutiveTest) { + std::atomic counter{0}; + constexpr auto N = 64UL; + constexpr auto iters = 100UL; + auto completion = [&] () { + // At the completion phase, `counter' should be multiple to `N'. + ASSERT_EQ(0UL, counter.load() % N); + }; + + Barrier barrier(N, completion); + auto cb = [&] () { + auto i = iters; + while (i-- != 0) { + ++counter; + barrier.wait(); + } + }; + + std::vector threads; + for (auto i = 0UL; i < N; i++) { + threads.emplace_back(cb); + } + for (auto &thread : threads) { + thread.join(); + } + ASSERT_EQ(0UL, counter.load() % N); +} + +} // namespace concurrent +} // namespace vesoft diff --git a/common/concurrent/test/CMakeLists.txt b/common/concurrent/test/CMakeLists.txt new file mode 100644 index 00000000000..d9c9340675b --- /dev/null +++ b/common/concurrent/test/CMakeLists.txt @@ -0,0 +1,9 @@ +add_executable(concurrent_test ThreadTest.cpp + GenericWorkerTest.cpp + GenericThreadPoolTest.cpp + ThreadLocalPtrTest.cpp + $ + $ +) +target_link_libraries(concurrent_test ev gtest gtest_main pthread) +add_test(NAME concurrent_test COMMAND concurrent_test) diff --git a/common/concurrent/test/GenericThreadPoolTest.cpp b/common/concurrent/test/GenericThreadPoolTest.cpp new file mode 100644 index 00000000000..06ec0d6cc6b --- /dev/null +++ b/common/concurrent/test/GenericThreadPoolTest.cpp @@ -0,0 +1,152 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ +#include +#include +#include +#include +#include "common/concurrent/thread/GenericThreadPool.h" +#include "common/time/Duration.h" +using namespace std; + +namespace vesoft { +namespace concurrent { + +TEST(GenericThreadPool, StartAndStop) { + // inactive pool + { + GenericThreadPool pool; + } + // start & stop & wait + { + GenericThreadPool pool; + ASSERT_TRUE(pool.start(1)); + ASSERT_TRUE(pool.stop()); + ASSERT_TRUE(pool.wait()); + } + // start & stop & no wait + { + GenericThreadPool pool; + ASSERT_TRUE(pool.start(1)); + ASSERT_TRUE(pool.stop()); + } + // start & no stop & no wait + { + GenericThreadPool pool; + ASSERT_TRUE(pool.start(1)); + } + // start twice + { + GenericThreadPool pool; + ASSERT_TRUE(pool.start(1)); + ASSERT_FALSE(pool.start(1)); + } + // stop twice + { + GenericThreadPool pool; + ASSERT_TRUE(pool.start(1)); + ASSERT_TRUE(pool.stop()); + ASSERT_FALSE(pool.stop()); + } +} + +TEST(GenericThreadPool, addTask) { + GenericThreadPool pool; + ASSERT_TRUE(pool.start(1)); + // task without parameters + { + volatile auto flag = false; + auto set_flag = [&] () { flag = true; }; + pool.addTask(set_flag).get(); + ASSERT_TRUE(flag); + } + // task with parameter + { + volatile auto flag = false; + auto set_flag = [&] (auto value) { flag = value; }; + pool.addTask(set_flag, true).get(); + ASSERT_TRUE(flag); + pool.addTask(set_flag, false).get(); + ASSERT_FALSE(flag); + } + // future with value + { + ASSERT_TRUE(pool.addTask([] () { return true; }).get()); + ASSERT_FALSE(pool.addTask([] () { return false; }).get()); + ASSERT_EQ(13UL, pool.addTask([] () { return ::strlen("Rock 'n' Roll"); }).get()); + ASSERT_EQ("Innuendo", pool.addTask([] () { return std::string("Innuendo"); }).get()); + } + // member function as task + { + struct X { + std::string itos(size_t i) { + return std::to_string(i); + } + } x; + ASSERT_EQ("918", pool.addTask(&X::itos, &x, 918).get()); + ASSERT_EQ("918", pool.addTask(&X::itos, std::make_shared(), 918).get()); + } +} + +static size_t currentMicroSeconds() { + struct timeval ts; + ::gettimeofday(&ts, nullptr); + return ts.tv_sec * 1000000 + ts.tv_usec; +} + +static bool msAboutEqual(size_t target, size_t actual) { + return std::max(target, actual) - std::min(target, actual) <= 10; +} + +TEST(GenericThreadPool, addDelayTask) { + GenericThreadPool pool; + ASSERT_TRUE(pool.start(1)); + { + auto shared = std::make_shared(0); + auto cb = [shared] () { + return ++(*shared); + }; + time::Duration clock; + ASSERT_EQ(1, pool.addDelayTask(50, cb).get()); + ASSERT_GE(shared.use_count(), 2); + ASSERT_TRUE(msAboutEqual(50, clock.elapsedInUSec() / 1000)); + ::usleep(5 * 1000); // ensure all internal resources are released + ASSERT_EQ(2, shared.use_count()); // two ref: `shared' and `cb' + } +} + +TEST(GenericThreadPool, addRepeatTask) { + GenericThreadPool pool; + ASSERT_TRUE(pool.start(1)); + { + auto counter = 0UL; + auto cb = [&] () { + counter++; + }; + pool.addRepeatTask(50, cb); + ::usleep(160 * 1000); + ASSERT_EQ(3, counter); + } +} + +TEST(GenericThreadPool, purgeRepeatTask) { + GenericThreadPool pool; + ASSERT_TRUE(pool.start(4)); + for (auto i = 0; i < 8; i++) { + auto counter = 0UL; + auto cb = [&] () { + counter++; + }; + auto id = pool.addRepeatTask(50, cb); + // fprintf(stderr, "id: 0x%016lx\n", id); + ::usleep(110 * 1000); + pool.purgeTimerTask(id); + ::usleep(50 * 1000); + ASSERT_EQ(2, counter) << "i: " << i << ", id: " << id; + } +} + +} // namespace concurrent +} // namespace vesoft diff --git a/common/concurrent/test/GenericWorkerTest.cpp b/common/concurrent/test/GenericWorkerTest.cpp new file mode 100644 index 00000000000..28c06416de5 --- /dev/null +++ b/common/concurrent/test/GenericWorkerTest.cpp @@ -0,0 +1,152 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ +#include +#include +#include +#include +#include "concurrent/thread/GenericWorker.h" +using namespace std; + +namespace vesoft { +namespace concurrent { + +TEST(GenericWorker, StartAndStop) { + // inactive worker + { + GenericWorker worker; + } + // start & stop & wait + { + GenericWorker worker; + ASSERT_TRUE(worker.start()); + ASSERT_TRUE(worker.stop()); + ASSERT_TRUE(worker.wait()); + } + // start & stop & no wait + { + GenericWorker worker; + ASSERT_TRUE(worker.start()); + ASSERT_TRUE(worker.stop()); + } + // start & no stop & no wait + { + GenericWorker worker; + ASSERT_TRUE(worker.start()); + } + // start twice + { + GenericWorker worker; + ASSERT_TRUE(worker.start()); + ASSERT_FALSE(worker.start()); + } + // stop twice + { + GenericWorker worker; + ASSERT_TRUE(worker.start()); + ASSERT_TRUE(worker.stop()); + ASSERT_FALSE(worker.stop()); + } +} + +TEST(GenericWorker, addTask) { + GenericWorker worker; + ASSERT_TRUE(worker.start()); + // task without parameters + { + volatile auto flag = false; + auto set_flag = [&] () mutable { flag = true; }; + worker.addTask(set_flag).get(); + ASSERT_TRUE(flag); + } + // task with parameter + { + volatile auto flag = false; + auto set_flag = [&] (auto value) { flag = value; }; + worker.addTask(set_flag, true).get(); + ASSERT_TRUE(flag); + worker.addTask(set_flag, false).get(); + ASSERT_FALSE(flag); + } + // future with value + { + ASSERT_TRUE(worker.addTask([] () { return true; }).get()); + ASSERT_FALSE(worker.addTask([] () { return false; }).get()); + ASSERT_EQ(13UL, worker.addTask([] () { return ::strlen("Rock 'n' Roll"); }).get()); + ASSERT_EQ("Innuendo", worker.addTask([] () { return std::string("Innuendo"); }).get()); + } + // member function as task + { + struct X { + std::string itos(size_t i) { + return std::to_string(i); + } + } x; + ASSERT_EQ("918", worker.addTask(&X::itos, &x, 918).get()); + ASSERT_EQ("918", worker.addTask(&X::itos, std::make_shared(), 918).get()); + } +} + +static size_t currentMicroSeconds() { + struct timeval ts; + ::gettimeofday(&ts, nullptr); + return ts.tv_sec * 1000000 + ts.tv_usec; +} + +static bool msAboutEqual(size_t target, size_t actual) { + return std::max(target, actual) - std::min(target, actual) <= 10; +} + +TEST(GenericWorker, addDelayTask) { + GenericWorker worker; + ASSERT_TRUE(worker.start()); + { + auto shared = std::make_shared(0); + auto cb = [shared] () { + return ++(*shared); + }; + auto beg = currentMicroSeconds(); + ASSERT_EQ(1, worker.addDelayTask(50, cb).get()); + ASSERT_GE(shared.use_count(), 2); + auto now = currentMicroSeconds(); + ASSERT_TRUE(msAboutEqual(50, (now - beg) / 1000)); + ::usleep(5 * 1000); // ensure all internal resources are released + ASSERT_EQ(2, shared.use_count()); // two ref: `shared' and `cb' + } +} + +TEST(GenericWorker, addRepeatTask) { + GenericWorker worker; + ASSERT_TRUE(worker.start()); + { + auto counter = 0UL; + auto cb = [&] () { + counter++; + }; + worker.addRepeatTask(50, cb); + ::usleep(160 * 1000); + ASSERT_EQ(3, counter); + } +} + +TEST(GenericWorker, purgeRepeatTask) { + GenericWorker worker; + ASSERT_TRUE(worker.start()); + { + auto counter = 0UL; + auto cb = [&] () { + counter++; + }; + auto id = worker.addRepeatTask(50, cb); + // fprintf(stderr, "id: 0x%016lx\n", id); + ::usleep(110 * 1000); + worker.purgeTimerTask(id); + ::usleep(50 * 1000); + ASSERT_EQ(2, counter); + } +} + +} // namespace concurrent +} // namespace vesoft diff --git a/common/concurrent/test/LatchTest.cpp b/common/concurrent/test/LatchTest.cpp new file mode 100644 index 00000000000..06b36e1cae6 --- /dev/null +++ b/common/concurrent/test/LatchTest.cpp @@ -0,0 +1,110 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ +#include +#include +#include +#include +#include +#include "concurrent/sync/Latch.h" +#include "concurrent/thread/GenericThreadPool.h" + +namespace vesoft { +namespace concurrent { + +TEST(LatchTest, BasicTest) { + // test for invalid initial counter + { + ASSERT_THROW({Latch latch(0);}, std::invalid_argument); + } + // test for illegal `downWait' + { + ASSERT_THROW({ + Latch latch(1); + latch.down(); + latch.downWait(); + }, std::runtime_error); + } + // test for illegal `down' + { + ASSERT_THROW({ + Latch latch(1); + latch.down(); + latch.down(); + }, std::runtime_error); + } + // test for single-thread normal case + { + Latch latch(1); + ASSERT_FALSE(latch.isReady()); + latch.down(); + ASSERT_TRUE(latch.isReady()); + latch.wait(); + latch.wait(); + ASSERT_TRUE(true); + } + // test for multiple-thread normal case + { + Latch latch(2); + auto cb = [&] () { + latch.downWait(); + }; + std::thread thread(cb); + ASSERT_FALSE(latch.isReady()); + latch.downWait(); + ASSERT_TRUE(latch.isReady()); + thread.join(); + } +} + +TEST(LatchTest, JoinLikeTest) { + // start bunch of tasks, then wait for them all done. + constexpr auto nthreads = 4UL; + constexpr auto ntasks = 16UL; + concurrent::thread::GenericThreadPool pool; + Latch latch(ntasks); + std::atomic counter{0}; + auto task = [&] () { + ++counter; + latch.down(); + }; + pool.start(nthreads); + for (auto i = 0UL; i < ntasks; i++) { + pool.addTask(task); + } + latch.wait(); + ASSERT_EQ(ntasks, counter.load()); +} + +TEST(LatchTest, SignalTest) { + // There are preceding I/O works and subsequent CPU bound works. + // Do I/O works with single thread, and CPU works concurrently. + constexpr auto nthreads = 16UL; + constexpr auto ntasks = 16UL; + concurrent::thread::GenericThreadPool pool; + Latch latch(1); + pool.start(nthreads); + std::atomic counter{0}; + auto task = [&] () { + // do some preparing works + latch.wait(); // wait for the I/O works done + // do subsequent CPU bound works, where parallelism is more efficient. + ++counter; + }; + for (auto i = 0UL; i < ntasks; i++) { + pool.addTask(task); + } + // sleep to simulate I/O bound task, which single threading suffices + usleep(100000); + // I/O works done + ASSERT_EQ(0, counter.load()); // no CPU bound work done. + latch.down(); + pool.stop(); + pool.wait(); // wait all tasks done + ASSERT_EQ(ntasks, counter.load()); // all tasks are done +} + +} // namespace concurrent +} // namespace vesoft diff --git a/common/concurrent/test/ThreadLocalPtrTest.cpp b/common/concurrent/test/ThreadLocalPtrTest.cpp new file mode 100644 index 00000000000..2c0ef8e29ad --- /dev/null +++ b/common/concurrent/test/ThreadLocalPtrTest.cpp @@ -0,0 +1,113 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ +#include +#include +#include "concurrent/thread/ThreadLocalPtr.h" +#include "concurrent/sync/Barrier.h" + +namespace vesoft { +namespace concurrent { + +TEST(ThreadLocalPtr, SingleThread) { + ThreadLocalPtr tls([](auto *ptr){ delete ptr; }); + auto cb = [&] () { + ASSERT_EQ(nullptr, tls.get()); + tls.reset(new std::string("Bohemian Rhapsody")); + ASSERT_EQ("Bohemian Rhapsody", *tls.get()); + tls.reset(); + ASSERT_EQ(nullptr, tls.get()); + }; + std::thread thread(cb); + thread.join(); +} + +TEST(ThreadLocalPtr, MultiThread) { + ThreadLocalPtr tls([](auto *ptr){ delete ptr; }); + auto cb = [&] (int idx) { + ASSERT_EQ(nullptr, tls.get()); + tls.reset(new int(idx)); + ASSERT_EQ(idx, *tls.get()); + ASSERT_EQ(idx, *tls); + tls.reset(); + ASSERT_EQ(nullptr, tls.get()); + }; + std::vector threads; + for (auto i = 0; i < 128; i++) { + threads.emplace_back(cb, i); + } + + for (auto &th : threads) { + th.join(); + } +} + +TEST(ThreadLocalPtr, Operators) { + struct Object { int i{0xAA}; }; + ThreadLocalPtr tls([](auto *ptr) { delete ptr; }); + + ASSERT_FALSE(static_cast(tls)); + ASSERT_TRUE(!tls); + ASSERT_FALSE(!!tls); + ASSERT_TRUE(tls == nullptr); + ASSERT_TRUE(nullptr == tls); + + tls.reset(new Object); + + ASSERT_TRUE(static_cast(tls)); + ASSERT_FALSE(!tls); + ASSERT_TRUE(!!tls); + ASSERT_TRUE(nullptr != tls); + ASSERT_TRUE(tls != nullptr); + ASSERT_EQ(0xAA, tls->i); + ASSERT_EQ(0xAA, (*tls).i); + + tls.reset(); + + ASSERT_FALSE(static_cast(tls)); + ASSERT_TRUE(!tls); + ASSERT_FALSE(!!tls); + ASSERT_TRUE(tls == nullptr); + ASSERT_TRUE(nullptr == tls); +} + +class Object { +public: + Object() { ++cnt_; } + ~Object() { --cnt_; } + static std::atomic cnt_; +}; +std::atomic Object::cnt_{0}; + +TEST(ThreadLocalPtr, Destructor) { + constexpr auto N = 128; + auto completion = [=] () { + ASSERT_EQ(N, Object::cnt_.load()); + }; + + concurrent::Barrier barrier(N, completion); + + ThreadLocalPtr tls([](auto *ptr) { delete ptr; }); + auto cb = [&] () { + ASSERT_EQ(nullptr, tls.get()); + tls.reset(new Object); + ASSERT_NE(nullptr, tls.get()); + barrier.wait(); + }; + std::vector threads; + for (auto i = 0; i < 128; i++) { + threads.emplace_back(cb); + } + + for (auto &th : threads) { + th.join(); + } + + ASSERT_EQ(0UL, Object::cnt_.load()); +} + + +} // namespace concurrent +} // namespace vesoft diff --git a/common/concurrent/test/ThreadTest.cpp b/common/concurrent/test/ThreadTest.cpp new file mode 100644 index 00000000000..a34d24f3598 --- /dev/null +++ b/common/concurrent/test/ThreadTest.cpp @@ -0,0 +1,35 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ +#include +#include "concurrent/thread/NamedThread.h" + +namespace vesoft { +namespace concurrent { + +TEST(NamedThread, ThreadName) { + std::string setname("thread"); + std::string getname; + auto getter = [&] () { + NamedThread::Nominator::get(getname); + }; + NamedThread thread(setname, getter); + thread.join(); + ASSERT_EQ(setname, getname); +} + +TEST(NamedThread, ThreadID) { + pid_t tid; + auto getter = [&] () { + tid = ::syscall(SYS_gettid); + }; + + NamedThread thread("", getter); + thread.join(); + ASSERT_EQ(tid, thread.tid()); +} + +} // namespace concurrent +} // namespace vesoft diff --git a/common/concurrent/thread/GenericThreadPool.cpp b/common/concurrent/thread/GenericThreadPool.cpp new file mode 100644 index 00000000000..bd9d05240fd --- /dev/null +++ b/common/concurrent/thread/GenericThreadPool.cpp @@ -0,0 +1,57 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ +#include "common/concurrent/thread/GenericThreadPool.h" + +namespace vesoft { +namespace concurrent { + +GenericThreadPool::GenericThreadPool() { +} + +GenericThreadPool::~GenericThreadPool() { + stop(); + wait(); +} + +bool GenericThreadPool::start(size_t nrThreads, const std::string &name) { + if (nrThreads_ != 0) { + return false; + } + nrThreads_ = nrThreads; + auto ok = true; + for (auto i = 0UL; ok && i < nrThreads_; i++) { + pool_.emplace_back(std::make_unique()); + ok = ok && pool_.back()->start(name); + } + return ok; +} + +bool GenericThreadPool::stop() { + auto ok = true; + for (auto &worker : pool_) { + ok = worker->stop() && ok; + } + return ok; +} + +bool GenericThreadPool::wait() { + auto ok = true; + for (auto &worker : pool_) { + ok = worker->wait() && ok; + } + nrThreads_ = 0; + pool_.clear(); + return ok; +} + +void GenericThreadPool::purgeTimerTask(uint64_t id) { + auto idx = (id >> GenericWorker::TIMER_ID_BITS); + id = (id & GenericWorker::TIMER_ID_MASK); + pool_[idx]->purgeTimerTask(id); +} + +} // namespace concurrent +} // namespace vesoft diff --git a/common/concurrent/thread/GenericThreadPool.h b/common/concurrent/thread/GenericThreadPool.h new file mode 100644 index 00000000000..0067881426b --- /dev/null +++ b/common/concurrent/thread/GenericThreadPool.h @@ -0,0 +1,126 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ +#ifndef COMMON_CONCURRENT_THREAD_GENERICTHREADPOOL_H_ +#define COMMON_CONCURRENT_THREAD_GENERICTHREADPOOL_H_ +#include "common/concurrent/thread/GenericWorker.h" + +/** + * Based on GenericWorker, GenericThreadPool implements a thread pool that execute tasks asynchronously. + * + * Under the hood, GenericThreadPool distributes tasks around the internal threads in a round-robin way. + * + * Please NOTE that, as the name indicates, this a thread pool for the general purpose, + * but not for the performance critical situation. + */ + +namespace vesoft { +namespace concurrent { + +class GenericThreadPool : public vesoft::cpp::NonCopyable, public vesoft::cpp::NonMovable { +public: + GenericThreadPool(); + ~GenericThreadPool(); + + /** + * To launch the internal generic workers. + * + * Optionally, you could give the internal thread a specific name, + * which will be shown in some system utilities like `top'. + * + * Once started, the worker will keep waiting for newly-added tasks indefinitely, + * until `stop' is invoked. + * + * A GenericThreadPool MUST be `start'ed successfully before invoking + * any other interfaces. + * + * @nrThreads number of internal threads + * @name name of internal threads + */ + bool start(size_t nrThreads, const std::string &name = ""); + + /** + * Asynchronouly to notify the workers to stop handling further new tasks. + */ + bool stop(); + + /** + * Synchronously to wait the workers to finish and exit. + * + * For the sake of convenience, `~GenericWorker' invokes `stop' and `wait', + * but it's better to control these processes manually to make the resource + * management more clearly. + */ + bool wait(); + + template + using ReturnType = typename std::result_of::type; + template + using FutureType = std::future>; + + /** + * To add a normal task. + * @task a callable object + * @args variadic arguments + * @return an instance of `std::future' you could wait upon for the result of `task' + */ + template + auto addTask(F&&, Args&&...) -> FutureType; + + /** + * To add a oneshot timer task which will be executed after a while. + * @ms milliseconds from now when the task get executed + * @task a callable object + * @args variadic arguments + * @return an instance of `std::future' you could wait upon for the result of `task' + */ + template + auto addDelayTask(size_t, F&&, Args&&...) -> FutureType; + + /** + * To add a repeated timer task which will be executed in each period. + * @ms interval in milliseconds + * @task a callable object + * @args variadic arguments + * @return ID of the added task, unique for this worker + */ + template + uint64_t addRepeatTask(size_t, F&&, Args&&...); + + /** + * To purge or deactivate a repeated task. + * @id ID returned by `addRepeatTask' + */ + void purgeTimerTask(uint64_t id); + +private: + size_t nrThreads_{0}; + std::atomic nextThread_{0}; + std::vector> pool_; +}; + +template +auto GenericThreadPool::addTask(F &&f, Args &&...args) -> FutureType { + auto idx = nextThread_++ % nrThreads_; + return pool_[idx]->addTask(std::forward(f), std::forward(args)...); +} + +template +auto GenericThreadPool::addDelayTask(size_t ms, F &&f, Args &&...args) -> FutureType { + auto idx = nextThread_++ % nrThreads_; + return pool_[idx]->addDelayTask(ms, std::forward(f), std::forward(args)...); +} + +template +uint64_t GenericThreadPool::addRepeatTask(size_t ms, F &&f, Args &&...args) { + auto idx = nextThread_++ % nrThreads_; + auto id = pool_[idx]->addRepeatTask(ms, std::forward(f), std::forward(args)...); + return ((idx << GenericWorker::TIMER_ID_BITS) | id); +} + +} // namespace concurrent +} // namespace vesoft + +#endif // COMMON_CONCURRENT_THREAD_GENERICTHREADPOOL_H_ diff --git a/common/concurrent/thread/GenericWorker.cpp b/common/concurrent/thread/GenericWorker.cpp new file mode 100644 index 00000000000..10c54a710e7 --- /dev/null +++ b/common/concurrent/thread/GenericWorker.cpp @@ -0,0 +1,163 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ +#include "common/concurrent/thread/GenericWorker.h" +#include + +#ifndef EV_MULTIPLICITY +#define EV_MULTIPLICITY 1 +#endif +#include + +namespace vesoft { +namespace concurrent { + +GenericWorker::GenericWorker() { +} + +GenericWorker::~GenericWorker() { + stop(); + wait(); + if (notifier_ != nullptr) { + notifier_ = nullptr; + } + if (evloop_ != nullptr) { + ev_loop_destroy(evloop_); + evloop_ = nullptr; + } +} + +bool GenericWorker::start(std::string name) { + name_ = std::move(name); + if (!stopped_.load(std::memory_order_acquire)) { + return false; + } + evloop_ = ev_loop_new(0); + ev_set_userdata(evloop_, this); + + auto cb = [] (struct ev_loop *loop, ev_async *, int) { + reinterpret_cast(ev_userdata(loop))->onNotify(); + }; + notifier_ = std::make_unique(); + ev_async_init(notifier_.get(), cb); + ev_async_start(evloop_, notifier_.get()); + + thread_ = std::make_unique(name_, &GenericWorker::loop, this); + + stopped_.store(false, std::memory_order_release); + + return true; +} + +bool GenericWorker::stop() { + if (stopped_.load(std::memory_order_acquire)) { + return false; + } + stopped_.store(true, std::memory_order_release); + notify(); + return true; +} + +bool GenericWorker::wait() { + if (thread_ == nullptr) { + return false; + } + thread_->join(); + thread_.reset(); + return true; +} + +void GenericWorker::loop() { + ev_run(evloop_, 0); +} + +void GenericWorker::notify() { + if (notifier_ == nullptr) { + return; + } + ev_async_send(evloop_, notifier_.get()); +} + +void GenericWorker::onNotify() { + if (stopped_.load(std::memory_order_acquire)) { + ev_break(evloop_, EVBREAK_ALL); + // Even been broken, we still fall through to finish the current loop. + } + { + decltype(pendingTasks_) newcomings; + { + std::lock_guard guard(lock_); + newcomings.swap(pendingTasks_); + } + for (auto &task : newcomings) { + task(); + } + } + { + decltype(pendingTimers_) newcomings; + { + std::lock_guard guard(lock_); + newcomings.swap(pendingTimers_); + } + auto cb = [] (struct ev_loop *loop, ev_timer *w, int) { + auto timer = reinterpret_cast(w->data); + auto worker = reinterpret_cast(ev_userdata(loop)); + timer->callback_(); + if (timer->intervalSec_ == 0.0) { + worker->purgeTimerInternal(timer->id_); + } else { + w->repeat = timer->intervalSec_; + ev_timer_again(loop, w); + } + }; + for (auto &timer : newcomings) { + timer->ev_ = std::make_unique(); + auto delay = timer->delaySec_; + auto interval = timer->intervalSec_; + ev_timer_init(timer->ev_.get(), cb, delay, interval); + timer->ev_->data = timer.get(); + ev_timer_start(evloop_, timer->ev_.get()); + auto id = timer->id_; + activeTimers_[id] = std::move(timer); + } + } + { + decltype(purgingingTimers_) newcomings; + { + std::lock_guard guard(lock_); + newcomings.swap(purgingingTimers_); + } + for (auto id : newcomings) { + purgeTimerInternal(id); + } + } +} + +GenericWorker::Timer::Timer(std::function cb) { + callback_ = std::move(cb); +} + +GenericWorker::Timer::~Timer() { +} + +void GenericWorker::purgeTimerTask(uint64_t id) { + { + std::lock_guard guard(lock_); + purgingingTimers_.push_back(id); + } + notify(); +} + +void GenericWorker::purgeTimerInternal(uint64_t id) { + auto iter = activeTimers_.find(id); + if (iter != activeTimers_.end()) { + ev_timer_stop(evloop_, iter->second->ev_.get()); + activeTimers_.erase(iter); + } +} + +} // namespace concurrent +} // namespace vesoft + diff --git a/common/concurrent/thread/GenericWorker.h b/common/concurrent/thread/GenericWorker.h new file mode 100644 index 00000000000..cd7d3d5fa09 --- /dev/null +++ b/common/concurrent/thread/GenericWorker.h @@ -0,0 +1,199 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ +#ifndef COMMON_CONCURRENT_THREAD_GENERICWORKER_H_ +#define COMMON_CONCURRENT_THREAD_GENERICWORKER_H_ +#include +#include +#include +#include +#include +#include +#include "common/cpp/helpers.h" +#include "common/cpp/macros.h" +#include "common/concurrent/thread/NamedThread.h" + +/** + * GenericWorker implements a event-based task executor that executes tasks asynchronously + * in a separate thread. Like `std::thread', It takes any callable object and its optional + * arguments as a normal, delayed or repeated task. + * + * GenericWorker executes tasks one after one, in the FIFO way, while tasks are non-preemptible. + * + * Please NOTE that, as the name indicates, this a worker thread for the general purpose, + * but not for the performance critical situation. + */ + +struct ev_loop; +struct ev_timer; +struct ev_async; + +namespace vesoft { +namespace concurrent { + +class GenericWorker : public vesoft::cpp::NonCopyable, public vesoft::cpp::NonMovable { +public: + friend class GenericThreadPool; + GenericWorker(); + ~GenericWorker(); + + /** + * To allocate resouces and launch the internal thread which executes + * the event loop to make this worker usable. + * + * Optionally, you could give the internal thread a specific name, + * which will be shown in some system utilities like `top'. + * + * Once started, the worker will keep waiting for newly-added tasks indefinitely, + * until `stop' is invoked. + * + * A GenericWorker MUST be `start'ed successfully before invoking + * any other interfaces. + */ + bool VE_MUST_USE_RESULT start(std::string name = ""); + + /** + * Asynchronouly to notify the worker to stop handling further new tasks. + */ + bool stop(); + + /** + * Synchronously to wait the worker to finish and exit. + * + * For the sake of convenience, `~GenericWorker' invokes `stop' and `wait', + * but it's better to control these processes manually to make the resource + * management more clearly. + */ + bool wait(); + + template + using ReturnType = typename std::result_of::type; + template + using FutureType = std::future>; + + /** + * To add a normal task. + * @task a callable object + * @args variadic arguments + * @return an instance of `std::future' you could wait upon for the result of `task' + */ + template + auto addTask(F &&task, Args &&...args) -> FutureType; + + /** + * To add a oneshot timer task which will be executed after a while. + * @ms milliseconds from now when the task get executed + * @task a callable object + * @args variadic arguments + * @return an instance of `std::future' you could wait upon for the result of `task' + */ + template + auto addDelayTask(size_t ms, F &&task, Args &&...args) -> FutureType; + + /** + * To add a repeated timer task which will be executed in each period. + * @ms interval in milliseconds + * @task a callable object + * @args variadic arguments + * @return ID of the added task, unique for this worker + */ + template + uint64_t addRepeatTask(size_t ms, F &&task, Args &&...args); + + /** + * To purge or deactivate a repeated task. + * @id ID returned by `addRepeatTask' + */ + void purgeTimerTask(uint64_t id); + +private: + template + uint64_t addTimerTask(size_t, size_t, F&&, Args&&...); + + void purgeTimerInternal(uint64_t id); + +private: + struct Timer { + explicit Timer(std::function cb); + ~Timer(); + uint64_t id_; + double delaySec_; + double intervalSec_; + std::function callback_; + std::unique_ptr ev_; + }; + +private: + void loop(); + void notify(); + void onNotify(); + uint64_t nextTimerId() { + // !NOTE! `lock_' must be hold + return (nextTimerId_++ & TIMER_ID_MASK); + } + +private: + static constexpr uint64_t TIMER_ID_BITS = 6 * 8; + static constexpr uint64_t TIMER_ID_MASK = ((~0x0UL) >> (64 - TIMER_ID_BITS)); + std::string name_; + std::atomic stopped_{true}; + volatile uint64_t nextTimerId_{0}; + std::unique_ptr notifier_; + struct ev_loop *evloop_ = nullptr; + std::mutex lock_; + std::vector> pendingTasks_; + using TimerPtr = std::unique_ptr; + std::vector pendingTimers_; + std::vector purgingingTimers_; + std::unordered_map activeTimers_; + std::unique_ptr thread_; +}; + +template +auto GenericWorker::addTask(F &&f, Args &&...args) -> FutureType { + using TaskType = std::packaged_task()>; + auto task = std::make_shared(std::bind(f, args...)); + auto future = task->get_future(); + { + std::lock_guard guard(lock_); + pendingTasks_.emplace_back([=](){ (*task)(); }); + } + notify(); + return future; +} + +template +auto GenericWorker::addDelayTask(size_t ms, F &&f, Args &&...args) -> FutureType { + using TaskType = std::packaged_task()>; + auto task = std::make_shared(std::bind(f, args...)); + auto future = task->get_future(); + addTimerTask(ms, 0, [=](){ (*task)(); }); + return future; +} + +template +uint64_t GenericWorker::addRepeatTask(size_t ms, F &&f, Args &&...args) { + return addTimerTask(ms, ms, std::forward(f), std::forward(args)...); +} + +template +uint64_t GenericWorker::addTimerTask(size_t delay, size_t interval, F &&f, Args &&...args) { + auto timer = std::make_unique(std::bind(f, args...)); + timer->delaySec_ = delay / 1000.; + timer->intervalSec_ = interval / 1000.; + auto id = 0UL; + { + std::lock_guard guard(lock_); + timer->id_ = (id = nextTimerId()); + pendingTimers_.emplace_back(std::move(timer)); + } + notify(); + return id; +} + +} // namespace concurrent +} // namespace vesoft + +#endif // COMMON_CONCURRENT_THREAD_GENERICWORKER_H_ diff --git a/common/concurrent/thread/NamedThread.cpp b/common/concurrent/thread/NamedThread.cpp new file mode 100644 index 00000000000..7b76076ca9a --- /dev/null +++ b/common/concurrent/thread/NamedThread.cpp @@ -0,0 +1,35 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ +#include "common/concurrent/thread/NamedThread.h" + +namespace vesoft { +namespace concurrent { + +class TLSThreadID { +public: + TLSThreadID() { + tid_ = ::syscall(SYS_gettid); + } + + ~TLSThreadID() { + tid_ = 0; + } + + pid_t tid() { + return tid_; + } + +private: + pid_t tid_; +}; + +pid_t gettid() { + static thread_local TLSThreadID tlstid; + return tlstid.tid(); +} + +} // namespace concurrent +} // namespace vesoft diff --git a/common/concurrent/thread/NamedThread.h b/common/concurrent/thread/NamedThread.h new file mode 100644 index 00000000000..96f738a2fd8 --- /dev/null +++ b/common/concurrent/thread/NamedThread.h @@ -0,0 +1,92 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ +#ifndef COMMON_CONCURRENT_THREAD_NAMEDTHREAD_H_ +#define COMMON_CONCURRENT_THREAD_NAMEDTHREAD_H_ + +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif +#include +#include + +#include +#include +#include +#include + +namespace vesoft { +namespace concurrent { + +pid_t gettid(); + +class NamedThread : public std::thread { +public: + NamedThread() = default; + NamedThread(NamedThread&&) = default; + template + NamedThread(const std::string &name, F &&f, Args&&...args); + NamedThread& operator=(NamedThread&&) = default; + NamedThread(const NamedThread&) = delete; + NamedThread& operator=(const NamedThread&) = delete; + + pid_t tid() const { + while (tid_ == 0) { + // `tid' is unavailable until the thread function is called. + } + return tid_; + } + +public: + class Nominator { + public: + Nominator(const std::string &name) { + get(prevName_); + set(name); + } + + ~Nominator() { + set(prevName_); + } + + static void set(const std::string &name) { + ::prctl(PR_SET_NAME, name.c_str(), 0, 0, 0); + } + + static void get(std::string &name) { + char buf[64]; + ::prctl(PR_GET_NAME, buf, 0, 0, 0); + name = buf; + } + + private: + std::string prevName_; + }; + +private: + static void hook(NamedThread *thread, + const std::string &name, + const std::function &f) { + thread->tid_ = vesoft::concurrent::gettid(); + if (!name.empty()) { + Nominator::set(name); + } + f(); + } + +private: + pid_t tid_{0}; +}; + +template +NamedThread::NamedThread(const std::string &name, F &&f, Args&&...args) + : std::thread(hook, this, name, + std::bind(std::forward(f), std::forward(args)...)) { +}; + +} // namespace concurrent +} // namespace vesoft + +#endif // COMMON_CONCURRENT_THREAD_NAMEDTHREAD_H_ diff --git a/common/concurrent/thread/ThreadLocalPtr.h b/common/concurrent/thread/ThreadLocalPtr.h new file mode 100644 index 00000000000..c826e42c7f3 --- /dev/null +++ b/common/concurrent/thread/ThreadLocalPtr.h @@ -0,0 +1,220 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ +#ifndef COMMON_CONCURRENT_THREAD_THREADLOCALPTR_H_ +#define COMMON_CONCURRENT_THREAD_THREADLOCALPTR_H_ + +#include +#include +#include +#include +#include +#include +#include "common/concurrent/thread/NamedThread.h" +#include "common/cpp/helpers.h" + +/** + * `ThreadLocalPtr' is based on pthread's thread-specific data key. + * Beside the normal TLS capability, `ThreadLocalPtr' allows us to collect + * all of other threads' TLS data, which is usually needed in the udpate-mostly, + * read-rarely occasions. + * + * Please refer to concurrent/test/ThreadLocalPtrTest.cpp for examples. + */ + +namespace vesoft { +namespace concurrent { + +template +class ThreadLocalPtr : public vesoft::cpp::NonCopyable, public vesoft::cpp::NonMovable { +public: + /** + * @dtor If provided, the callback would be invoked when an existing + * TLS data needs to be destructed. + */ + ThreadLocalPtr(std::function dtor = nullptr); + ~ThreadLocalPtr(); + /** + * Set or reset TLS data for the current thread. + */ + void reset(const T *data = nullptr); + /** + * Obtain the TLS data of the current thread, `nullptr' if there is none. + * + * Please NOTE that constness leaking might happens here. + */ + T* get() const; + /** + * Some convenience operator overloads. + */ + T* operator->() const; + T& operator*() const; + explicit operator bool() const; + /** + * Retrieve all TLS data + */ + std::vector getAll() const; + +private: + struct MetaBlock { + const T *data_; + pid_t tid_; + ThreadLocalPtr *owner_; + }; + MetaBlock* getMetaBlock() const; + void setMetaBlock(MetaBlock *meta) const; + void destructData(const T *data); + void destructMetaBlock(MetaBlock *meta); + static void destructor(void *meta); + +private: + pthread_key_t key_; + std::function dtor_; + std::unordered_map metaPerThread_; + mutable std::mutex lock_; +}; + +/** + * Compare with nullptr + */ + +template +bool operator==(const ThreadLocalPtr &ptr, nullptr_t) { + return !ptr; +} + +template +bool operator==(nullptr_t, const ThreadLocalPtr &ptr) { + return !ptr; +} + +template +bool operator !=(const ThreadLocalPtr &ptr, nullptr_t) { + return !!ptr; +} + +template +bool operator!=(nullptr_t, const ThreadLocalPtr &ptr) { + return !!ptr; +} + +template +ThreadLocalPtr::ThreadLocalPtr(std::function dtor) { + dtor_ = std::move(dtor); + auto status = ::pthread_key_create(&key_, destructor); + if (status != 0) { + throw std::runtime_error("::pthread_key_create failed with " + std::to_string(status)); + } +} + +template +ThreadLocalPtr::~ThreadLocalPtr() { + std::lock_guard guard(lock_); + for (auto &e : metaPerThread_) { + if (e.second->data_ != nullptr) { + fprintf(stderr, "Outstanding thread still exists, tid: %d, type: %s\n", e.first, typeid(T).name()); + } + delete e.second; + } +} + +template +void ThreadLocalPtr::reset(const T *data) { + auto *meta = getMetaBlock(); + if (meta == nullptr) { + meta = new MetaBlock; + meta->data_ = data; + meta->owner_ = this; + meta->tid_ = concurrent::gettid(); + { + std::lock_guard guard(lock_); + metaPerThread_.emplace(meta->tid_, meta); + } + setMetaBlock(meta); + } else { + auto *old = meta->data_; + meta->data_ = data; + destructData(old); + } +} + +template +T* ThreadLocalPtr::get() const { + auto meta = getMetaBlock(); + if (meta != nullptr) { + return const_cast(meta->data_); + } + return nullptr; +} + +template +T* ThreadLocalPtr::operator->() const { + return this->get(); +} + +template +T& ThreadLocalPtr::operator*() const { + return *this->get(); +} + +template +ThreadLocalPtr::operator bool() const { + return this->get() != nullptr; +} + +template +std::vector ThreadLocalPtr::getAll() const { + std::vector all; + { + std::lock_guard guard(lock_); + for (auto &e : metaPerThread_) { + all.push_back(const_cast(e.second->data_)); + } + } + return all; +} + +template +void ThreadLocalPtr::destructor(void *meta) { + assert(meta != nullptr); + auto *m = reinterpret_cast(meta); + m->owner_->destructMetaBlock(m); +} + +template +typename ThreadLocalPtr::MetaBlock* +ThreadLocalPtr::getMetaBlock() const { + return reinterpret_cast(::pthread_getspecific(key_)); +} + +template +void ThreadLocalPtr::setMetaBlock(MetaBlock *meta) const { + ::pthread_setspecific(key_, meta); +} + +template +void ThreadLocalPtr::destructData(const T *data) { + if (dtor_ != nullptr && data != nullptr) { + dtor_(const_cast(data)); + } +} + +template +void ThreadLocalPtr::destructMetaBlock(MetaBlock *meta) { + { + std::lock_guard guard(lock_); + auto iter = metaPerThread_.find(meta->tid_); + if (iter != metaPerThread_.end()) { + metaPerThread_.erase(iter); + } + } + destructData(meta->data_); + delete meta; +} + +} // namespace concurrent +} // namespace vesoft + +#endif // COMMON_CONCURRENT_THREAD_THREADLOCALPTR_H_ diff --git a/common/cpp/helpers.h b/common/cpp/helpers.h new file mode 100644 index 00000000000..75cbe4b22ec --- /dev/null +++ b/common/cpp/helpers.h @@ -0,0 +1,28 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ +#ifndef COMMON_CPP_HELPERS_H_ +#define COMMON_CPP_HELPERS_H_ + +namespace vesoft { +namespace cpp { + +class NonCopyable { +public: + NonCopyable() {} + NonCopyable(const NonCopyable&) = delete; + NonCopyable& operator=(const NonCopyable&) = delete; +}; + +class NonMovable { +public: + NonMovable() {} + NonMovable(NonMovable&&) = delete; + NonMovable& operator=(NonMovable&&) = delete; +}; + +} // namespace cpp +} // namespace vesoft +#endif // COMMON_CPP_HELPERS_H_ diff --git a/common/cpp/macros.h b/common/cpp/macros.h new file mode 100644 index 00000000000..639a9e65b27 --- /dev/null +++ b/common/cpp/macros.h @@ -0,0 +1,34 @@ +/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved + * + * This source code is licensed under Apache 2.0 License + * (found in the LICENSE.Apache file in the root directory) + */ +#ifndef CPP_MACROS_H_ +#define CPP_MACROS_H_ + +#define VE_MUST_USE_RESULT __attribute__((warn_unused_result)) +#define VE_DONT_OPTIMIZE __attribute__((optimize("O0"))) + +#define VE_ALWAYS_INLINE __attribute__((always_inline)) +#define VE_ALWAYS_NO_INLINE __attribute__((noinline)) + +#define VE_BEGIN_NO_OPTIMIZATION _Pragma("GCC push_options") \ + _Pragma("GCC optimize(\"O0\")") +#define VE_END_NO_OPTIMIZATION _Pragma("GCC pop_options") + +#ifndef likely +#define likely(x) __builtin_expect((x),1) +#endif // likely +#ifndef unlikely +#define unlikely(x) __builtin_expect((x),0) +#endif // unlikely + +#ifndef UNUSED +#define UNUSED(x) (void)(x) +#endif // UNUSED + +#ifndef COMPILER_BARRIER +#define COMPILER_BARRIER() asm volatile ("":::"memory") +#endif // COMPILER_BARRIER + +#endif // CPP_MACROS_H_ From 25a7a690d1696153e1d3a9590de1c0ef44df2f84 Mon Sep 17 00:00:00 2001 From: dutor <440396+dutor@users.noreply.github.com> Date: Fri, 31 Aug 2018 07:53:54 +0800 Subject: [PATCH 2/2] [Feature] Added some concurrent utilities, GenericThreadPool, etc. --- common/CMakeLists.txt | 3 +- common/base/Base.h | 18 ++ common/concurrent/{sync => }/Barrier.cpp | 8 +- common/concurrent/{sync => }/Barrier.h | 20 +- common/concurrent/CMakeLists.txt | 9 +- common/concurrent/{sync => }/Latch.cpp | 2 +- common/concurrent/{sync => }/Latch.h | 15 +- common/concurrent/test/BarrierTest.cpp | 4 +- common/concurrent/test/CMakeLists.txt | 9 +- common/concurrent/test/LatchTest.cpp | 8 +- common/concurrent/test/ThreadLocalPtrTest.cpp | 113 --------- common/concurrent/thread/ThreadLocalPtr.h | 220 ------------------ common/cpp/macros.h | 34 --- common/thread/CMakeLists.txt | 7 + .../thread/GenericThreadPool.cpp | 6 +- .../thread/GenericThreadPool.h | 14 +- .../{concurrent => }/thread/GenericWorker.cpp | 6 +- .../{concurrent => }/thread/GenericWorker.h | 16 +- .../{concurrent => }/thread/NamedThread.cpp | 10 +- common/{concurrent => }/thread/NamedThread.h | 14 +- common/thread/test/CMakeLists.txt | 11 + .../test/GenericThreadPoolTest.cpp | 6 +- .../test/GenericWorkerTest.cpp | 6 +- .../test/ThreadTest.cpp | 6 +- 24 files changed, 117 insertions(+), 448 deletions(-) rename common/concurrent/{sync => }/Barrier.cpp (77%) rename common/concurrent/{sync => }/Barrier.h (74%) rename common/concurrent/{sync => }/Latch.cpp (97%) rename common/concurrent/{sync => }/Latch.h (82%) delete mode 100644 common/concurrent/test/ThreadLocalPtrTest.cpp delete mode 100644 common/concurrent/thread/ThreadLocalPtr.h delete mode 100644 common/cpp/macros.h create mode 100644 common/thread/CMakeLists.txt rename common/{concurrent => }/thread/GenericThreadPool.cpp (92%) rename common/{concurrent => }/thread/GenericThreadPool.h (92%) rename common/{concurrent => }/thread/GenericWorker.cpp (97%) rename common/{concurrent => }/thread/GenericWorker.h (94%) rename common/{concurrent => }/thread/NamedThread.cpp (77%) rename common/{concurrent => }/thread/NamedThread.h (87%) create mode 100644 common/thread/test/CMakeLists.txt rename common/{concurrent => thread}/test/GenericThreadPoolTest.cpp (97%) rename common/{concurrent => thread}/test/GenericWorkerTest.cpp (97%) rename common/{concurrent => thread}/test/ThreadTest.cpp (88%) diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 5e405cd796b..53b3e11321b 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -1,6 +1,6 @@ add_custom_target( common ALL - DEPENDS base_obj time proc network fs concurrent + DEPENDS base_obj time proc network fs concurrent thread ) add_subdirectory(base) @@ -9,4 +9,5 @@ add_subdirectory(proc) add_subdirectory(network) add_subdirectory(fs) add_subdirectory(concurrent) +add_subdirectory(thread) diff --git a/common/base/Base.h b/common/base/Base.h index 55e501e8ce9..ba6afd7a5b1 100644 --- a/common/base/Base.h +++ b/common/base/Base.h @@ -57,6 +57,24 @@ //#include "base/StringUnorderedMap.h" +#define VE_MUST_USE_RESULT __attribute__((warn_unused_result)) +#define VE_DONT_OPTIMIZE __attribute__((optimize("O0"))) + +#define VE_ALWAYS_INLINE __attribute__((always_inline)) +#define VE_ALWAYS_NO_INLINE __attribute__((noinline)) + +#define VE_BEGIN_NO_OPTIMIZATION _Pragma("GCC push_options") \ + _Pragma("GCC optimize(\"O0\")") +#define VE_END_NO_OPTIMIZATION _Pragma("GCC pop_options") + +#ifndef UNUSED +#define UNUSED(x) (void)(x) +#endif // UNUSED + +#ifndef COMPILER_BARRIER +#define COMPILER_BARRIER() asm volatile ("":::"memory") +#endif // COMPILER_BARRIER + namespace vesoft { using GraphSpaceID = int32_t; diff --git a/common/concurrent/sync/Barrier.cpp b/common/concurrent/Barrier.cpp similarity index 77% rename from common/concurrent/sync/Barrier.cpp rename to common/concurrent/Barrier.cpp index 4beb19fbac2..41029361592 100644 --- a/common/concurrent/sync/Barrier.cpp +++ b/common/concurrent/Barrier.cpp @@ -3,7 +3,7 @@ * This source code is licensed under Apache 2.0 License * (found in the LICENSE.Apache file in the root directory) */ -#include "concurrent/sync/Barrier.h" +#include "concurrent/Barrier.h" namespace vesoft { @@ -11,7 +11,7 @@ namespace concurrent { Barrier::Barrier(size_t counter, std::function completion) { if (counter == 0) { - throw std::invalid_argument("Zero barrier counter"); + throw std::invalid_argument("Barrier counter can't be zero"); } completion_ = std::move(completion); counter_ = counter; @@ -19,7 +19,7 @@ Barrier::Barrier(size_t counter, std::function completion) { } void Barrier::wait() { - std::unique_lock unique(lock_); + std::unique_lock guard(lock_); if (--ages_ == 0) { ages_ = counter_; ++generation_; @@ -29,7 +29,7 @@ void Barrier::wait() { cond_.notify_all(); } else { auto current = generation_; - cond_.wait(unique, [=] () { return current != generation_; }); + cond_.wait(guard, [=] () { return current != generation_; }); } } diff --git a/common/concurrent/sync/Barrier.h b/common/concurrent/Barrier.h similarity index 74% rename from common/concurrent/sync/Barrier.h rename to common/concurrent/Barrier.h index e90b78dc2de..19e46dbdc71 100644 --- a/common/concurrent/sync/Barrier.h +++ b/common/concurrent/Barrier.h @@ -3,32 +3,32 @@ * This source code is licensed under Apache 2.0 License * (found in the LICENSE.Apache file in the root directory) */ -#ifndef COMMON_CONCURRENT_SYNC_BARRIER_H_ -#define COMMON_CONCURRENT_SYNC_BARRIER_H_ +#ifndef COMMON_CONCURRENT_BARRIER_H_ +#define COMMON_CONCURRENT_BARRIER_H_ #include #include #include +#include "common/cpp/helpers.h" /** * Like `Latch', `Barrier' is a synchronization object, except that * `Barrier' is reusable. * Besides, `Barrier' features with an optional callable object, * which would be invoked at the completion phase, i.e. synchronization point, - * by the last participating thread entering `wait'. + * by the last participating thread who entered `wait', before waking up other blocking threads. */ namespace vesoft { namespace concurrent { -class Barrier { +class Barrier final : public vesoft::cpp::NonCopyable, public vesoft::cpp::NonMovable { public: + /** + * @counter number of participating threads + * @completion callback invoked at the completion phase + */ explicit Barrier(size_t counter, std::function completion = nullptr); - Barrier() = delete; ~Barrier() = default; - Barrier(const Barrier&) = delete; - Barrier(Barrier&&) = delete; - Barrier& operator=(const Barrier&) = delete; - Barrier& operator=(Barrier&&) = delete; /** * Decrements the internal counter. * If the counter reaches zero, the completion callback would be invoked if present, @@ -51,4 +51,4 @@ class Barrier { } // namespace concurrent } // namespace vesoft -#endif // COMMON_CONCURRENT_SYNC_BARRIER_H_ +#endif // COMMON_CONCURRENT_BARRIER_H_ diff --git a/common/concurrent/CMakeLists.txt b/common/concurrent/CMakeLists.txt index f723d739d5a..3086fdd9cdf 100644 --- a/common/concurrent/CMakeLists.txt +++ b/common/concurrent/CMakeLists.txt @@ -1,7 +1,6 @@ -add_library(concurrent_obj OBJECT thread/NamedThread.cpp - thread/GenericWorker.cpp - thread/GenericThreadPool.cpp - sync/Barrier.cpp - sync/Latch.cpp +add_library( + concurrent_obj OBJECT + Barrier.cpp + Latch.cpp ) add_subdirectory(test) diff --git a/common/concurrent/sync/Latch.cpp b/common/concurrent/Latch.cpp similarity index 97% rename from common/concurrent/sync/Latch.cpp rename to common/concurrent/Latch.cpp index 930b7844765..eaa3ecd790c 100644 --- a/common/concurrent/sync/Latch.cpp +++ b/common/concurrent/Latch.cpp @@ -3,7 +3,7 @@ * This source code is licensed under Apache 2.0 License * (found in the LICENSE.Apache file in the root directory) */ -#include "concurrent/sync/Latch.h" +#include "concurrent/Latch.h" namespace vesoft { namespace concurrent { diff --git a/common/concurrent/sync/Latch.h b/common/concurrent/Latch.h similarity index 82% rename from common/concurrent/sync/Latch.h rename to common/concurrent/Latch.h index 266b2ba22a5..63411e8df8f 100644 --- a/common/concurrent/sync/Latch.h +++ b/common/concurrent/Latch.h @@ -3,20 +3,21 @@ * This source code is licensed under Apache 2.0 License * (found in the LICENSE.Apache file in the root directory) */ -#ifndef COMMON_CONCURRENT_SYNC_LATCH_H_ -#define COMMON_CONCURRENT_SYNC_LATCH_H_ +#ifndef COMMON_CONCURRENT_LATCH_H_ +#define COMMON_CONCURRENT_LATCH_H_ #include #include +#include "common/cpp/helpers.h" /** * Latch is an one-shot synchronization object. - * It provides synchronization point for multiple threads. + * It provides an synchronization point for multiple threads. * See shared/concurrent/test/LatchTest.cpp for use scenarios. */ namespace vesoft { namespace concurrent { -class Latch { +class Latch final : public vesoft::cpp::NonCopyable, public vesoft::cpp::NonMovable { public: /** * @counter: initial counter, @@ -24,11 +25,7 @@ class Latch { * Throws `std::invalid_argument' if counter is zero. */ explicit Latch(size_t counter); - Latch(const Latch&) = delete; - Latch(Latch&&) = delete; ~Latch() = default; - Latch& operator=(const Latch&) = delete; - Latch& operator=(Latch&&) = delete; /** * Decrements the internal counter by one. * If the counter reaches 0, all blocking(in `wait') @@ -64,4 +61,4 @@ class Latch { } // namespace concurrent } // namespace vesoft -#endif // COMMON_CONCURRENT_SYNC_LATCH_H_ +#endif // COMMON_CONCURRENT_LATCH_H_ diff --git a/common/concurrent/test/BarrierTest.cpp b/common/concurrent/test/BarrierTest.cpp index 92d41ac6ccb..fbf503c1f5b 100644 --- a/common/concurrent/test/BarrierTest.cpp +++ b/common/concurrent/test/BarrierTest.cpp @@ -8,8 +8,8 @@ #include #include #include -#include "concurrent/sync/Barrier.h" -#include "concurrent/thread/GenericThreadPool.h" +#include "common/concurrent/Barrier.h" +#include "common/thread/GenericThreadPool.h" namespace vesoft { namespace concurrent { diff --git a/common/concurrent/test/CMakeLists.txt b/common/concurrent/test/CMakeLists.txt index d9c9340675b..01f5afd7616 100644 --- a/common/concurrent/test/CMakeLists.txt +++ b/common/concurrent/test/CMakeLists.txt @@ -1,8 +1,9 @@ -add_executable(concurrent_test ThreadTest.cpp - GenericWorkerTest.cpp - GenericThreadPoolTest.cpp - ThreadLocalPtrTest.cpp +add_executable( + concurrent_test + BarrierTest.cpp + LatchTest.cpp $ + $ $ ) target_link_libraries(concurrent_test ev gtest gtest_main pthread) diff --git a/common/concurrent/test/LatchTest.cpp b/common/concurrent/test/LatchTest.cpp index 06b36e1cae6..8162e45d6b2 100644 --- a/common/concurrent/test/LatchTest.cpp +++ b/common/concurrent/test/LatchTest.cpp @@ -8,8 +8,8 @@ #include #include #include -#include "concurrent/sync/Latch.h" -#include "concurrent/thread/GenericThreadPool.h" +#include "common/concurrent/Latch.h" +#include "common/thread/GenericThreadPool.h" namespace vesoft { namespace concurrent { @@ -63,7 +63,7 @@ TEST(LatchTest, JoinLikeTest) { // start bunch of tasks, then wait for them all done. constexpr auto nthreads = 4UL; constexpr auto ntasks = 16UL; - concurrent::thread::GenericThreadPool pool; + thread::GenericThreadPool pool; Latch latch(ntasks); std::atomic counter{0}; auto task = [&] () { @@ -83,7 +83,7 @@ TEST(LatchTest, SignalTest) { // Do I/O works with single thread, and CPU works concurrently. constexpr auto nthreads = 16UL; constexpr auto ntasks = 16UL; - concurrent::thread::GenericThreadPool pool; + thread::GenericThreadPool pool; Latch latch(1); pool.start(nthreads); std::atomic counter{0}; diff --git a/common/concurrent/test/ThreadLocalPtrTest.cpp b/common/concurrent/test/ThreadLocalPtrTest.cpp deleted file mode 100644 index 2c0ef8e29ad..00000000000 --- a/common/concurrent/test/ThreadLocalPtrTest.cpp +++ /dev/null @@ -1,113 +0,0 @@ -/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved - * - * This source code is licensed under Apache 2.0 License - * (found in the LICENSE.Apache file in the root directory) - */ -#include -#include -#include "concurrent/thread/ThreadLocalPtr.h" -#include "concurrent/sync/Barrier.h" - -namespace vesoft { -namespace concurrent { - -TEST(ThreadLocalPtr, SingleThread) { - ThreadLocalPtr tls([](auto *ptr){ delete ptr; }); - auto cb = [&] () { - ASSERT_EQ(nullptr, tls.get()); - tls.reset(new std::string("Bohemian Rhapsody")); - ASSERT_EQ("Bohemian Rhapsody", *tls.get()); - tls.reset(); - ASSERT_EQ(nullptr, tls.get()); - }; - std::thread thread(cb); - thread.join(); -} - -TEST(ThreadLocalPtr, MultiThread) { - ThreadLocalPtr tls([](auto *ptr){ delete ptr; }); - auto cb = [&] (int idx) { - ASSERT_EQ(nullptr, tls.get()); - tls.reset(new int(idx)); - ASSERT_EQ(idx, *tls.get()); - ASSERT_EQ(idx, *tls); - tls.reset(); - ASSERT_EQ(nullptr, tls.get()); - }; - std::vector threads; - for (auto i = 0; i < 128; i++) { - threads.emplace_back(cb, i); - } - - for (auto &th : threads) { - th.join(); - } -} - -TEST(ThreadLocalPtr, Operators) { - struct Object { int i{0xAA}; }; - ThreadLocalPtr tls([](auto *ptr) { delete ptr; }); - - ASSERT_FALSE(static_cast(tls)); - ASSERT_TRUE(!tls); - ASSERT_FALSE(!!tls); - ASSERT_TRUE(tls == nullptr); - ASSERT_TRUE(nullptr == tls); - - tls.reset(new Object); - - ASSERT_TRUE(static_cast(tls)); - ASSERT_FALSE(!tls); - ASSERT_TRUE(!!tls); - ASSERT_TRUE(nullptr != tls); - ASSERT_TRUE(tls != nullptr); - ASSERT_EQ(0xAA, tls->i); - ASSERT_EQ(0xAA, (*tls).i); - - tls.reset(); - - ASSERT_FALSE(static_cast(tls)); - ASSERT_TRUE(!tls); - ASSERT_FALSE(!!tls); - ASSERT_TRUE(tls == nullptr); - ASSERT_TRUE(nullptr == tls); -} - -class Object { -public: - Object() { ++cnt_; } - ~Object() { --cnt_; } - static std::atomic cnt_; -}; -std::atomic Object::cnt_{0}; - -TEST(ThreadLocalPtr, Destructor) { - constexpr auto N = 128; - auto completion = [=] () { - ASSERT_EQ(N, Object::cnt_.load()); - }; - - concurrent::Barrier barrier(N, completion); - - ThreadLocalPtr tls([](auto *ptr) { delete ptr; }); - auto cb = [&] () { - ASSERT_EQ(nullptr, tls.get()); - tls.reset(new Object); - ASSERT_NE(nullptr, tls.get()); - barrier.wait(); - }; - std::vector threads; - for (auto i = 0; i < 128; i++) { - threads.emplace_back(cb); - } - - for (auto &th : threads) { - th.join(); - } - - ASSERT_EQ(0UL, Object::cnt_.load()); -} - - -} // namespace concurrent -} // namespace vesoft diff --git a/common/concurrent/thread/ThreadLocalPtr.h b/common/concurrent/thread/ThreadLocalPtr.h deleted file mode 100644 index c826e42c7f3..00000000000 --- a/common/concurrent/thread/ThreadLocalPtr.h +++ /dev/null @@ -1,220 +0,0 @@ -/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved - * - * This source code is licensed under Apache 2.0 License - * (found in the LICENSE.Apache file in the root directory) - */ -#ifndef COMMON_CONCURRENT_THREAD_THREADLOCALPTR_H_ -#define COMMON_CONCURRENT_THREAD_THREADLOCALPTR_H_ - -#include -#include -#include -#include -#include -#include -#include "common/concurrent/thread/NamedThread.h" -#include "common/cpp/helpers.h" - -/** - * `ThreadLocalPtr' is based on pthread's thread-specific data key. - * Beside the normal TLS capability, `ThreadLocalPtr' allows us to collect - * all of other threads' TLS data, which is usually needed in the udpate-mostly, - * read-rarely occasions. - * - * Please refer to concurrent/test/ThreadLocalPtrTest.cpp for examples. - */ - -namespace vesoft { -namespace concurrent { - -template -class ThreadLocalPtr : public vesoft::cpp::NonCopyable, public vesoft::cpp::NonMovable { -public: - /** - * @dtor If provided, the callback would be invoked when an existing - * TLS data needs to be destructed. - */ - ThreadLocalPtr(std::function dtor = nullptr); - ~ThreadLocalPtr(); - /** - * Set or reset TLS data for the current thread. - */ - void reset(const T *data = nullptr); - /** - * Obtain the TLS data of the current thread, `nullptr' if there is none. - * - * Please NOTE that constness leaking might happens here. - */ - T* get() const; - /** - * Some convenience operator overloads. - */ - T* operator->() const; - T& operator*() const; - explicit operator bool() const; - /** - * Retrieve all TLS data - */ - std::vector getAll() const; - -private: - struct MetaBlock { - const T *data_; - pid_t tid_; - ThreadLocalPtr *owner_; - }; - MetaBlock* getMetaBlock() const; - void setMetaBlock(MetaBlock *meta) const; - void destructData(const T *data); - void destructMetaBlock(MetaBlock *meta); - static void destructor(void *meta); - -private: - pthread_key_t key_; - std::function dtor_; - std::unordered_map metaPerThread_; - mutable std::mutex lock_; -}; - -/** - * Compare with nullptr - */ - -template -bool operator==(const ThreadLocalPtr &ptr, nullptr_t) { - return !ptr; -} - -template -bool operator==(nullptr_t, const ThreadLocalPtr &ptr) { - return !ptr; -} - -template -bool operator !=(const ThreadLocalPtr &ptr, nullptr_t) { - return !!ptr; -} - -template -bool operator!=(nullptr_t, const ThreadLocalPtr &ptr) { - return !!ptr; -} - -template -ThreadLocalPtr::ThreadLocalPtr(std::function dtor) { - dtor_ = std::move(dtor); - auto status = ::pthread_key_create(&key_, destructor); - if (status != 0) { - throw std::runtime_error("::pthread_key_create failed with " + std::to_string(status)); - } -} - -template -ThreadLocalPtr::~ThreadLocalPtr() { - std::lock_guard guard(lock_); - for (auto &e : metaPerThread_) { - if (e.second->data_ != nullptr) { - fprintf(stderr, "Outstanding thread still exists, tid: %d, type: %s\n", e.first, typeid(T).name()); - } - delete e.second; - } -} - -template -void ThreadLocalPtr::reset(const T *data) { - auto *meta = getMetaBlock(); - if (meta == nullptr) { - meta = new MetaBlock; - meta->data_ = data; - meta->owner_ = this; - meta->tid_ = concurrent::gettid(); - { - std::lock_guard guard(lock_); - metaPerThread_.emplace(meta->tid_, meta); - } - setMetaBlock(meta); - } else { - auto *old = meta->data_; - meta->data_ = data; - destructData(old); - } -} - -template -T* ThreadLocalPtr::get() const { - auto meta = getMetaBlock(); - if (meta != nullptr) { - return const_cast(meta->data_); - } - return nullptr; -} - -template -T* ThreadLocalPtr::operator->() const { - return this->get(); -} - -template -T& ThreadLocalPtr::operator*() const { - return *this->get(); -} - -template -ThreadLocalPtr::operator bool() const { - return this->get() != nullptr; -} - -template -std::vector ThreadLocalPtr::getAll() const { - std::vector all; - { - std::lock_guard guard(lock_); - for (auto &e : metaPerThread_) { - all.push_back(const_cast(e.second->data_)); - } - } - return all; -} - -template -void ThreadLocalPtr::destructor(void *meta) { - assert(meta != nullptr); - auto *m = reinterpret_cast(meta); - m->owner_->destructMetaBlock(m); -} - -template -typename ThreadLocalPtr::MetaBlock* -ThreadLocalPtr::getMetaBlock() const { - return reinterpret_cast(::pthread_getspecific(key_)); -} - -template -void ThreadLocalPtr::setMetaBlock(MetaBlock *meta) const { - ::pthread_setspecific(key_, meta); -} - -template -void ThreadLocalPtr::destructData(const T *data) { - if (dtor_ != nullptr && data != nullptr) { - dtor_(const_cast(data)); - } -} - -template -void ThreadLocalPtr::destructMetaBlock(MetaBlock *meta) { - { - std::lock_guard guard(lock_); - auto iter = metaPerThread_.find(meta->tid_); - if (iter != metaPerThread_.end()) { - metaPerThread_.erase(iter); - } - } - destructData(meta->data_); - delete meta; -} - -} // namespace concurrent -} // namespace vesoft - -#endif // COMMON_CONCURRENT_THREAD_THREADLOCALPTR_H_ diff --git a/common/cpp/macros.h b/common/cpp/macros.h deleted file mode 100644 index 639a9e65b27..00000000000 --- a/common/cpp/macros.h +++ /dev/null @@ -1,34 +0,0 @@ -/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved - * - * This source code is licensed under Apache 2.0 License - * (found in the LICENSE.Apache file in the root directory) - */ -#ifndef CPP_MACROS_H_ -#define CPP_MACROS_H_ - -#define VE_MUST_USE_RESULT __attribute__((warn_unused_result)) -#define VE_DONT_OPTIMIZE __attribute__((optimize("O0"))) - -#define VE_ALWAYS_INLINE __attribute__((always_inline)) -#define VE_ALWAYS_NO_INLINE __attribute__((noinline)) - -#define VE_BEGIN_NO_OPTIMIZATION _Pragma("GCC push_options") \ - _Pragma("GCC optimize(\"O0\")") -#define VE_END_NO_OPTIMIZATION _Pragma("GCC pop_options") - -#ifndef likely -#define likely(x) __builtin_expect((x),1) -#endif // likely -#ifndef unlikely -#define unlikely(x) __builtin_expect((x),0) -#endif // unlikely - -#ifndef UNUSED -#define UNUSED(x) (void)(x) -#endif // UNUSED - -#ifndef COMPILER_BARRIER -#define COMPILER_BARRIER() asm volatile ("":::"memory") -#endif // COMPILER_BARRIER - -#endif // CPP_MACROS_H_ diff --git a/common/thread/CMakeLists.txt b/common/thread/CMakeLists.txt new file mode 100644 index 00000000000..78c978a9ebd --- /dev/null +++ b/common/thread/CMakeLists.txt @@ -0,0 +1,7 @@ +add_library( + thread_obj OBJECT + NamedThread.cpp + GenericWorker.cpp + GenericThreadPool.cpp +) +add_subdirectory(test) diff --git a/common/concurrent/thread/GenericThreadPool.cpp b/common/thread/GenericThreadPool.cpp similarity index 92% rename from common/concurrent/thread/GenericThreadPool.cpp rename to common/thread/GenericThreadPool.cpp index bd9d05240fd..1bd4984fc6a 100644 --- a/common/concurrent/thread/GenericThreadPool.cpp +++ b/common/thread/GenericThreadPool.cpp @@ -3,10 +3,10 @@ * This source code is licensed under Apache 2.0 License * (found in the LICENSE.Apache file in the root directory) */ -#include "common/concurrent/thread/GenericThreadPool.h" +#include "common/thread/GenericThreadPool.h" namespace vesoft { -namespace concurrent { +namespace thread { GenericThreadPool::GenericThreadPool() { } @@ -53,5 +53,5 @@ void GenericThreadPool::purgeTimerTask(uint64_t id) { pool_[idx]->purgeTimerTask(id); } -} // namespace concurrent +} // namespace thread } // namespace vesoft diff --git a/common/concurrent/thread/GenericThreadPool.h b/common/thread/GenericThreadPool.h similarity index 92% rename from common/concurrent/thread/GenericThreadPool.h rename to common/thread/GenericThreadPool.h index 0067881426b..434befb0b84 100644 --- a/common/concurrent/thread/GenericThreadPool.h +++ b/common/thread/GenericThreadPool.h @@ -3,9 +3,9 @@ * This source code is licensed under Apache 2.0 License * (found in the LICENSE.Apache file in the root directory) */ -#ifndef COMMON_CONCURRENT_THREAD_GENERICTHREADPOOL_H_ -#define COMMON_CONCURRENT_THREAD_GENERICTHREADPOOL_H_ -#include "common/concurrent/thread/GenericWorker.h" +#ifndef COMMON_THREAD_GENERICTHREADPOOL_H_ +#define COMMON_THREAD_GENERICTHREADPOOL_H_ +#include "common/thread/GenericWorker.h" /** * Based on GenericWorker, GenericThreadPool implements a thread pool that execute tasks asynchronously. @@ -17,9 +17,9 @@ */ namespace vesoft { -namespace concurrent { +namespace thread { -class GenericThreadPool : public vesoft::cpp::NonCopyable, public vesoft::cpp::NonMovable { +class GenericThreadPool final : public vesoft::cpp::NonCopyable, public vesoft::cpp::NonMovable { public: GenericThreadPool(); ~GenericThreadPool(); @@ -120,7 +120,7 @@ uint64_t GenericThreadPool::addRepeatTask(size_t ms, F &&f, Args &&...args) { return ((idx << GenericWorker::TIMER_ID_BITS) | id); } -} // namespace concurrent +} // namespace thread } // namespace vesoft -#endif // COMMON_CONCURRENT_THREAD_GENERICTHREADPOOL_H_ +#endif // COMMON_THREAD_GENERICTHREADPOOL_H_ diff --git a/common/concurrent/thread/GenericWorker.cpp b/common/thread/GenericWorker.cpp similarity index 97% rename from common/concurrent/thread/GenericWorker.cpp rename to common/thread/GenericWorker.cpp index 10c54a710e7..544a5fa27af 100644 --- a/common/concurrent/thread/GenericWorker.cpp +++ b/common/thread/GenericWorker.cpp @@ -3,7 +3,7 @@ * This source code is licensed under Apache 2.0 License * (found in the LICENSE.Apache file in the root directory) */ -#include "common/concurrent/thread/GenericWorker.h" +#include "common/thread/GenericWorker.h" #include #ifndef EV_MULTIPLICITY @@ -12,7 +12,7 @@ #include namespace vesoft { -namespace concurrent { +namespace thread { GenericWorker::GenericWorker() { } @@ -158,6 +158,6 @@ void GenericWorker::purgeTimerInternal(uint64_t id) { } } -} // namespace concurrent +} // namespace thread } // namespace vesoft diff --git a/common/concurrent/thread/GenericWorker.h b/common/thread/GenericWorker.h similarity index 94% rename from common/concurrent/thread/GenericWorker.h rename to common/thread/GenericWorker.h index cd7d3d5fa09..e379c04dc50 100644 --- a/common/concurrent/thread/GenericWorker.h +++ b/common/thread/GenericWorker.h @@ -3,8 +3,8 @@ * This source code is licensed under Apache 2.0 License * (found in the LICENSE.Apache file in the root directory) */ -#ifndef COMMON_CONCURRENT_THREAD_GENERICWORKER_H_ -#define COMMON_CONCURRENT_THREAD_GENERICWORKER_H_ +#ifndef COMMON_THREAD_GENERICWORKER_H_ +#define COMMON_THREAD_GENERICWORKER_H_ #include #include #include @@ -12,8 +12,8 @@ #include #include #include "common/cpp/helpers.h" -#include "common/cpp/macros.h" -#include "common/concurrent/thread/NamedThread.h" +#include "common/base/Base.h" +#include "common/thread/NamedThread.h" /** * GenericWorker implements a event-based task executor that executes tasks asynchronously @@ -31,9 +31,9 @@ struct ev_timer; struct ev_async; namespace vesoft { -namespace concurrent { +namespace thread { -class GenericWorker : public vesoft::cpp::NonCopyable, public vesoft::cpp::NonMovable { +class GenericWorker final : public vesoft::cpp::NonCopyable, public vesoft::cpp::NonMovable { public: friend class GenericThreadPool; GenericWorker(); @@ -193,7 +193,7 @@ uint64_t GenericWorker::addTimerTask(size_t delay, size_t interval, F &&f, Args return id; } -} // namespace concurrent +} // namespace thread } // namespace vesoft -#endif // COMMON_CONCURRENT_THREAD_GENERICWORKER_H_ +#endif // COMMON_THREAD_GENERICWORKER_H_ diff --git a/common/concurrent/thread/NamedThread.cpp b/common/thread/NamedThread.cpp similarity index 77% rename from common/concurrent/thread/NamedThread.cpp rename to common/thread/NamedThread.cpp index 7b76076ca9a..3f22b88640a 100644 --- a/common/concurrent/thread/NamedThread.cpp +++ b/common/thread/NamedThread.cpp @@ -3,11 +3,12 @@ * This source code is licensed under Apache 2.0 License * (found in the LICENSE.Apache file in the root directory) */ -#include "common/concurrent/thread/NamedThread.h" +#include "common/thread/NamedThread.h" namespace vesoft { -namespace concurrent { +namespace thread { +namespace detail { class TLSThreadID { public: TLSThreadID() { @@ -25,11 +26,12 @@ class TLSThreadID { private: pid_t tid_; }; +} pid_t gettid() { - static thread_local TLSThreadID tlstid; + static thread_local detail::TLSThreadID tlstid; return tlstid.tid(); } -} // namespace concurrent +} // namespace thread } // namespace vesoft diff --git a/common/concurrent/thread/NamedThread.h b/common/thread/NamedThread.h similarity index 87% rename from common/concurrent/thread/NamedThread.h rename to common/thread/NamedThread.h index 96f738a2fd8..df01075796c 100644 --- a/common/concurrent/thread/NamedThread.h +++ b/common/thread/NamedThread.h @@ -3,8 +3,8 @@ * This source code is licensed under Apache 2.0 License * (found in the LICENSE.Apache file in the root directory) */ -#ifndef COMMON_CONCURRENT_THREAD_NAMEDTHREAD_H_ -#define COMMON_CONCURRENT_THREAD_NAMEDTHREAD_H_ +#ifndef COMMON_THREAD_NAMEDTHREAD_H_ +#define COMMON_THREAD_NAMEDTHREAD_H_ #ifndef _GNU_SOURCE #define _GNU_SOURCE @@ -18,11 +18,11 @@ #include namespace vesoft { -namespace concurrent { +namespace thread { pid_t gettid(); -class NamedThread : public std::thread { +class NamedThread final : public std::thread { public: NamedThread() = default; NamedThread(NamedThread&&) = default; @@ -69,7 +69,7 @@ class NamedThread : public std::thread { static void hook(NamedThread *thread, const std::string &name, const std::function &f) { - thread->tid_ = vesoft::concurrent::gettid(); + thread->tid_ = vesoft::thread::gettid(); if (!name.empty()) { Nominator::set(name); } @@ -86,7 +86,7 @@ NamedThread::NamedThread(const std::string &name, F &&f, Args&&...args) std::bind(std::forward(f), std::forward(args)...)) { }; -} // namespace concurrent +} // namespace thread } // namespace vesoft -#endif // COMMON_CONCURRENT_THREAD_NAMEDTHREAD_H_ +#endif // COMMON_THREAD_NAMEDTHREAD_H_ diff --git a/common/thread/test/CMakeLists.txt b/common/thread/test/CMakeLists.txt new file mode 100644 index 00000000000..8f349a2942a --- /dev/null +++ b/common/thread/test/CMakeLists.txt @@ -0,0 +1,11 @@ +add_executable( + thread_test + ThreadTest.cpp + GenericWorkerTest.cpp + GenericThreadPoolTest.cpp + $ + $ + $ +) +target_link_libraries(thread_test ev gtest gtest_main pthread) +add_test(NAME thread_test COMMAND thread_test) diff --git a/common/concurrent/test/GenericThreadPoolTest.cpp b/common/thread/test/GenericThreadPoolTest.cpp similarity index 97% rename from common/concurrent/test/GenericThreadPoolTest.cpp rename to common/thread/test/GenericThreadPoolTest.cpp index 06ec0d6cc6b..034432da501 100644 --- a/common/concurrent/test/GenericThreadPoolTest.cpp +++ b/common/thread/test/GenericThreadPoolTest.cpp @@ -7,12 +7,12 @@ #include #include #include -#include "common/concurrent/thread/GenericThreadPool.h" +#include "common/thread/GenericThreadPool.h" #include "common/time/Duration.h" using namespace std; namespace vesoft { -namespace concurrent { +namespace thread { TEST(GenericThreadPool, StartAndStop) { // inactive pool @@ -148,5 +148,5 @@ TEST(GenericThreadPool, purgeRepeatTask) { } } -} // namespace concurrent +} // namespace thread } // namespace vesoft diff --git a/common/concurrent/test/GenericWorkerTest.cpp b/common/thread/test/GenericWorkerTest.cpp similarity index 97% rename from common/concurrent/test/GenericWorkerTest.cpp rename to common/thread/test/GenericWorkerTest.cpp index 28c06416de5..b90e50bb6b4 100644 --- a/common/concurrent/test/GenericWorkerTest.cpp +++ b/common/thread/test/GenericWorkerTest.cpp @@ -7,11 +7,11 @@ #include #include #include -#include "concurrent/thread/GenericWorker.h" +#include "common/thread/GenericWorker.h" using namespace std; namespace vesoft { -namespace concurrent { +namespace thread { TEST(GenericWorker, StartAndStop) { // inactive worker @@ -148,5 +148,5 @@ TEST(GenericWorker, purgeRepeatTask) { } } -} // namespace concurrent +} // namespace thread } // namespace vesoft diff --git a/common/concurrent/test/ThreadTest.cpp b/common/thread/test/ThreadTest.cpp similarity index 88% rename from common/concurrent/test/ThreadTest.cpp rename to common/thread/test/ThreadTest.cpp index a34d24f3598..0a329922ca6 100644 --- a/common/concurrent/test/ThreadTest.cpp +++ b/common/thread/test/ThreadTest.cpp @@ -4,10 +4,10 @@ * (found in the LICENSE.Apache file in the root directory) */ #include -#include "concurrent/thread/NamedThread.h" +#include "common/thread/NamedThread.h" namespace vesoft { -namespace concurrent { +namespace thread { TEST(NamedThread, ThreadName) { std::string setname("thread"); @@ -31,5 +31,5 @@ TEST(NamedThread, ThreadID) { ASSERT_EQ(tid, thread.tid()); } -} // namespace concurrent +} // namespace thread } // namespace vesoft