Skip to content

Commit

Permalink
Merge branch 'r2c/phase1' of github.com:CalvinNeo/tics into r2c/phase1
Browse files Browse the repository at this point in the history
  • Loading branch information
CalvinNeo committed May 30, 2023
2 parents 944c0f1 + 5ba05df commit dbd9d0e
Show file tree
Hide file tree
Showing 85 changed files with 2,374 additions and 579 deletions.
70 changes: 61 additions & 9 deletions dbms/src/Common/LooseBoundedMPMCQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,22 @@ template <typename T>
class LooseBoundedMPMCQueue
{
public:
using ElementAuxiliaryMemoryUsageFunc = std::function<Int64(const T & element)>;

explicit LooseBoundedMPMCQueue(size_t capacity_)
: capacity(std::max(1, capacity_))
, max_auxiliary_memory_usage(std::numeric_limits<Int64>::max())
, get_auxiliary_memory_usage([](const T &) { return 0; })
{}
LooseBoundedMPMCQueue(size_t capacity_, Int64 max_auxiliary_memory_usage_, ElementAuxiliaryMemoryUsageFunc && get_auxiliary_memory_usage_)
: capacity(std::max(1, capacity_))
, max_auxiliary_memory_usage(max_auxiliary_memory_usage_ <= 0 ? std::numeric_limits<Int64>::max() : max_auxiliary_memory_usage_)
, get_auxiliary_memory_usage(
max_auxiliary_memory_usage == std::numeric_limits<Int64>::max()
? [](const T &) {
return 0;
}
: std::move(get_auxiliary_memory_usage_))
{}

/// blocking function.
Expand All @@ -41,9 +55,9 @@ class LooseBoundedMPMCQueue
MPMCQueueResult push(U && data)
{
std::unique_lock lock(mu);
writer_head.wait(lock, [&] { return queue.size() < capacity || (unlikely(status != MPMCQueueStatus::NORMAL)); });
writer_head.wait(lock, [&] { return !isFullWithoutLock() || (unlikely(status != MPMCQueueStatus::NORMAL)); });

if ((likely(status == MPMCQueueStatus::NORMAL)) && queue.size() < capacity)
if ((likely(status == MPMCQueueStatus::NORMAL)) && !isFullWithoutLock())
{
pushFront(std::forward<U>(data));
return MPMCQueueResult::OK;
Expand Down Expand Up @@ -71,7 +85,7 @@ class LooseBoundedMPMCQueue
if unlikely (status == MPMCQueueStatus::FINISHED)
return MPMCQueueResult::FINISHED;

if (queue.size() >= capacity)
if (isFullWithoutLock())
return MPMCQueueResult::FULL;

pushFront(std::forward<U>(data));
Expand Down Expand Up @@ -142,7 +156,7 @@ class LooseBoundedMPMCQueue
bool isFull() const
{
std::lock_guard lock(mu);
return queue.size() >= capacity;
return isFullWithoutLock();
}

MPMCQueueStatus getStatus() const
Expand Down Expand Up @@ -185,6 +199,12 @@ class LooseBoundedMPMCQueue
}

private:
bool isFullWithoutLock() const
{
assert(current_auxiliary_memory_usage >= 0);
return queue.size() >= capacity || current_auxiliary_memory_usage >= max_auxiliary_memory_usage;
}

template <typename FF>
ALWAYS_INLINE bool changeStatus(FF && ff)
{
Expand All @@ -201,24 +221,56 @@ class LooseBoundedMPMCQueue

ALWAYS_INLINE T popBack()
{
auto data = std::move(queue.back());
auto element = std::move(queue.back());
queue.pop_back();
current_auxiliary_memory_usage -= element.memory_usage;
assert(!queue.empty() || current_auxiliary_memory_usage == 0);
writer_head.notifyNext();
return data;
return element.data;
}

template <typename U>
ALWAYS_INLINE void pushFront(U && data)
{
queue.emplace_front(std::forward<U>(data));
Int64 memory_usage = get_auxiliary_memory_usage(data);
queue.emplace_front(std::forward<U>(data), memory_usage);
current_auxiliary_memory_usage += memory_usage;
reader_head.notifyNext();
/// consider a case that the queue capacity is 2, the max_auxiliary_memory_usage is 100,
/// T1: a writer write an object with size 100
/// T2: two writers(w2, w3) try to write, but all blocked because of the max_auxiliary_memory_usage
/// T3: a reader reads the object, and it will notify one of the waiting writers
/// T4: assuming w2 is notified, then it writes an object of size 50, and there is no reader at that time
/// then the queue's size is 1 and current_auxiliary_memory_usage is 50, which means the
/// queue is not full, but w3 is still blocked, the queue's status is not changed until
/// 1. there is another reader
/// 2. there is another writer
/// if we notify the writer if the queue is not full here, w3 can write immediately
if (max_auxiliary_memory_usage != std::numeric_limits<Int64>::max() && !isFullWithoutLock())
writer_head.notifyNext();
}

private:
mutable std::mutex mu;

std::deque<T> queue;
struct DataWithMemoryUsage
{
T data;
Int64 memory_usage;
DataWithMemoryUsage(T && data_, Int64 memory_usage_)
: data(std::move(data_))
, memory_usage(memory_usage_)
{}
DataWithMemoryUsage(T & data_, Int64 memory_usage_)
: data(data_)
, memory_usage(memory_usage_)
{}
};

std::deque<DataWithMemoryUsage> queue;
size_t capacity;
const Int64 max_auxiliary_memory_usage;
const ElementAuxiliaryMemoryUsageFunc get_auxiliary_memory_usage;
Int64 current_auxiliary_memory_usage = 0;

MPMCQueueDetail::WaitingNode reader_head;
MPMCQueueDetail::WaitingNode writer_head;
Expand Down
9 changes: 8 additions & 1 deletion dbms/src/Common/MPMCQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,12 @@ class MPMCQueue
MPMCQueue(size_t capacity_, Int64 max_auxiliary_memory_usage_, ElementAuxiliaryMemoryUsageFunc && get_auxiliary_memory_usage_)
: capacity(capacity_)
, max_auxiliary_memory_usage(max_auxiliary_memory_usage_ <= 0 ? std::numeric_limits<Int64>::max() : max_auxiliary_memory_usage_)
, get_auxiliary_memory_usage(std::move(get_auxiliary_memory_usage_))
, get_auxiliary_memory_usage(
max_auxiliary_memory_usage == std::numeric_limits<Int64>::max()
? [](const T &) {
return 0;
}
: std::move(get_auxiliary_memory_usage_))
, element_auxiliary_memory(capacity, 0)
, data(capacity * sizeof(T))
{
Expand Down Expand Up @@ -409,6 +414,8 @@ class MPMCQueue

/// See comments in `popObj`.
reader_head.notifyNext();
if (max_auxiliary_memory_usage != std::numeric_limits<Int64>::max() && current_auxiliary_memory_usage < max_auxiliary_memory_usage && write_pos - read_pos < capacity)
writer_head.notifyNext();
return Result::OK;
}
if constexpr (need_wait)
Expand Down
105 changes: 105 additions & 0 deletions dbms/src/Common/tests/gtest_loose_bounded_mpmc_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <TestUtils/TiFlashTestBasic.h>

#include <atomic>
#include <thread>

namespace DB::tests
{
Expand Down Expand Up @@ -205,5 +206,109 @@ try
}
CATCH

TEST_F(LooseBoundedMPMCQueueTest, AuxiliaryMemoryBound)
try
{
size_t max_size = 10;
Int64 auxiliary_memory_bound;
Int64 value;

{
/// case 1: no auxiliary memory usage bound
LooseBoundedMPMCQueue<Int64> queue(max_size);
for (size_t i = 0; i < max_size; i++)
ASSERT_TRUE(queue.tryPush(i) == MPMCQueueResult::OK);
ASSERT_TRUE(queue.tryPush(max_size) == MPMCQueueResult::FULL);
}

{
/// case 2: less auxiliary memory bound than the capacity bound
size_t actual_max_size = 5;
auxiliary_memory_bound = sizeof(Int64) * actual_max_size;
LooseBoundedMPMCQueue<Int64> queue(max_size, auxiliary_memory_bound, [](const Int64 &) { return sizeof(Int64); });
for (size_t i = 0; i < actual_max_size; i++)
ASSERT_TRUE(queue.tryPush(i) == MPMCQueueResult::OK);
ASSERT_TRUE(queue.tryPush(actual_max_size) == MPMCQueueResult::FULL);
/// after pop one element, the queue can be pushed again
ASSERT_TRUE(queue.tryPop(value) == MPMCQueueResult::OK);
ASSERT_TRUE(queue.tryPush(actual_max_size) == MPMCQueueResult::OK);
}

{
/// case 3: less capacity bound than the auxiliary memory bound
auxiliary_memory_bound = sizeof(Int64) * (max_size * 10);
LooseBoundedMPMCQueue<Int64> queue(max_size, auxiliary_memory_bound, [](const Int64 &) { return sizeof(Int64); });
for (size_t i = 0; i < max_size; i++)
ASSERT_TRUE(queue.tryPush(i) == MPMCQueueResult::OK);
ASSERT_TRUE(queue.tryPush(max_size) == MPMCQueueResult::FULL);
}

{
/// case 4, auxiliary memory bound <= 0 means unbounded for auxiliary memory usage
std::vector<Int64> bounds{0, -1};
for (const auto & bound : bounds)
{
LooseBoundedMPMCQueue<Int64> queue(max_size, bound, [](const Int64 &) { return 1024 * 1024; });
for (size_t i = 0; i < max_size; i++)
ASSERT_TRUE(queue.tryPush(i) == MPMCQueueResult::OK);
ASSERT_TRUE(queue.tryPush(max_size) == MPMCQueueResult::FULL);
}
}

{
/// case 5 even if the element's auxiliary memory is out of bound, at least one element can be pushed
LooseBoundedMPMCQueue<Int64> queue(max_size, 1, [](const Int64 &) { return 10; });
ASSERT_TRUE(queue.tryPush(1) == MPMCQueueResult::OK);
ASSERT_TRUE(queue.tryPush(2) == MPMCQueueResult::FULL);
ASSERT_TRUE(queue.tryPop(value) == MPMCQueueResult::OK);
ASSERT_TRUE(queue.tryPop(value) == MPMCQueueResult::EMPTY);
ASSERT_TRUE(queue.tryPush(1) == MPMCQueueResult::OK);
}

{
/// case 6 after pop a huge element, more than one small push can be notified without further pop
LooseBoundedMPMCQueue<Int64> queue(max_size, 20, [](const Int64 & element) { return std::abs(element); });
ASSERT_TRUE(queue.tryPush(100) == MPMCQueueResult::OK);
ASSERT_TRUE(queue.tryPush(5) == MPMCQueueResult::FULL);
auto thread_manager = newThreadManager();
thread_manager->schedule(false, "thread_1", [&]() {
queue.push(5);
});
thread_manager->schedule(false, "thread_2", [&]() {
queue.push(6);
});
std::exception_ptr current_exception = nullptr;
try
{
std::this_thread::sleep_for(std::chrono::seconds(2));
ASSERT_TRUE(queue.pop(value) == MPMCQueueResult::OK);
ASSERT_EQ(value, 100);
std::this_thread::sleep_for(std::chrono::seconds(5));
ASSERT_EQ(queue.size(), 2);
}
catch (...)
{
current_exception = std::current_exception();
queue.cancelWith("test failed");
}
thread_manager->wait();
if (current_exception)
std::rethrow_exception(current_exception);
}

{
/// case 7, force push does not limited by memory bound
LooseBoundedMPMCQueue<Int64> queue(max_size, sizeof(Int64) * max_size / 2, [](const Int64 &) { return sizeof(Int64); });
for (size_t i = 0; i < max_size / 2; i++)
ASSERT_TRUE(queue.tryPush(i) == MPMCQueueResult::OK);
for (size_t i = max_size / 2; i < max_size; i++)
{
ASSERT_TRUE(queue.tryPush(i) == MPMCQueueResult::FULL);
ASSERT_TRUE(queue.forcePush(i) == MPMCQueueResult::OK);
}
}
}
CATCH

} // namespace
} // namespace DB::tests
Loading

0 comments on commit dbd9d0e

Please sign in to comment.