Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Added some concurrent utilities, GenericThreadPool, etc. #3

Merged
merged 2 commits into from
Sep 5, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,7 @@ common/base/Base.h.gch
CMakeCache.txt
Makefile
cmake_install.cmake
CTestTestfile.cmake
CMakeFiles/
Testing/

2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ set(CMAKE_SKIP_RPATH TRUE)

set(CMAKE_VERBOSE_MAKEFILE TRUE)

enable_testing()
dutor marked this conversation as resolved.
Show resolved Hide resolved

if (!CMAKE_CXX_COMPILER)
message(FATAL_ERROR "No C++ compiler found")
endif()
Expand Down
3 changes: 2 additions & 1 deletion common/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
add_custom_target(
common ALL
DEPENDS base_obj time proc network fs
DEPENDS base_obj time proc network fs concurrent
)

add_subdirectory(base)
add_subdirectory(time)
add_subdirectory(proc)
add_subdirectory(network)
add_subdirectory(fs)
add_subdirectory(concurrent)

7 changes: 7 additions & 0 deletions common/concurrent/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
add_library(concurrent_obj OBJECT thread/NamedThread.cpp
dutor marked this conversation as resolved.
Show resolved Hide resolved
thread/GenericWorker.cpp
thread/GenericThreadPool.cpp
sync/Barrier.cpp
sync/Latch.cpp
)
add_subdirectory(test)
37 changes: 37 additions & 0 deletions common/concurrent/sync/Barrier.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved
*
* This source code is licensed under Apache 2.0 License
* (found in the LICENSE.Apache file in the root directory)
*/
#include "concurrent/sync/Barrier.h"


namespace vesoft {
namespace concurrent {

Barrier::Barrier(size_t counter, std::function<void()> completion) {
if (counter == 0) {
throw std::invalid_argument("Zero barrier counter");
}
completion_ = std::move(completion);
counter_ = counter;
ages_ = counter_;
}

void Barrier::wait() {
std::unique_lock<std::mutex> unique(lock_);
dutor marked this conversation as resolved.
Show resolved Hide resolved
if (--ages_ == 0) {
ages_ = counter_;
++generation_;
if (completion_ != nullptr) {
completion_();
}
cond_.notify_all();
} else {
auto current = generation_;
cond_.wait(unique, [=] () { return current != generation_; });
}
}

} // namespace concurrent
} // namespace vesoft
54 changes: 54 additions & 0 deletions common/concurrent/sync/Barrier.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved
*
* This source code is licensed under Apache 2.0 License
* (found in the LICENSE.Apache file in the root directory)
*/
#ifndef COMMON_CONCURRENT_SYNC_BARRIER_H_
#define COMMON_CONCURRENT_SYNC_BARRIER_H_
#include <mutex>
#include <condition_variable>
#include <functional>

/**
* Like `Latch', `Barrier' is a synchronization object, except that
* `Barrier' is reusable.
* Besides, `Barrier' features with an optional callable object,
* which would be invoked at the completion phase, i.e. synchronization point,
* by the last participating thread entering `wait'.
dutor marked this conversation as resolved.
Show resolved Hide resolved
*/

namespace vesoft {
namespace concurrent {

class Barrier {
public:
explicit Barrier(size_t counter, std::function<void()> completion = nullptr);
dutor marked this conversation as resolved.
Show resolved Hide resolved
Barrier() = delete;
~Barrier() = default;
dutor marked this conversation as resolved.
Show resolved Hide resolved
Barrier(const Barrier&) = delete;
Barrier(Barrier&&) = delete;
Barrier& operator=(const Barrier&) = delete;
Barrier& operator=(Barrier&&) = delete;
/**
* Decrements the internal counter.
* If the counter reaches zero, the completion callback would be invoked if present,
* then all preceding blocked threads would be woken up, with the internal counter
* reset to the original value.
* Otherwise, the calling thread would be blocked to wait for
* other participants' arrival.
*/
void wait();

private:
std::function<void()> completion_{nullptr};
size_t counter_{0};
size_t ages_{0};
size_t generation_{0};
std::mutex lock_;
std::condition_variable cond_;
};

} // namespace concurrent
} // namespace vesoft

#endif // COMMON_CONCURRENT_SYNC_BARRIER_H_
50 changes: 50 additions & 0 deletions common/concurrent/sync/Latch.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved
*
* This source code is licensed under Apache 2.0 License
* (found in the LICENSE.Apache file in the root directory)
*/
#include "concurrent/sync/Latch.h"

