-
Notifications
You must be signed in to change notification settings - Fork 411
/
BackgroundProcessingPool.h
147 lines (116 loc) · 4.6 KB
/
BackgroundProcessingPool.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <Core/Types.h>
#include <Poco/Event.h>
#include <Poco/Timestamp.h>
#include <Storages/KVStore/FFI/JointThreadAllocInfo.h>
#include <absl/synchronization/blocking_counter.h>
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <pcg_random.hpp>
#include <set>
#include <shared_mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>
namespace DB
{
/** Runs an arbitrary number of tasks in an endless loop using a fixed number of threads.
  * Designed for tasks that perform continuous background work (for example, merges).
  * A task added with `multi == true` may run simultaneously on different threads;
  * a task added with `multi == false` is executed by only one thread within each scheduled time.
  * `Task` is a function that returns a bool: whether it did any useful work.
  * If it did not, its next run is scheduled later.
  */
class BackgroundProcessingPool
{
public:
    /// Returns true if some useful work was done. In that case, the thread will not sleep before the next run of this task.
    /// Returns false otherwise; the next run will be scheduled later.
    using Task = std::function<bool()>;

    /// Per-task state shared between the pool and the `TaskHandle` returned by `addTask`.
    /// Neither copyable nor movable: it holds a reference to its pool and a mutex.
    class TaskInfo
    {
    public:
        /// Wake up any thread.
        void wake();

        /// @param pool_        The pool this task belongs to (stored by reference).
        /// @param function_    The work to execute. Taken by value and moved into place,
        ///                     so callers passing a temporary do not pay for a copy.
        /// @param multi_       true: may be run by multiple threads concurrently;
        ///                     false: only run on one thread.
        /// @param interval_ms_ User-defined execution interval in milliseconds (see `addTask`).
        TaskInfo(
            BackgroundProcessingPool & pool_,
            Task function_,
            const bool multi_,
            const uint64_t interval_ms_)
            : pool(pool_)
            , function(std::move(function_))
            , multi(multi_)
            , interval_milliseconds(interval_ms_)
        {}

        TaskInfo(const TaskInfo &) = delete;
        TaskInfo & operator=(const TaskInfo &) = delete;
        TaskInfo(TaskInfo &&) = delete;
        TaskInfo & operator=(TaskInfo &&) = delete;

    private:
        friend class BackgroundProcessingPool;

        BackgroundProcessingPool & pool;
        /// The work executed on each run of this task.
        Task function;

        /// A read (shared) lock is held while the task is executed.
        std::shared_mutex rwlock;
        /// NOTE(review): presumably set by `removeTask()` so workers stop running this task — confirm in the .cpp.
        std::atomic<bool> removed{false};

        // multi=true, can be run by multiple threads concurrently
        // multi=false, only run on one thread
        const bool multi;
        // Number of worker threads that are executing (or about to execute) this task.
        // NOTE(review): a plain size_t, presumably guarded by the pool's `tasks_mutex` — confirm.
        size_t concurrent_executors = 0;

        // User-defined execution interval, in milliseconds.
        const uint64_t interval_milliseconds;

        // NOTE(review): presumably this task's entry in the pool's `tasks` multimap
        // (same key/value types as `Tasks`) — confirm in the .cpp.
        std::multimap<Poco::Timestamp, std::shared_ptr<TaskInfo>>::iterator iterator;
    };

    using TaskHandle = std::shared_ptr<TaskInfo>;

    /// @param size_                        Presumably the number of worker threads (returned by `getNumberOfThreads()`).
    /// @param thread_prefix_               Presumably the prefix used to name worker threads — confirm in the .cpp.
    /// @param joint_memory_allocation_map_ Stored in `joint_memory_allocation_map`; its use is defined in the .cpp.
    explicit BackgroundProcessingPool(
        int size_,
        std::string thread_prefix_,
        JointThreadInfoJeallocMapPtr joint_memory_allocation_map_);

    /// The pool owns threads and mutexes; it is neither copyable nor movable.
    BackgroundProcessingPool(const BackgroundProcessingPool &) = delete;
    BackgroundProcessingPool & operator=(const BackgroundProcessingPool &) = delete;
    BackgroundProcessingPool(BackgroundProcessingPool &&) = delete;
    BackgroundProcessingPool & operator=(BackgroundProcessingPool &&) = delete;

    size_t getNumberOfThreads() const { return size; }

    /// task
    ///   - A function returning bool.
    ///   - Returning true means some useful work was done. In that case, the thread will not sleep before the next run of this task.
    ///   - Returning false means the next run will be scheduled later.
    /// multi
    ///   - If multi == false, this task can only be executed by one thread within each scheduled time.
    /// interval_ms
    ///   - If interval_ms is zero, this task will be scheduled with `sleep_seconds`.
    ///   - If interval_ms is not zero, this task will be scheduled with `interval_ms`.
    TaskHandle addTask(const Task & task, bool multi = true, size_t interval_ms = 0);
    void removeTask(const TaskHandle & task);

    ~BackgroundProcessingPool();

    /// Linux thread ids recorded for this pool's workers (see `thread_ids`).
    std::vector<pid_t> getThreadIds();
    /// Records a Linux thread id; presumably called by each worker thread on start — confirm in the .cpp.
    void addThreadId(pid_t tid);

private:
    void threadFunction(size_t thread_idx) noexcept;

    TaskHandle tryPopTask(pcg64 & rng) noexcept;

private:
    using Tasks = std::multimap<Poco::Timestamp, TaskHandle>; /// key is desired next time to execute (priority).
    using Threads = std::vector<std::thread>;

    const size_t size;
    const std::string thread_prefix;
    /// Default rescheduling delay, used for tasks added with interval_ms == 0.
    static constexpr double sleep_seconds = 10;
    /// NOTE(review): presumably the upper bound of random jitter added to the delay — confirm in the .cpp.
    static constexpr double sleep_seconds_random_part = 1.0;

    Tasks tasks; /// Ordered in priority.
    std::mutex tasks_mutex;

    Threads threads;
    std::vector<pid_t> thread_ids; // Linux Thread ID
    std::mutex thread_ids_mtx;
    /// NOTE(review): presumably lets `getThreadIds()` wait until every worker has registered its id — confirm.
    absl::BlockingCounter thread_ids_counter;

    std::atomic<bool> shutdown{false};
    std::condition_variable wake_event;

    JointThreadInfoJeallocMapPtr joint_memory_allocation_map;
};

/// Shared-ownership handle to a BackgroundProcessingPool.
using BackgroundProcessingPoolPtr = std::shared_ptr<BackgroundProcessingPool>;
} // namespace DB