Skip to content

Commit

Permalink
Rate limiter for parallel prehandling (#8184)
Browse files Browse the repository at this point in the history
close #8081
CalvinNeo authored Oct 19, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent d7badaa commit b77eca9
Showing 14 changed files with 278 additions and 28 deletions.
1 change: 1 addition & 0 deletions dbms/src/Common/CurrentMetrics.cpp
Original file line number Diff line number Diff line change
@@ -48,6 +48,7 @@
M(DT_DeltaIndexCacheSize) \
M(RaftNumSnapshotsPendingApply) \
M(RaftNumPrehandlingSubTasks) \
M(RaftNumParallelPrehandlingTasks) \
M(RateLimiterPendingWriteRequest) \
M(DT_SegmentReadTasks) \
M(DT_SnapshotOfRead) \
1 change: 1 addition & 0 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
@@ -127,6 +127,7 @@ namespace DB
M(pause_when_altering_dt_store) \
M(pause_after_copr_streams_acquired) \
M(pause_query_init) \
M(pause_before_prehandle_subtask) \
M(pause_before_wn_establish_task) \
M(pause_passive_flush_before_persist_region)

3 changes: 3 additions & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
@@ -379,6 +379,9 @@ namespace DB
F(type_ingest_sst_upload, {{"type", "ingest_sst_upload"}}, ExpBuckets{0.05, 2, 10}), \
F(type_apply_snapshot_predecode, {{"type", "snapshot_predecode"}}, ExpBuckets{0.05, 2, 15}), \
F(type_apply_snapshot_predecode_sst2dt, {{"type", "snapshot_predecode_sst2dt"}}, ExpBuckets{0.05, 2, 15}), \
F(type_apply_snapshot_predecode_parallel_wait, \
{{"type", "snapshot_predecode_parallel_wait"}}, \
ExpBuckets{0.1, 2, 10}), \
F(type_apply_snapshot_predecode_upload, {{"type", "snapshot_predecode_upload"}}, ExpBuckets{0.05, 2, 10}), \
F(type_apply_snapshot_flush, {{"type", "snapshot_flush"}}, ExpBuckets{0.05, 2, 10})) \
M(tiflash_raft_process_keys, \
7 changes: 5 additions & 2 deletions dbms/src/Debug/MockRaftStoreProxy.cpp
Original file line number Diff line number Diff line change
@@ -180,9 +180,10 @@ static RaftstoreVer fn_get_cluster_raftstore_version(RaftStoreProxyPtr ptr, uint
}

// Must call `RustGcHelper` to gc the returned pointer in the end.
static RustStrWithView fn_get_config_json(RaftStoreProxyPtr, ConfigJsonType)
static RustStrWithView fn_get_config_json(RaftStoreProxyPtr ptr, ConfigJsonType)
{
auto * s = new std::string(R"({"raftstore":{"snap-handle-pool-size":4}})");
auto & x = as_ref(ptr);
auto * s = new std::string(x.proxy_config_string);
GCMonitor::instance().add(RawObjType::MockString, 1);
return RustStrWithView{
.buff = cppStringAsBuff(*s),
@@ -916,6 +917,7 @@ void MockRaftStoreProxy::Cf::finish_file(SSTFormatKind kind)
kv_list.emplace_back(kv.first, kv.second);
}
std::sort(kv_list.begin(), kv_list.end(), [](const auto & a, const auto & b) { return a.first < b.first; });
std::scoped_lock<std::mutex> lock(MockSSTReader::mut);
auto & mmp = MockSSTReader::getMockSSTData();
mmp[MockSSTReader::Key{region_id_str, type}] = std::move(kv_list);
kvs.clear();
@@ -943,6 +945,7 @@ MockRaftStoreProxy::Cf::Cf(UInt64 region_id_, TableID table_id_, ColumnFamilyTyp
, c(0)
, freezed(false)
{
std::scoped_lock<std::mutex> lock(MockSSTReader::mut);
auto & mmp = MockSSTReader::getMockSSTData();
auto region_id_str = std::to_string(region_id) + "_multi_" + std::to_string(c);
mmp[MockSSTReader::Key{region_id_str, type}].clear();
2 changes: 2 additions & 0 deletions dbms/src/Debug/MockRaftStoreProxy.h
Original file line number Diff line number Diff line change
@@ -293,6 +293,7 @@ struct MockRaftStoreProxy : MutexLockWrap
log = Logger::get("MockRaftStoreProxy");
table_id = 1;
cluster_ver = RaftstoreVer::V1;
proxy_config_string = R"({"raftstore":{"snap-handle-pool-size":4}})";
}

// Mock Proxy will drop read index requests to these regions
@@ -304,6 +305,7 @@ struct MockRaftStoreProxy : MutexLockWrap
AsyncWaker::Notifier notifier;
TableID table_id;
RaftstoreVer cluster_ver;
std::string proxy_config_string;
LoggerPtr log;
};

1 change: 1 addition & 0 deletions dbms/src/Debug/MockSSTReader.cpp
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@ namespace DB
SSTReaderPtr fn_get_sst_reader(SSTView v, RaftStoreProxyPtr)
{
std::string s(v.path.data, v.path.len);
std::scoped_lock<std::mutex> lock(MockSSTReader::mut);
auto iter = MockSSTReader::getMockSSTData().find({s, v.type});
if (iter == MockSSTReader::getMockSSTData().end())
throw Exception("Can not find data in MockSSTData, [key=" + s + "] [type=" + CFToName(v.type) + "]");
7 changes: 7 additions & 0 deletions dbms/src/Debug/MockSSTReader.h
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@
#include <Storages/KVStore/FFI/SSTReader.h>

#include <map>
#include <mutex>


namespace DB
@@ -116,6 +117,9 @@ struct MockSSTReader

static std::map<Key, MockSSTReader::Data> & getMockSSTData() { return MockSSTData; }

public:
static std::mutex mut;

private:
Data::const_iterator iter;
Data::const_iterator begin;
@@ -129,5 +133,8 @@ struct MockSSTReader
static std::map<Key, MockSSTReader::Data> MockSSTData;
};

inline std::map<MockSSTReader::Key, MockSSTReader::Data> MockSSTReader::MockSSTData;
inline std::mutex MockSSTReader::mut;

SSTReaderInterfaces make_mock_sst_reader_interface();
} // namespace DB
2 changes: 0 additions & 2 deletions dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp
Original file line number Diff line number Diff line change
@@ -263,8 +263,6 @@ void MockRaftCommand::dbgFuncRegionSnapshot(Context & context, const ASTs & args
table_id));
}

