Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Added some concurrent utilities, GenericThreadPool, etc. #3

Merged
merged 2 commits into from
Sep 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,7 @@ common/base/Base.h.gch
CMakeCache.txt
Makefile
cmake_install.cmake
CTestTestfile.cmake
CMakeFiles/
Testing/

2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ set(CMAKE_SKIP_RPATH TRUE)

set(CMAKE_VERBOSE_MAKEFILE TRUE)

enable_testing()
dutor marked this conversation as resolved.
Show resolved Hide resolved

if (!CMAKE_CXX_COMPILER)
message(FATAL_ERROR "No C++ compiler found")
endif()
Expand Down
4 changes: 3 additions & 1 deletion common/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
add_custom_target(
common ALL
DEPENDS base_obj time proc network fs
DEPENDS base_obj time proc network fs concurrent thread
)

add_subdirectory(base)
add_subdirectory(time)
add_subdirectory(proc)
add_subdirectory(network)
add_subdirectory(fs)
add_subdirectory(concurrent)
add_subdirectory(thread)

18 changes: 18 additions & 0 deletions common/base/Base.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,24 @@

//#include "base/StringUnorderedMap.h"

#define VE_MUST_USE_RESULT __attribute__((warn_unused_result))
#define VE_DONT_OPTIMIZE __attribute__((optimize("O0")))

#define VE_ALWAYS_INLINE __attribute__((always_inline))
#define VE_ALWAYS_NO_INLINE __attribute__((noinline))

#define VE_BEGIN_NO_OPTIMIZATION _Pragma("GCC push_options") \
_Pragma("GCC optimize(\"O0\")")
#define VE_END_NO_OPTIMIZATION _Pragma("GCC pop_options")

#ifndef UNUSED
#define UNUSED(x) (void)(x)
#endif // UNUSED

#ifndef COMPILER_BARRIER
#define COMPILER_BARRIER() asm volatile ("":::"memory")
#endif // COMPILER_BARRIER

