Skip to content

Commit

Permalink
Fix the tasks in bkg pool may be called more frequently than expected (
Browse files Browse the repository at this point in the history
  • Loading branch information
JaySon-Huang authored Mar 2, 2023
1 parent f1ce754 commit ef611fa
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 100 deletions.
172 changes: 87 additions & 85 deletions dbms/src/Storages/BackgroundProcessingPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,58 @@ BackgroundProcessingPool::~BackgroundProcessingPool()
}
}

// Try to pop a task from the pool that is ready for execution.
// For task->multi == false, it ensure the task is only pop for one execution threads.
// For task->multi == true, it may pop the task multiple times.
// Return nullptr when the pool is shutting down.
BackgroundProcessingPool::TaskHandle BackgroundProcessingPool::tryPopTask(pcg64 & rng) noexcept
{
TaskHandle task;
Poco::Timestamp min_time;

std::unique_lock lock(tasks_mutex);

void BackgroundProcessingPool::threadFunction(size_t thread_idx)
while (!task && !shutdown)
{
for (const auto & [task_time, task_handle] : tasks)
{
// find the first coming task that no thread is running
// or can be run by multithreads
if (!task_handle->removed
&& (task_handle->concurrent_executors == 0 || task_handle->multi))
{
min_time = task_time;
task = task_handle;
// add the counter to indicate this task is running by one more thread
task->concurrent_executors += 1;
break;
}
}

if (!task)
{
/// No tasks ready for execution, wait for a while and check again
wake_event.wait_for(lock,
std::chrono::duration<double>(
sleep_seconds + std::uniform_real_distribution<double>(0, sleep_seconds_random_part)(rng)));
continue;
}

Poco::Timestamp current_time;
if (min_time > current_time)
{
// The coming task is not ready for execution yet, wait for a while
wake_event.wait_for(lock,
std::chrono::microseconds(
min_time - current_time + std::uniform_int_distribution<uint64_t>(0, sleep_seconds_random_part * 1000000)(rng)));
}
// here task != nullptr and is ready for execution
return task;
}
return task;
}