namespace vesoft {
namespace concurrent {

Latch::Latch(size_t counter) {
if (counter == 0) {
throw std::invalid_argument("Zero Latch counter");
}
counter_ = counter;
}

void Latch::down() {
std::unique_lock<std::mutex> unique(lock_);
if (counter_ == 0) {
throw std::runtime_error("Count down on zero Latch");
}
if (--counter_ == 0) {
cond_.notify_all();
}
}

void Latch::downWait() {
std::unique_lock<std::mutex> unique(lock_);
if (counter_ == 0) {
throw std::runtime_error("Count down on zero Latch");
}
if (--counter_ == 0) {
cond_.notify_all();
return;
}
cond_.wait(unique, [this] () { return counter_ == 0; });
}

void Latch::wait() {
std::unique_lock<std::mutex> unique(lock_);
cond_.wait(unique, [this] () { return counter_ == 0; });
dutor marked this conversation as resolved.
Show resolved Hide resolved
}

bool Latch::isReady() {
return counter_ == 0;
}

} // namespace concurrent
} // namespace vesoft
67 changes: 67 additions & 0 deletions common/concurrent/sync/Latch.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved
*
* This source code is licensed under Apache 2.0 License
* (found in the LICENSE.Apache file in the root directory)
*/
#ifndef COMMON_CONCURRENT_SYNC_LATCH_H_
#define COMMON_CONCURRENT_SYNC_LATCH_H_
#include <mutex>
#include <condition_variable>
/**
* Latch is an one-shot synchronization object.
* It provides synchronization point for multiple threads.
* See shared/concurrent/test/LatchTest.cpp for use scenarios.
*/

namespace vesoft {
namespace concurrent {

class Latch {
public:
/**
* @counter: initial counter,
* typically number of participating threads.
* Throws `std::invalid_argument' if counter is zero.
*/
explicit Latch(size_t counter);
Latch(const Latch&) = delete;
Latch(Latch&&) = delete;
~Latch() = default;
dutor marked this conversation as resolved.
Show resolved Hide resolved
Latch& operator=(const Latch&) = delete;
Latch& operator=(Latch&&) = delete;
/**
* Decrements the internal counter by one.
* If the counter reaches 0, all blocking(in `wait')
* threads will be given the green light.
* Throws `std::runtime_error' if counter already zeroed.
*/
void down();
/**
* Decrements the internal counter by one.
* If the counter reaches 0, returns immediately,
* and all blocking threads will be woken up.
* Otherwise, the calling thread blocks until being woken up.
* Throws `std::runtime_error' if counter already zeroed.
*/
void downWait();
/**
* Blocks if internal counter not zero.
* Otherwise, returns immediately.
*/
void wait();
/**
* Returns true if internal counter already zeroed.
* Otherwise, returns false.
*/
bool isReady();
dutor marked this conversation as resolved.
Show resolved Hide resolved

private:
volatile size_t counter_{0};
std::mutex lock_;
std::condition_variable cond_;
};

} // namespace concurrent
} // namespace vesoft

#endif // COMMON_CONCURRENT_SYNC_LATCH_H_
96 changes: 96 additions & 0 deletions common/concurrent/test/BarrierTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved
*
* This source code is licensed under Apache 2.0 License
* (found in the LICENSE.Apache file in the root directory)
*/
#include <gtest/gtest.h>
#include <cstdlib>
#include <thread>
#include <mutex>
#include <atomic>
#include "concurrent/sync/Barrier.h"
#include "concurrent/thread/GenericThreadPool.h"

namespace vesoft {
namespace concurrent {

TEST(BarrierTest, BasicTest) {
// test for invalid initial counter
{
ASSERT_THROW({Barrier barrier(0UL);}, std::invalid_argument);
}
// test for single-thread normal case
{
Barrier barrier(1UL);
barrier.wait();
ASSERT_TRUE(true);
}
// test for multiple-thread normal case
{
Barrier barrier(2UL);
std::atomic<size_t> counter{0};
auto cb = [&] () {
barrier.wait();
++counter;
};
std::thread thread(cb);
usleep(1000);
ASSERT_EQ(0UL, counter.load());
barrier.wait();
thread.join();
ASSERT_EQ(1UL, counter.load());
}
// test for multiple-thread completion
{
std::atomic<size_t> counter{0};
auto completion = [&] () {
++counter;
++counter;
};
Barrier barrier(2UL, completion);

auto cb = [&] () {
barrier.wait();
++counter;
};

std::thread thread(cb);
usleep(1000);
ASSERT_EQ(0UL, counter.load());
barrier.wait();
ASSERT_GE(counter.load(), 2UL);
thread.join();
ASSERT_EQ(3UL, counter.load());
}
}

TEST(BarrierTest, ConsecutiveTest) {
std::atomic<size_t> counter{0};
constexpr auto N = 64UL;
constexpr auto iters = 100UL;
auto completion = [&] () {
// At the completion phase, `counter' should be multiple to `N'.
ASSERT_EQ(0UL, counter.load() % N);
};

Barrier barrier(N, completion);
auto cb = [&] () {
auto i = iters;
while (i-- != 0) {
++counter;
barrier.wait();
}
};

std::vector<std::thread> threads;
for (auto i = 0UL; i < N; i++) {
threads.emplace_back(cb);
}
for (auto &thread : threads) {
thread.join();
}
ASSERT_EQ(0UL, counter.load() % N);
}

} // namespace concurrent
} // namespace vesoft
9 changes: 9 additions & 0 deletions common/concurrent/test/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
add_executable(concurrent_test ThreadTest.cpp
GenericWorkerTest.cpp
GenericThreadPoolTest.cpp
ThreadLocalPtrTest.cpp
$<TARGET_OBJECTS:concurrent_obj>
$<TARGET_OBJECTS:time_obj>
)
target_link_libraries(concurrent_test ev gtest gtest_main pthread)
add_test(NAME concurrent_test COMMAND concurrent_test)
Loading