namespace vesoft {

using GraphSpaceID = int32_t;
Expand Down
37 changes: 37 additions & 0 deletions common/concurrent/Barrier.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved
*
* This source code is licensed under Apache 2.0 License
* (found in the LICENSE.Apache file in the root directory)
*/
#include "concurrent/Barrier.h"


namespace vesoft {
namespace concurrent {

Barrier::Barrier(size_t counter, std::function<void()> completion) {
if (counter == 0) {
throw std::invalid_argument("Barrier counter can't be zero");
}
completion_ = std::move(completion);
counter_ = counter;
ages_ = counter_;
}

void Barrier::wait() {
std::unique_lock<std::mutex> guard(lock_);
if (--ages_ == 0) {
ages_ = counter_;
++generation_;
if (completion_ != nullptr) {
completion_();
}
cond_.notify_all();
} else {
auto current = generation_;
cond_.wait(guard, [=] () { return current != generation_; });
}
}

} // namespace concurrent
} // namespace vesoft
54 changes: 54 additions & 0 deletions common/concurrent/Barrier.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved
*
* This source code is licensed under Apache 2.0 License
* (found in the LICENSE.Apache file in the root directory)
*/
#ifndef COMMON_CONCURRENT_BARRIER_H_
#define COMMON_CONCURRENT_BARRIER_H_
#include <mutex>
#include <condition_variable>
#include <functional>
#include "common/cpp/helpers.h"

/**
* Like `Latch', `Barrier' is a synchronization object, except that
* `Barrier' is reusable.
* Besides, `Barrier' features with an optional callable object,
* which would be invoked at the completion phase, i.e. synchronization point,
* by the last participating thread who entered `wait', before waking up other blocking threads.
*/

namespace vesoft {
namespace concurrent {

class Barrier final : public vesoft::cpp::NonCopyable, public vesoft::cpp::NonMovable {
public:
/**
* @counter number of participating threads
* @completion callback invoked at the completion phase
*/
explicit Barrier(size_t counter, std::function<void()> completion = nullptr);
~Barrier() = default;
/**
* Decrements the internal counter.
* If the counter reaches zero, the completion callback would be invoked if present,
* then all preceding blocked threads would be woken up, with the internal counter
* reset to the original value.
* Otherwise, the calling thread would be blocked to wait for
* other participants' arrival.
*/
void wait();

private:
std::function<void()> completion_{nullptr};
size_t counter_{0};
size_t ages_{0};
size_t generation_{0};
std::mutex lock_;
std::condition_variable cond_;
};

} // namespace concurrent
} // namespace vesoft

#endif // COMMON_CONCURRENT_BARRIER_H_
6 changes: 6 additions & 0 deletions common/concurrent/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
add_library(
concurrent_obj OBJECT
Barrier.cpp
Latch.cpp
)
add_subdirectory(test)
50 changes: 50 additions & 0 deletions common/concurrent/Latch.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved
*
* This source code is licensed under Apache 2.0 License
* (found in the LICENSE.Apache file in the root directory)
*/
#include "concurrent/Latch.h"

namespace vesoft {
namespace concurrent {

Latch::Latch(size_t counter) {
if (counter == 0) {
throw std::invalid_argument("Zero Latch counter");
}
counter_ = counter;
}

void Latch::down() {
std::unique_lock<std::mutex> unique(lock_);
if (counter_ == 0) {
throw std::runtime_error("Count down on zero Latch");
}
if (--counter_ == 0) {
cond_.notify_all();
}
}

void Latch::downWait() {
std::unique_lock<std::mutex> unique(lock_);
if (counter_ == 0) {
throw std::runtime_error("Count down on zero Latch");
}
if (--counter_ == 0) {
cond_.notify_all();
return;
}
cond_.wait(unique, [this] () { return counter_ == 0; });
}

void Latch::wait() {
std::unique_lock<std::mutex> unique(lock_);
cond_.wait(unique, [this] () { return counter_ == 0; });
}

bool Latch::isReady() {
return counter_ == 0;
}

} // namespace concurrent
} // namespace vesoft
64 changes: 64 additions & 0 deletions common/concurrent/Latch.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved
*
* This source code is licensed under Apache 2.0 License
* (found in the LICENSE.Apache file in the root directory)
*/
#ifndef COMMON_CONCURRENT_LATCH_H_
#define COMMON_CONCURRENT_LATCH_H_
#include <mutex>
#include <condition_variable>
#include "common/cpp/helpers.h"
/**
* Latch is an one-shot synchronization object.
* It provides an synchronization point for multiple threads.
* See shared/concurrent/test/LatchTest.cpp for use scenarios.
*/

namespace vesoft {
namespace concurrent {

class Latch final : public vesoft::cpp::NonCopyable, public vesoft::cpp::NonMovable {
public:
/**
* @counter: initial counter,
* typically number of participating threads.
* Throws `std::invalid_argument' if counter is zero.
*/
explicit Latch(size_t counter);
~Latch() = default;
/**
* Decrements the internal counter by one.
* If the counter reaches 0, all blocking(in `wait')
* threads will be given the green light.
* Throws `std::runtime_error' if counter already zeroed.
*/
void down();
/**
* Decrements the internal counter by one.
* If the counter reaches 0, returns immediately,
* and all blocking threads will be woken up.
* Otherwise, the calling thread blocks until being woken up.
* Throws `std::runtime_error' if counter already zeroed.
*/
void downWait();
/**
* Blocks if internal counter not zero.
* Otherwise, returns immediately.
*/
void wait();
/**
* Returns true if internal counter already zeroed.
* Otherwise, returns false.
*/
bool isReady();

private:
volatile size_t counter_{0};
std::mutex lock_;
std::condition_variable cond_;
};

} // namespace concurrent
} // namespace vesoft

#endif // COMMON_CONCURRENT_LATCH_H_
96 changes: 96 additions & 0 deletions common/concurrent/test/BarrierTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/* Copyright (c) 2018 - present, VE Software Inc. All rights reserved
*
* This source code is licensed under Apache 2.0 License
* (found in the LICENSE.Apache file in the root directory)
*/
#include <gtest/gtest.h>
#include <cstdlib>
#include <thread>
#include <mutex>
#include <atomic>
#include "common/concurrent/Barrier.h"
#include "common/thread/GenericThreadPool.h"

namespace vesoft {
namespace concurrent {

TEST(BarrierTest, BasicTest) {
// test for invalid initial counter
{
ASSERT_THROW({Barrier barrier(0UL);}, std::invalid_argument);
}
// test for single-thread normal case
{
Barrier barrier(1UL);
barrier.wait();
ASSERT_TRUE(true);
}
// test for multiple-thread normal case
{
Barrier barrier(2UL);
std::atomic<size_t> counter{0};
auto cb = [&] () {
barrier.wait();
++counter;
};
std::thread thread(cb);
usleep(1000);
ASSERT_EQ(0UL, counter.load());
barrier.wait();
thread.join();
ASSERT_EQ(1UL, counter.load());
}
// test for multiple-thread completion
{
std::atomic<size_t> counter{0};
auto completion = [&] () {
++counter;
++counter;
};
Barrier barrier(2UL, completion);

auto cb = [&] () {
barrier.wait();
++counter;
};

std::thread thread(cb);
usleep(1000);
ASSERT_EQ(0UL, counter.load());
barrier.wait();
ASSERT_GE(counter.load(), 2UL);
thread.join();
ASSERT_EQ(3UL, counter.load());
}
}

TEST(BarrierTest, ConsecutiveTest) {
std::atomic<size_t> counter{0};
constexpr auto N = 64UL;
constexpr auto iters = 100UL;
auto completion = [&] () {
// At the completion phase, `counter' should be multiple to `N'.
ASSERT_EQ(0UL, counter.load() % N);
};

Barrier barrier(N, completion);
auto cb = [&] () {
auto i = iters;
while (i-- != 0) {
++counter;
barrier.wait();
}
};

std::vector<std::thread> threads;
for (auto i = 0UL; i < N; i++) {
threads.emplace_back(cb);
}
for (auto &thread : threads) {
thread.join();
}
ASSERT_EQ(0UL, counter.load() % N);
}

} // namespace concurrent
} // namespace vesoft
10 changes: 10 additions & 0 deletions common/concurrent/test/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
add_executable(
concurrent_test
BarrierTest.cpp
LatchTest.cpp
$<TARGET_OBJECTS:concurrent_obj>
$<TARGET_OBJECTS:thread_obj>
$<TARGET_OBJECTS:time_obj>
)
target_link_libraries(concurrent_test ev gtest gtest_main pthread)
add_test(NAME concurrent_test COMMAND concurrent_test)
Loading