std::map<MockSSTReader::Key, MockSSTReader::Data> MockSSTReader::MockSSTData;

class RegionMockTest final
{
public:
5 changes: 5 additions & 0 deletions dbms/src/Storages/KVStore/KVStore.cpp
Original file line number Diff line number Diff line change
@@ -108,6 +108,11 @@ void KVStore::restore(PathPool & path_pool, const TiFlashRaftProxyHelper * proxy
}
}

fetchProxyConfig(proxy_helper);
}

void KVStore::fetchProxyConfig(const TiFlashRaftProxyHelper * proxy_helper)
{
// Try fetch proxy's config as a json string
if (proxy_helper && proxy_helper->fn_get_config_json)
{
3 changes: 3 additions & 0 deletions dbms/src/Storages/KVStore/KVStore.h
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@
#include <Storages/KVStore/MultiRaft/RegionRangeKeys.h>
#include <Storages/KVStore/StorageEngineType.h>

#include <condition_variable>
#include <magic_enum.hpp>

namespace TiDB
@@ -249,6 +250,7 @@ class KVStore final : private boost::noncopyable
RaftLogEagerGcTasks::Hints getRaftLogGcHints();
void applyRaftLogGcTaskRes(const RaftLogGcTasksRes & res) const;
const ProxyConfigSummary & getProxyConfigSummay() const { return proxy_config_summary; }
size_t getMaxParallelPrehandleSize() const;

#ifndef DBMS_PUBLIC_GTEST
private:
@@ -361,6 +363,7 @@ class KVStore final : private boost::noncopyable

void releaseReadIndexWorkers();
void handleDestroy(UInt64 region_id, TMTContext & tmt, const KVStoreTaskLock &);
void fetchProxyConfig(const TiFlashRaftProxyHelper * proxy_helper);

#ifndef DBMS_PUBLIC_GTEST
private:
21 changes: 21 additions & 0 deletions dbms/src/Storages/KVStore/MultiRaft/PreHandlingTrace.h
Original file line number Diff line number Diff line change
@@ -31,8 +31,16 @@ struct PreHandlingTrace : MutexLockWrap
{}
std::atomic_bool abort_flag;
};

std::unordered_map<uint64_t, std::shared_ptr<Item>> tasks;
std::atomic<uint64_t> ongoing_prehandle_subtask_count{0};
std::mutex cpu_resource_mut;
std::condition_variable cpu_resource_cv;
LoggerPtr log;

PreHandlingTrace()
: log(Logger::get("PreHandlingTrace"))
{}
std::shared_ptr<Item> registerTask(uint64_t region_id)
{
// Automaticlly override the old one.
@@ -61,5 +69,18 @@ struct PreHandlingTrace : MutexLockWrap
auto _ = genLockGuard();
return tasks.find(region_id) != tasks.end();
}
void waitForSubtaskResources(uint64_t region_id, size_t parallel, size_t parallel_subtask_limit);
void releaseSubtaskResources(uint64_t region_id, size_t split_id)
{
std::unique_lock<std::mutex> cpu_resource_lock(cpu_resource_mut);
// TODO(split) refine this to avoid notify_all
auto prev = ongoing_prehandle_subtask_count.fetch_sub(1);
RUNTIME_CHECK_MSG(
prev > 0,
"Try to decrease prehandle subtask count to below 0, region_id={}, split_id={}",
region_id,
split_id);
cpu_resource_cv.notify_all();
}
};
} // namespace DB
Loading

0 comments on commit b77eca9

Please sign in to comment.