void BackgroundProcessingPool::threadFunction(size_t thread_idx) noexcept
{
{
const auto name = thread_prefix + std::to_string(thread_idx);
Expand All @@ -154,6 +204,7 @@ void BackgroundProcessingPool::threadFunction(size_t thread_idx)
addThreadId(getTid());
}

// set up the thread local memory tracker
auto memory_tracker = MemoryTracker::create();
memory_tracker->setNext(root_of_non_query_mem_trackers.get());
memory_tracker->setMetric(CurrentMetrics::MemoryTrackingInBackgroundProcessingPool);
Expand All @@ -164,113 +215,64 @@ void BackgroundProcessingPool::threadFunction(size_t thread_idx)

while (!shutdown)
{
TaskHandle task;
// The time to sleep before running next task, `sleep_seconds` by default.
Poco::Timespan next_sleep_time_span(sleep_seconds, 0);
TaskHandle task = tryPopTask(rng);
if (shutdown)
break;
// not shutting down but a null task pop, should not happen
if (task == nullptr)
{
LOG_ERROR(Logger::get(), "a null task has been pop!");
continue;
}

bool done_work = false;
try
{
Poco::Timestamp min_time;

{
std::unique_lock lock(tasks_mutex);

if (!tasks.empty())
{
for (const auto & time_handle : tasks)
{
if (!time_handle.second->removed)
{
min_time = time_handle.first;
task = time_handle.second;
break;
}
}
}
}

if (shutdown)
break;

if (!task)
{
std::unique_lock lock(tasks_mutex);
wake_event.wait_for(lock,
std::chrono::duration<double>(
sleep_seconds + std::uniform_real_distribution<double>(0, sleep_seconds_random_part)(rng)));
continue;
}

/// No tasks ready for execution.
Poco::Timestamp current_time;
if (min_time > current_time)
{
std::unique_lock lock(tasks_mutex);
wake_event.wait_for(lock,
std::chrono::microseconds(
min_time - current_time + std::uniform_int_distribution<uint64_t>(0, sleep_seconds_random_part * 1000000)(rng)));
}

std::shared_lock<std::shared_mutex> rlock(task->rwlock);

if (task->removed)
continue;

{
bool done_work = false;
if (!task->multi)
{
bool expected = false;
if (task->occupied == expected && task->occupied.compare_exchange_strong(expected, true))
{
done_work = task->function();
task->occupied = false;
}
else
done_work = false;
}
else
done_work = task->function();

/// If task has done work, it could be executed again immediately.
/// If not, add delay before next run.
if (done_work)
{
next_sleep_time_span = 0;
}
else if (task->interval_milliseconds != 0)
{
// Update `next_sleep_time_span` by user-defined interval if the later one is non-zero
next_sleep_time_span = Poco::Timespan(0, /*microseconds=*/task->interval_milliseconds * 1000);
}
// else `sleep_seconds` by default
}
done_work = task->function();
}
catch (...)
{
if (task && !task->multi)
{
std::unique_lock<std::shared_mutex> wlock(task->rwlock);
task->occupied = false;
}

tryLogCurrentException(__PRETTY_FUNCTION__);
}

if (shutdown)
break;

/// If task has done work, it could be executed again immediately.
/// If not, add delay before next run.
Poco::Timestamp next_time_to_execute = Poco::Timestamp() + next_sleep_time_span;
// Get the time to sleep before the task in the next run.
// - If task has done work, it could be executed again immediately.
// - If not, add delay before next run.
const auto next_sleep_time_span = [](bool done_work, const TaskHandle & t) {
if (done_work)
{
return Poco::Timespan(0, 0);
}
else if (t->interval_milliseconds != 0)
{
// Update `next_sleep_time_span` by user-defined interval if the later one is non-zero
return Poco::Timespan(0, /*microseconds=*/t->interval_milliseconds * 1000);
}
else
{
// else `sleep_seconds` by default
return Poco::Timespan(sleep_seconds, 0);
}
}(done_work, task);

{
std::unique_lock lock(tasks_mutex);

// the task has been done in this thread
task->concurrent_executors -= 1;

if (task->removed)
continue;

// reschedule this task
tasks.erase(task->iterator);
Poco::Timestamp next_time_to_execute = Poco::Timestamp() + next_sleep_time_span;
task->iterator = tasks.emplace(next_time_to_execute, task);
}
}
Expand Down
36 changes: 21 additions & 15 deletions dbms/src/Storages/BackgroundProcessingPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <list>
#include <map>
#include <mutex>
#include <pcg_random.hpp>
#include <set>
#include <shared_mutex>
#include <thread>
Expand All @@ -43,6 +44,7 @@ class BackgroundProcessingPool
{
public:
/// Returns true, if some useful work was done. In that case, thread will not sleep before next run of this task.
/// Returns false, the next time will be done later.
using Task = std::function<bool()>;


Expand All @@ -69,10 +71,13 @@ class BackgroundProcessingPool
std::shared_mutex rwlock;
std::atomic<bool> removed{false};

/// only can be invoked by one thread at same time.
// multi=true, can be run by multiple threads concurrently
// multi=false, only run on one thread
const bool multi;
std::atomic_bool occupied{false};
// The number of worker threads is going to execute this task
size_t concurrent_executors = 0;

// User defined execution interval
const uint64_t interval_milliseconds;

std::multimap<Poco::Timestamp, std::shared_ptr<TaskInfo>>::iterator iterator;
Expand All @@ -85,17 +90,15 @@ class BackgroundProcessingPool

size_t getNumberOfThreads() const { return size; }

/// if multi == false, this task can only be called by one thread at same time.
/// If interval_ms is zero, this task will be scheduled with `sleep_seconds`.
/// If interval_ms is not zero, this task will be scheduled with `interval_ms`.
///
/// But at each scheduled time, there may be multiple threads try to run the same task,
/// and then execute the same task one by one in sequential order(not simultaneously) even if `multi` is false.
/// For example, consider the following case when it's time to schedule a task,
/// 1. thread A get the task, mark the task as occupied and begin to execute it
/// 2. thread B also get the same task
/// 3. thread A finish the execution of the task quickly, release the task and try to update the next schedule time of the task
/// 4. thread B find the task is not occupied and execute the task again almost immediately
/// task
/// - A function return bool.
/// - Returning true mean some useful work was done. In that case, thread will not sleep before next run of this task.
/// - Returning false, the next time will be done later.
/// multi
/// - If multi == false, this task can only be executed by one thread within each scheduled time.
/// interval_ms
/// - If interval_ms is zero, this task will be scheduled with `sleep_seconds`.
/// - If interval_ms is not zero, this task will be scheduled with `interval_ms`.
TaskHandle addTask(const Task & task, bool multi = true, size_t interval_ms = 0);
void removeTask(const TaskHandle & task);

Expand All @@ -104,6 +107,11 @@ class BackgroundProcessingPool
std::vector<pid_t> getThreadIds();
void addThreadId(pid_t tid);

private:
void threadFunction(size_t thread_idx) noexcept;

TaskHandle tryPopTask(pcg64 & rng) noexcept;

private:
using Tasks = std::multimap<Poco::Timestamp, TaskHandle>; /// key is desired next time to execute (priority).
using Threads = std::vector<std::thread>;
Expand All @@ -123,8 +131,6 @@ class BackgroundProcessingPool

std::atomic<bool> shutdown{false};
std::condition_variable wake_event;

void threadFunction(size_t thread_idx);
};

using BackgroundProcessingPoolPtr = std::shared_ptr<BackgroundProcessingPool>;
Expand Down
74 changes: 74 additions & 0 deletions dbms/src/Storages/tests/gtest_bkg_pool.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Logger.h>
#include <Storages/BackgroundProcessingPool.h>
#include <common/logger_useful.h>
#include <gtest/gtest.h>

#include <chrono>
#include <limits>
#include <thread>

namespace DB::tests
{

TEST(BackgroundProcessingPoolTest, FixedInterval)
{
BackgroundProcessingPool pool(10, "test");

using Clock = std::chrono::system_clock;
using TimePoint = std::chrono::time_point<Clock>;


using namespace std::chrono_literals;
const auto sleep_seconds = 10s;
const Int64 expect_interval_ms = 2 * 1000;
const auto num_expect_called = 5;

Int64 num_actual_called = 0;
TimePoint last_update_timepoint = Clock::now();
Int64 min_diff_ms = std::numeric_limits<Int64>::max();
Int64 max_diff_ms = 0;
auto task = pool.addTask(
[&]() {
num_actual_called += 1;
if (num_actual_called != 1)
{
auto diff_ms = std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - last_update_timepoint).count();
if (diff_ms < expect_interval_ms / 2)
{
LOG_ERROR(Logger::get(), "Unexpected frequent call, actual interval={}ms", diff_ms);
}
min_diff_ms = std::min(min_diff_ms, diff_ms);
max_diff_ms = std::max(max_diff_ms, diff_ms);
}

last_update_timepoint = Clock::now();
return false; // expected to be run n a fixed interval
},
/*multi*/ false,
expect_interval_ms);

std::this_thread::sleep_for(sleep_seconds);

pool.removeTask(task);

LOG_INFO(Logger::get(), "actual being called for {} times, min_diff={} max_diff={}", num_actual_called, min_diff_ms, max_diff_ms);
ASSERT_TRUE(num_expect_called - 1 <= num_actual_called
&& num_actual_called <= num_expect_called + 1)
<< fmt::format("actual_called={} min_diff_ms={}", num_actual_called, min_diff_ms);
}

} // namespace DB::tests

0 comments on commit ef611fa

Please sign in to comment.