Skip to content

Commit

Permalink
[Feature] Added some concurrent utilities, GenericThreadPool, etc. (v…
Browse files Browse the repository at this point in the history
…esoft-inc#3)

* [Feature] Added some concurrent utilities, GenericThreadPool, etc.
  • Loading branch information
dutor authored Sep 5, 2018
1 parent 7eb5439 commit 92d917b
Show file tree
Hide file tree
Showing 24 changed files with 1,511 additions and 1 deletion.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,7 @@ common/base/Base.h.gch
CMakeCache.txt
Makefile
cmake_install.cmake
CTestTestfile.cmake
CMakeFiles/
Testing/

2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ set(CMAKE_SKIP_RPATH TRUE)

set(CMAKE_VERBOSE_MAKEFILE TRUE)

enable_testing()

if (!CMAKE_CXX_COMPILER)
message(FATAL_ERROR "No C++ compiler found")
endif()
Expand Down
4 changes: 3 additions & 1 deletion common/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
add_custom_target(
common ALL
DEPENDS base_obj time proc network fs
DEPENDS base_obj time proc network fs concurrent thread
)

add_subdirectory(base)
add_subdirectory(time)
add_subdirectory(proc)
add_subdirectory(network)
add_subdirectory(fs)
add_subdirectory(concurrent)
add_subdirectory(thread)

18 changes: 18 additions & 0 deletions common/base/Base.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,24 @@

//#include "base/StringUnorderedMap.h"

#define VE_MUST_USE_RESULT __attribute__((warn_unused_result))
#define VE_DONT_OPTIMIZE __attribute__((optimize("O0")))

#define VE_ALWAYS_INLINE __attribute__((always_inline))
#define VE_ALWAYS_NO_INLINE __attribute__((noinline))

#define VE_BEGIN_NO_OPTIMIZATION _Pragma("GCC push_options") \
_Pragma("GCC optimize(\"O0\")")
#define VE_END_NO_OPTIMIZATION _Pragma("GCC pop_options")

#ifndef UNUSED
#define UNUSED(x) (void)(x)
#endif // UNUSED

#ifndef COMPILER_BARRIER
#define COMPILER_BARRIER() asm volatile ("":::"memory")
#endif // COMPILER_BARRIER

namespace vesoft {

using GraphSpaceID = int32_t;
Expand Down
37 changes: 37 additions & 0 deletions common/concurrent/Barrier.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved
*
* This source code is licensed under Apache 2.0 License
* (found in the LICENSE.Apache file in the root directory)
*/
#include "concurrent/Barrier.h"


namespace vesoft {
namespace concurrent {

Barrier::Barrier(size_t counter, std::function<void()> completion) {
if (counter == 0) {
throw std::invalid_argument("Barrier counter can't be zero");
}
completion_ = std::move(completion);
counter_ = counter;
ages_ = counter_;
}

void Barrier::wait() {
std::unique_lock<std::mutex> guard(lock_);
if (--ages_ == 0) {
ages_ = counter_;
++generation_;
if (completion_ != nullptr) {
completion_();
}
cond_.notify_all();
} else {
auto current = generation_;
cond_.wait(guard, [=] () { return current != generation_; });
}
}

} // namespace concurrent
} // namespace vesoft
54 changes: 54 additions & 0 deletions common/concurrent/Barrier.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved
*
* This source code is licensed under Apache 2.0 License
* (found in the LICENSE.Apache file in the root directory)
*/
#ifndef COMMON_CONCURRENT_BARRIER_H_
#define COMMON_CONCURRENT_BARRIER_H_
#include <mutex>
#include <condition_variable>
#include <functional>
#include "common/cpp/helpers.h"

/**
* Like `Latch', `Barrier' is a synchronization object, except that
* `Barrier' is reusable.
* Besides, `Barrier' features with an optional callable object,
* which would be invoked at the completion phase, i.e. synchronization point,
* by the last participating thread who entered `wait', before waking up other blocking threads.
*/

namespace vesoft {
namespace concurrent {

class Barrier final : public vesoft::cpp::NonCopyable, public vesoft::cpp::NonMovable {
public:
/**
* @counter number of participating threads
* @completion callback invoked at the completion phase
*/
explicit Barrier(size_t counter, std::function<void()> completion = nullptr);
~Barrier() = default;
/**
* Decrements the internal counter.
* If the counter reaches zero, the completion callback would be invoked if present,
* then all preceding blocked threads would be woken up, with the internal counter
* reset to the original value.
* Otherwise, the calling thread would be blocked to wait for
* other participants' arrival.
*/
void wait();

private:
std::function<void()> completion_{nullptr};
size_t counter_{0};
size_t ages_{0};
size_t generation_{0};
std::mutex lock_;
std::condition_variable cond_;
};

} // namespace concurrent
} // namespace vesoft

#endif // COMMON_CONCURRENT_BARRIER_H_
6 changes: 6 additions & 0 deletions common/concurrent/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
add_library(
concurrent_obj OBJECT
Barrier.cpp
Latch.cpp
)
add_subdirectory(test)
50 changes: 50 additions & 0 deletions common/concurrent/Latch.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved
*
* This source code is licensed under Apache 2.0 License
* (found in the LICENSE.Apache file in the root directory)
*/
#include "concurrent/Latch.h"

namespace vesoft {
namespace concurrent {

Latch::Latch(size_t counter) {
if (counter == 0) {
throw std::invalid_argument("Zero Latch counter");
}
counter_ = counter;
}

void Latch::down() {
std::unique_lock<std::mutex> unique(lock_);
if (counter_ == 0) {
throw std::runtime_error("Count down on zero Latch");
}
if (--counter_ == 0) {
cond_.notify_all();
}
}

void Latch::downWait() {
std::unique_lock<std::mutex> unique(lock_);
if (counter_ == 0) {
throw std::runtime_error("Count down on zero Latch");
}
if (--counter_ == 0) {
cond_.notify_all();
return;
}
cond_.wait(unique, [this] () { return counter_ == 0; });
}

void Latch::wait() {
std::unique_lock<std::mutex> unique(lock_);
cond_.wait(unique, [this] () { return counter_ == 0; });
}

bool Latch::isReady() {
return counter_ == 0;
}

} // namespace concurrent
} // namespace vesoft
64 changes: 64 additions & 0 deletions common/concurrent/Latch.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved
*
* This source code is licensed under Apache 2.0 License
* (found in the LICENSE.Apache file in the root directory)
*/
#ifndef COMMON_CONCURRENT_LATCH_H_
#define COMMON_CONCURRENT_LATCH_H_
#include <mutex>
#include <condition_variable>
#include "common/cpp/helpers.h"
/**
* Latch is an one-shot synchronization object.
* It provides an synchronization point for multiple threads.
* See shared/concurrent/test/LatchTest.cpp for use scenarios.
*/

namespace vesoft {
namespace concurrent {

class Latch final : public vesoft::cpp::NonCopyable, public vesoft::cpp::NonMovable {
public:
/**
* @counter: initial counter,
* typically number of participating threads.
* Throws `std::invalid_argument' if counter is zero.
*/
explicit Latch(size_t counter);
~Latch() = default;
/**
* Decrements the internal counter by one.
* If the counter reaches 0, all blocking(in `wait')
* threads will be given the green light.
* Throws `std::runtime_error' if counter already zeroed.
*/
void down();
/**
* Decrements the internal counter by one.
* If the counter reaches 0, returns immediately,
* and all blocking threads will be woken up.
* Otherwise, the calling thread blocks until being woken up.
* Throws `std::runtime_error' if counter already zeroed.
*/
void downWait();
/**
* Blocks if internal counter not zero.
* Otherwise, returns immediately.
*/
void wait();
/**
* Returns true if internal counter already zeroed.
* Otherwise, returns false.
*/
bool isReady();

private:
volatile size_t counter_{0};
std::mutex lock_;
std::condition_variable cond_;
};

} // namespace concurrent
} // namespace vesoft

#endif // COMMON_CONCURRENT_LATCH_H_
96 changes: 96 additions & 0 deletions common/concurrent/test/BarrierTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved
*
* This source code is licensed under Apache 2.0 License
* (found in the LICENSE.Apache file in the root directory)
*/
#include <gtest/gtest.h>
#include <cstdlib>
#include <thread>
#include <mutex>
#include <atomic>
#include "common/concurrent/Barrier.h"
#include "common/thread/GenericThreadPool.h"

namespace vesoft {
namespace concurrent {

TEST(BarrierTest, BasicTest) {
// test for invalid initial counter
{
ASSERT_THROW({Barrier barrier(0UL);}, std::invalid_argument);
}
// test for single-thread normal case
{
Barrier barrier(1UL);
barrier.wait();
ASSERT_TRUE(true);
}
// test for multiple-thread normal case
{
Barrier barrier(2UL);
std::atomic<size_t> counter{0};
auto cb = [&] () {
barrier.wait();
++counter;
};
std::thread thread(cb);
usleep(1000);
ASSERT_EQ(0UL, counter.load());
barrier.wait();
thread.join();
ASSERT_EQ(1UL, counter.load());
}
// test for multiple-thread completion
{
std::atomic<size_t> counter{0};
auto completion = [&] () {
++counter;
++counter;
};
Barrier barrier(2UL, completion);

auto cb = [&] () {
barrier.wait();
++counter;
};

std::thread thread(cb);
usleep(1000);
ASSERT_EQ(0UL, counter.load());
barrier.wait();
ASSERT_GE(counter.load(), 2UL);
thread.join();
ASSERT_EQ(3UL, counter.load());
}
}

TEST(BarrierTest, ConsecutiveTest) {
std::atomic<size_t> counter{0};
constexpr auto N = 64UL;
constexpr auto iters = 100UL;
auto completion = [&] () {
// At the completion phase, `counter' should be multiple to `N'.
ASSERT_EQ(0UL, counter.load() % N);
};

Barrier barrier(N, completion);
auto cb = [&] () {
auto i = iters;
while (i-- != 0) {
++counter;
barrier.wait();
}
};

std::vector<std::thread> threads;
for (auto i = 0UL; i < N; i++) {
threads.emplace_back(cb);
}
for (auto &thread : threads) {
thread.join();
}
ASSERT_EQ(0UL, counter.load() % N);
}

} // namespace concurrent
} // namespace vesoft
10 changes: 10 additions & 0 deletions common/concurrent/test/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
add_executable(
concurrent_test
BarrierTest.cpp
LatchTest.cpp
$<TARGET_OBJECTS:concurrent_obj>
$<TARGET_OBJECTS:thread_obj>
$<TARGET_OBJECTS:time_obj>
)
target_link_libraries(concurrent_test ev gtest gtest_main pthread)
add_test(NAME concurrent_test COMMAND concurrent_test)
Loading

0 comments on commit 92d917b

Please sign in to comment.