From b77eca90df671d830b5a3799c264641e2c1533da Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Thu, 19 Oct 2023 12:27:29 +0800 Subject: [PATCH] Rate limiter for parallel prehandling (#8184) close pingcap/tiflash#8081 --- dbms/src/Common/CurrentMetrics.cpp | 1 + dbms/src/Common/FailPoint.cpp | 1 + dbms/src/Common/TiFlashMetrics.h | 3 + dbms/src/Debug/MockRaftStoreProxy.cpp | 7 +- dbms/src/Debug/MockRaftStoreProxy.h | 2 + dbms/src/Debug/MockSSTReader.cpp | 1 + dbms/src/Debug/MockSSTReader.h | 7 + dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp | 2 - dbms/src/Storages/KVStore/KVStore.cpp | 5 + dbms/src/Storages/KVStore/KVStore.h | 3 + .../KVStore/MultiRaft/PreHandlingTrace.h | 21 +++ .../KVStore/MultiRaft/PrehandleSnapshot.cpp | 138 +++++++++++++++--- .../KVStore/tests/gtest_raftstore_v2.cpp | 107 ++++++++++++++ metrics/grafana/tiflash_summary.json | 8 + 14 files changed, 278 insertions(+), 28 deletions(-) diff --git a/dbms/src/Common/CurrentMetrics.cpp b/dbms/src/Common/CurrentMetrics.cpp index c89497b8173..605e119f8a8 100644 --- a/dbms/src/Common/CurrentMetrics.cpp +++ b/dbms/src/Common/CurrentMetrics.cpp @@ -48,6 +48,7 @@ M(DT_DeltaIndexCacheSize) \ M(RaftNumSnapshotsPendingApply) \ M(RaftNumPrehandlingSubTasks) \ + M(RaftNumParallelPrehandlingTasks) \ M(RateLimiterPendingWriteRequest) \ M(DT_SegmentReadTasks) \ M(DT_SnapshotOfRead) \ diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index edb24ec3d57..7cbb5cd78db 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -127,6 +127,7 @@ namespace DB M(pause_when_altering_dt_store) \ M(pause_after_copr_streams_acquired) \ M(pause_query_init) \ + M(pause_before_prehandle_subtask) \ M(pause_before_wn_establish_task) \ M(pause_passive_flush_before_persist_region) diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 20f06f950d9..c4eb2a94f9e 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -379,6 +379,9 @@ namespace DB F(type_ingest_sst_upload, {{"type", "ingest_sst_upload"}}, ExpBuckets{0.05, 2, 10}), \ F(type_apply_snapshot_predecode, {{"type", "snapshot_predecode"}}, ExpBuckets{0.05, 2, 15}), \ F(type_apply_snapshot_predecode_sst2dt, {{"type", "snapshot_predecode_sst2dt"}}, ExpBuckets{0.05, 2, 15}), \ + F(type_apply_snapshot_predecode_parallel_wait, \ + {{"type", "snapshot_predecode_parallel_wait"}}, \ + ExpBuckets{0.1, 2, 10}), \ F(type_apply_snapshot_predecode_upload, {{"type", "snapshot_predecode_upload"}}, ExpBuckets{0.05, 2, 10}), \ F(type_apply_snapshot_flush, {{"type", "snapshot_flush"}}, ExpBuckets{0.05, 2, 10})) \ M(tiflash_raft_process_keys, \ diff --git a/dbms/src/Debug/MockRaftStoreProxy.cpp b/dbms/src/Debug/MockRaftStoreProxy.cpp index cd30abd1d6d..f971aba5780 100644 --- a/dbms/src/Debug/MockRaftStoreProxy.cpp +++ b/dbms/src/Debug/MockRaftStoreProxy.cpp @@ -180,9 +180,10 @@ static RaftstoreVer fn_get_cluster_raftstore_version(RaftStoreProxyPtr ptr, uint } // Must call `RustGcHelper` to gc the returned pointer in the end. -static RustStrWithView fn_get_config_json(RaftStoreProxyPtr, ConfigJsonType) +static RustStrWithView fn_get_config_json(RaftStoreProxyPtr ptr, ConfigJsonType) { - auto * s = new std::string(R"({"raftstore":{"snap-handle-pool-size":4}})"); + auto & x = as_ref(ptr); + auto * s = new std::string(x.proxy_config_string); GCMonitor::instance().add(RawObjType::MockString, 1); return RustStrWithView{ .buff = cppStringAsBuff(*s), @@ -916,6 +917,7 @@ void MockRaftStoreProxy::Cf::finish_file(SSTFormatKind kind) kv_list.emplace_back(kv.first, kv.second); } std::sort(kv_list.begin(), kv_list.end(), [](const auto & a, const auto & b) { return a.first < b.first; }); + std::scoped_lock lock(MockSSTReader::mut); auto & mmp = MockSSTReader::getMockSSTData(); mmp[MockSSTReader::Key{region_id_str, type}] = std::move(kv_list); kvs.clear(); @@ -943,6 +945,7 @@ MockRaftStoreProxy::Cf::Cf(UInt64 region_id_, TableID table_id_, ColumnFamilyTyp , c(0) , freezed(false) { + std::scoped_lock lock(MockSSTReader::mut); auto & mmp = MockSSTReader::getMockSSTData(); auto region_id_str = std::to_string(region_id) + "_multi_" + std::to_string(c); mmp[MockSSTReader::Key{region_id_str, type}].clear(); diff --git a/dbms/src/Debug/MockRaftStoreProxy.h b/dbms/src/Debug/MockRaftStoreProxy.h index 0ac1efd7179..8836de8afe8 100644 --- a/dbms/src/Debug/MockRaftStoreProxy.h +++ b/dbms/src/Debug/MockRaftStoreProxy.h @@ -293,6 +293,7 @@ struct MockRaftStoreProxy : MutexLockWrap log = Logger::get("MockRaftStoreProxy"); table_id = 1; cluster_ver = RaftstoreVer::V1; + proxy_config_string = R"({"raftstore":{"snap-handle-pool-size":4}})"; } // Mock Proxy will drop read index requests to these regions @@ -304,6 +305,7 @@ struct MockRaftStoreProxy : MutexLockWrap AsyncWaker::Notifier notifier; TableID table_id; RaftstoreVer cluster_ver; + std::string proxy_config_string; LoggerPtr log; }; diff --git a/dbms/src/Debug/MockSSTReader.cpp b/dbms/src/Debug/MockSSTReader.cpp index 5b0788009d7..393abab97a5 100644 --- a/dbms/src/Debug/MockSSTReader.cpp +++ b/dbms/src/Debug/MockSSTReader.cpp @@ -24,6 +24,7 @@ namespace DB SSTReaderPtr fn_get_sst_reader(SSTView v, RaftStoreProxyPtr) { std::string s(v.path.data, v.path.len); + std::scoped_lock lock(MockSSTReader::mut); auto iter = MockSSTReader::getMockSSTData().find({s, v.type}); if (iter == MockSSTReader::getMockSSTData().end()) throw Exception("Can not find data in MockSSTData, [key=" + s + "] [type=" + CFToName(v.type) + "]"); diff --git a/dbms/src/Debug/MockSSTReader.h b/dbms/src/Debug/MockSSTReader.h index 04ac47000a6..5d303456c81 100644 --- a/dbms/src/Debug/MockSSTReader.h +++ b/dbms/src/Debug/MockSSTReader.h @@ -19,6 +19,7 @@ #include #include +#include namespace DB @@ -116,6 +117,9 @@ struct MockSSTReader static std::map & getMockSSTData() { return MockSSTData; } +public: + static std::mutex mut; + private: Data::const_iterator iter; Data::const_iterator begin; @@ -129,5 +133,8 @@ struct MockSSTReader static std::map MockSSTData; }; +inline std::map MockSSTReader::MockSSTData; +inline std::mutex MockSSTReader::mut; + SSTReaderInterfaces make_mock_sst_reader_interface(); } // namespace DB diff --git a/dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp b/dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp index d2062d92bd1..4d9e4c7d9fc 100644 --- a/dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp +++ b/dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp @@ -263,8 +263,6 @@ void MockRaftCommand::dbgFuncRegionSnapshot(Context & context, const ASTs & args table_id)); } -std::map MockSSTReader::MockSSTData; - class RegionMockTest final { public: diff --git a/dbms/src/Storages/KVStore/KVStore.cpp b/dbms/src/Storages/KVStore/KVStore.cpp index 47628d19776..34759c91659 100644 --- a/dbms/src/Storages/KVStore/KVStore.cpp +++ b/dbms/src/Storages/KVStore/KVStore.cpp @@ -108,6 +108,11 @@ void KVStore::restore(PathPool & path_pool, const TiFlashRaftProxyHelper * proxy } } + fetchProxyConfig(proxy_helper); +} + +void KVStore::fetchProxyConfig(const TiFlashRaftProxyHelper * proxy_helper) +{ // Try fetch proxy's config as a json string if (proxy_helper && proxy_helper->fn_get_config_json) { diff --git a/dbms/src/Storages/KVStore/KVStore.h b/dbms/src/Storages/KVStore/KVStore.h index 006b4bc2861..c41cc793b87 100644 --- a/dbms/src/Storages/KVStore/KVStore.h +++ b/dbms/src/Storages/KVStore/KVStore.h @@ -23,6 +23,7 @@ #include #include +#include #include namespace TiDB @@ -249,6 +250,7 @@ class KVStore final : private boost::noncopyable RaftLogEagerGcTasks::Hints getRaftLogGcHints(); void applyRaftLogGcTaskRes(const RaftLogGcTasksRes & res) const; const ProxyConfigSummary & getProxyConfigSummay() const { return proxy_config_summary; } + size_t getMaxParallelPrehandleSize() const; #ifndef DBMS_PUBLIC_GTEST private: @@ -361,6 +363,7 @@ class KVStore final : private boost::noncopyable void releaseReadIndexWorkers(); void handleDestroy(UInt64 region_id, TMTContext & tmt, const KVStoreTaskLock &); + void fetchProxyConfig(const TiFlashRaftProxyHelper * proxy_helper); #ifndef DBMS_PUBLIC_GTEST private: diff --git a/dbms/src/Storages/KVStore/MultiRaft/PreHandlingTrace.h b/dbms/src/Storages/KVStore/MultiRaft/PreHandlingTrace.h index 1a34c2a9191..e7b5b38a2c9 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/PreHandlingTrace.h +++ b/dbms/src/Storages/KVStore/MultiRaft/PreHandlingTrace.h @@ -31,8 +31,16 @@ struct PreHandlingTrace : MutexLockWrap {} std::atomic_bool abort_flag; }; + std::unordered_map> tasks; + std::atomic ongoing_prehandle_subtask_count{0}; + std::mutex cpu_resource_mut; + std::condition_variable cpu_resource_cv; + LoggerPtr log; + PreHandlingTrace() + : log(Logger::get("PreHandlingTrace")) + {} std::shared_ptr registerTask(uint64_t region_id) { // Automaticlly override the old one. @@ -61,5 +69,18 @@ struct PreHandlingTrace : MutexLockWrap auto _ = genLockGuard(); return tasks.find(region_id) != tasks.end(); } + void waitForSubtaskResources(uint64_t region_id, size_t parallel, size_t parallel_subtask_limit); + void releaseSubtaskResources(uint64_t region_id, size_t split_id) + { + std::unique_lock cpu_resource_lock(cpu_resource_mut); + // TODO(split) refine this to avoid notify_all + auto prev = ongoing_prehandle_subtask_count.fetch_sub(1); + RUNTIME_CHECK_MSG( + prev > 0, + "Try to decrease prehandle subtask count to below 0, region_id={}, split_id={}", + region_id, + split_id); + cpu_resource_cv.notify_all(); + } }; } // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp b/dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp index ef0228fa006..0265610f0aa 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp @@ -34,6 +34,7 @@ namespace CurrentMetrics { extern const Metric RaftNumPrehandlingSubTasks; +extern const Metric RaftNumParallelPrehandlingTasks; } // namespace CurrentMetrics namespace DB @@ -43,6 +44,7 @@ namespace FailPoints extern const char force_set_sst_to_dtfile_block_size[]; extern const char force_set_parallel_prehandle_threshold[]; extern const char force_raise_prehandle_exception[]; +extern const char pause_before_prehandle_subtask[]; } // namespace FailPoints namespace ErrorCodes @@ -66,9 +68,57 @@ struct ReadFromStreamResult RegionPtr region; }; +void PreHandlingTrace::waitForSubtaskResources(uint64_t region_id, size_t parallel, size_t parallel_subtask_limit) +{ + { + auto current = ongoing_prehandle_subtask_count.load(); + if (current + parallel <= parallel_subtask_limit + && ongoing_prehandle_subtask_count.compare_exchange_weak(current, current + parallel)) + { + LOG_DEBUG( + log, + "Prehandle resource meet, limit={}, current={}, region_id={}", + parallel_subtask_limit, + ongoing_prehandle_subtask_count.load(), + region_id); + return; + } + } + Stopwatch watch; + LOG_DEBUG( + log, + "Prehandle resource wait begin, limit={} current={} parallel={} region_id={}", + parallel_subtask_limit, + ongoing_prehandle_subtask_count.load(), + parallel, + region_id); + while (true) + { + std::unique_lock cpu_resource_lock{cpu_resource_mut}; + cpu_resource_cv.wait(cpu_resource_lock, [&]() { + return ongoing_prehandle_subtask_count.load() + parallel <= parallel_subtask_limit; + }); + auto current = ongoing_prehandle_subtask_count.load(); + if (current + parallel <= parallel_subtask_limit + && ongoing_prehandle_subtask_count.compare_exchange_weak(current, current + parallel)) + { + break; + } + } + GET_METRIC(tiflash_raft_command_duration_seconds, type_apply_snapshot_predecode_parallel_wait) + .Observe(watch.elapsedSeconds()); + LOG_INFO( + log, + "Prehandle resource acquired after {:.3f} seconds, region_id={} parallel={}", + watch.elapsedSeconds(), + region_id, + parallel); +} + static inline std::tuple executeTransform( LoggerPtr log, const RegionPtr & new_region, + PreHandlingTrace & trace, const std::shared_ptr & prehandle_task, DM::FileConvertJobType job_type, const std::shared_ptr & storage, @@ -76,14 +126,18 @@ static inline std::tuple executeTransform const DM::SSTFilesToBlockInputStreamOpts & opts, TMTContext & tmt) { + auto region_id = new_region->id(); + auto split_id = sst_stream->getSplitId(); CurrentMetrics::add(CurrentMetrics::RaftNumPrehandlingSubTasks); - SCOPE_EXIT({ CurrentMetrics::sub(CurrentMetrics::RaftNumPrehandlingSubTasks); }); + SCOPE_EXIT({ + trace.releaseSubtaskResources(region_id, split_id); + CurrentMetrics::sub(CurrentMetrics::RaftNumPrehandlingSubTasks); + }); LOG_INFO( log, "Add prehandle task split_id={} limit={}", - sst_stream->getSplitId(), + split_id, sst_stream->getSoftLimit().has_value() ? sst_stream->getSoftLimit()->toDebugString() : ""); - auto region_id = new_region->id(); std::shared_ptr> stream; // If any schema changes is detected during decoding SSTs to DTFiles, we need to cancel and recreate DTFiles with // the latest schema. Or we will get trouble in `BoundedSSTFilesToBlockInputStream`. @@ -96,7 +150,7 @@ static inline std::tuple executeTransform sst_stream, ::DB::TiDBPkColumnID, opts.schema_snap, - sst_stream->getSplitId()); + split_id); stream = std::make_shared>( opts.log_prefix, @@ -127,6 +181,8 @@ static inline std::tuple executeTransform } } }); + FAIL_POINT_PAUSE(FailPoints::pause_before_prehandle_subtask); + stream->write(); stream->writeSuffix(); auto res = ReadFromStreamResult{.error = ReadFromStreamError::Ok, .extra_msg = "", .region = new_region}; @@ -215,6 +271,22 @@ PrehandleResult KVStore::preHandleSnapshotToFiles( return result; } +size_t KVStore::getMaxParallelPrehandleSize() const +{ + const auto & proxy_config = getProxyConfigSummay(); + size_t total_concurrency = 0; + if (proxy_config.valid) + { + total_concurrency = proxy_config.snap_handle_pool_size; + } + else + { + auto cpu_num = std::thread::hardware_concurrency(); + total_concurrency = static_cast(std::clamp(cpu_num * 0.7, 2.0, 16.0)); + } + return total_concurrency; +} + // If size is 0, do not parallel prehandle for this snapshot, which is legacy. // If size is non-zero, use extra this many threads to prehandle. static inline std::pair, size_t> getSplitKey( @@ -235,8 +307,6 @@ static inline std::pair, size_t> getSplitKey( // although it is optimized to bring about the minimum overhead. if (new_region->getClusterRaftstoreVer() != RaftstoreVer::V2) return std::make_pair(std::vector{}, 0); - if (kvstore->getOngoingPrehandleTaskCount() >= 2) - return std::make_pair(std::vector{}, 0); auto approx_bytes = sst_stream->getApproxBytes(); if (approx_bytes <= parallel_prehandle_threshold) { @@ -251,17 +321,8 @@ static inline std::pair, size_t> getSplitKey( // Get this info again, since getApproxBytes maybe take some time. auto ongoing_count = kvstore->getOngoingPrehandleTaskCount(); - const auto & proxy_config = kvstore->getProxyConfigSummay(); uint64_t want_split_parts = 0; - size_t total_concurrency = 0; - if (proxy_config.valid) - { - total_concurrency = proxy_config.snap_handle_pool_size; - } - else - { - total_concurrency = std::thread::hardware_concurrency(); - } + auto total_concurrency = kvstore->getMaxParallelPrehandleSize(); if (total_concurrency + 1 > ongoing_count) { // Current thread takes 1 which is in `ongoing_count`. @@ -311,12 +372,15 @@ static inline std::pair, size_t> getSplitKey( LOG_INFO( log, "getSplitKey result {}, total_concurrency={} ongoing={} total_split_parts={} split_keys={} " + "region_range={} approx_bytes={} " "region_id={}", fmt_buf.toString(), total_concurrency, ongoing_count, want_split_parts, split_keys.size(), + new_region->getRange()->toDebugString(), + approx_bytes, new_region->id()); } return std::make_pair(std::move(split_keys), approx_bytes); @@ -333,6 +397,7 @@ using ParallelPrehandleCtxPtr = std::shared_ptr; static void runInParallel( LoggerPtr log, RegionPtr new_region, + PreHandlingTrace & trace, DM::SSTFilesToBlockInputStreamOpts & opt, const SSTViewVec & snaps, const TiFlashRaftProxyHelper * proxy_helper, @@ -358,8 +423,16 @@ static void runInParallel( DM::SSTFilesToBlockInputStreamOpts(opt)); try { - auto [part_result, part_prehandle_result] - = executeTransform(log, part_new_region, prehandle_task, job_type, dm_storage, part_sst_stream, opt, tmt); + auto [part_result, part_prehandle_result] = executeTransform( + log, + part_new_region, + trace, + prehandle_task, + job_type, + dm_storage, + part_sst_stream, + opt, + tmt); LOG_INFO( log, "Finished extra parallel prehandle task limit {} write cf {} lock cf {} default cf {} dmfiles {} error {}, " @@ -403,6 +476,7 @@ static void runInParallel( void executeParallelTransform( LoggerPtr log, ReadFromStreamResult & result, + PreHandlingTrace & trace, PrehandleResult & prehandle_result, const std::vector & split_keys, std::shared_ptr sst_stream, @@ -416,6 +490,8 @@ void executeParallelTransform( uint64_t index, std::shared_ptr storage) { + CurrentMetrics::add(CurrentMetrics::RaftNumParallelPrehandlingTasks); + SCOPE_EXIT({ CurrentMetrics::sub(CurrentMetrics::RaftNumParallelPrehandlingTasks); }); using SingleSnapshotAsyncTasks = AsyncTasks, bool>; auto split_key_count = split_keys.size(); RUNTIME_CHECK_MSG( @@ -428,6 +504,7 @@ void executeParallelTransform( new_region->getRange()->toDebugString(), split_key_count, new_region->id()); + Stopwatch watch; // Make sure the queue is bigger than `split_key_count`, otherwise `addTask` may fail. auto async_tasks = SingleSnapshotAsyncTasks(split_key_count, split_key_count, split_key_count + 5); sst_stream->resetSoftLimit( @@ -445,6 +522,7 @@ void executeParallelTransform( runInParallel( log, new_region, + trace, opt, snaps, proxy_helper, @@ -466,7 +544,7 @@ void executeParallelTransform( } // This will read the keys from the beginning to the first split key auto [head_result, head_prehandle_result] - = executeTransform(log, new_region, prehandle_task, job_type, storage, sst_stream, opt, tmt); + = executeTransform(log, new_region, trace, prehandle_task, job_type, storage, sst_stream, opt, tmt); LOG_INFO( log, "Finished extra parallel prehandle task limit {} write cf {} lock cf {} default cf {} dmfiles {} " @@ -518,10 +596,13 @@ void executeParallelTransform( } LOG_INFO( log, - "Finished all extra parallel prehandle task write cf {} dmfiles {} error {}, region_id={}", + "Finished all extra parallel prehandle task, write_cf={} dmfiles={} error={} splits={} cost={:.3f}s " + "region_id={}", prehandle_result.stats.write_cf_keys, prehandle_result.ingest_ids.size(), magic_enum::enum_name(head_result.error), + split_key_count, + watch.elapsedSeconds(), new_region->id()); } else @@ -614,23 +695,32 @@ PrehandleResult KVStore::preHandleSSTsToDTFiles( // `split_keys` do not begin with 'z'. auto [split_keys, approx_bytes] = getSplitKey(log, this, new_region, sst_stream); - + prehandling_trace.waitForSubtaskResources(region_id, split_keys.size() + 1, getMaxParallelPrehandleSize()); ReadFromStreamResult result; if (split_keys.empty()) { LOG_INFO( log, - "Single threaded prehandling for single big region, range={}, region_id={}", + "Single threaded prehandling for single region, range={} region_id={}", new_region->getRange()->toDebugString(), new_region->id()); - std::tie(result, prehandle_result) - = executeTransform(log, new_region, prehandle_task, job_type, storage, sst_stream, opt, tmt); + std::tie(result, prehandle_result) = executeTransform( + log, + new_region, + prehandling_trace, + prehandle_task, + job_type, + storage, + sst_stream, + opt, + tmt); } else { executeParallelTransform( log, result, + prehandling_trace, prehandle_result, split_keys, sst_stream, diff --git a/dbms/src/Storages/KVStore/tests/gtest_raftstore_v2.cpp b/dbms/src/Storages/KVStore/tests/gtest_raftstore_v2.cpp index 73736ab4a6e..f39be620f41 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_raftstore_v2.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_raftstore_v2.cpp @@ -22,6 +22,7 @@ namespace DB namespace FailPoints { extern const char force_raise_prehandle_exception[]; +extern const char pause_before_prehandle_subtask[]; extern const char force_set_sst_to_dtfile_block_size[]; } // namespace FailPoints @@ -428,5 +429,111 @@ try } } CATCH + +// Test if parallel limit is reached. +TEST_F(RegionKVStoreTest, KVStoreSingleSnap6) +try +{ + auto ctx = TiFlashTestEnv::getGlobalContext(); + proxy_instance->cluster_ver = RaftstoreVer::V2; + proxy_instance->proxy_config_string = R"({"raftstore":{"snap-handle-pool-size":3}})"; + KVStore & kvs = getKVS(); + kvs.fetchProxyConfig(proxy_helper.get()); + ASSERT_NE(proxy_helper->sst_reader_interfaces.fn_key, nullptr); + ASSERT_NE(proxy_helper->fn_get_config_json, nullptr); + UInt64 region_id = 1; + TableID table_id; + FailPointHelper::enableFailPoint(FailPoints::force_set_parallel_prehandle_threshold, static_cast(0)); + SCOPE_EXIT({ FailPointHelper::disableFailPoint("force_set_parallel_prehandle_threshold"); }); + { + initStorages(); + std::vector region_ids = {2, 3, 4}; + region_id = region_ids[0]; + std::vector table_limits = {0, 90, 180, 270}; + table_id = proxy_instance->bootstrapTable(ctx, kvs, ctx.getTMTContext()); + + proxy_instance->bootstrapWithRegion( + kvs, + ctx.getTMTContext(), + region_id, + std::make_pair( + RecordKVFormat::genKey(table_id, table_limits[0]).toString(), + RecordKVFormat::genKey(table_id, table_limits[1]).toString())); + + auto ranges = std::vector>(); + for (size_t i = 1; i + 1 < table_limits.size(); i++) + { + ranges.push_back(std::make_pair( + RecordKVFormat::genKey(table_id, table_limits[i]).toString(), + RecordKVFormat::genKey(table_id, table_limits[i + 1]).toString())); + } + proxy_instance->debugAddRegions( + kvs, + ctx.getTMTContext(), + std::vector(region_ids.begin() + 1, region_ids.end()), + std::move(ranges)); + auto r1 = proxy_instance->getRegion(region_id); + + MockSSTReader::getMockSSTData().clear(); + DB::FailPointHelper::enablePauseFailPoint(DB::FailPoints::pause_before_prehandle_subtask, 100); + std::vector ths; + auto runId = [&](size_t ths_id) { + auto [value_write, value_default] = proxy_instance->generateTiKVKeyValue(111, 999); + MockRaftStoreProxy::Cf default_cf{region_ids[ths_id], table_id, ColumnFamilyType::Default}; + for (HandleID h = table_limits[ths_id]; h < table_limits[ths_id + 1]; h++) + { + auto k = RecordKVFormat::genKey(table_id, h, 111); + default_cf.insert_raw(k, value_default); + } + default_cf.finish_file(SSTFormatKind::KIND_TABLET); + default_cf.freeze(); + MockRaftStoreProxy::Cf write_cf{region_ids[ths_id], table_id, ColumnFamilyType::Write}; + for (HandleID h = table_limits[ths_id]; h < table_limits[ths_id + 1]; h++) + { + auto k = RecordKVFormat::genKey(table_id, h, 111); + write_cf.insert_raw(k, value_default); + } + write_cf.finish_file(SSTFormatKind::KIND_TABLET); + write_cf.freeze(); + + { + auto [kvr1, res] = proxy_instance->snapshot( + kvs, + ctx.getTMTContext(), + region_ids[ths_id], + {default_cf, write_cf}, + 0, + 0, + std::nullopt); + } + }; + ths.push_back(std::thread(runId, 0)); + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + + ASSERT_EQ(kvs.getOngoingPrehandleTaskCount(), 1); + for (size_t ths_id = 1; ths_id < region_ids.size(); ths_id++) + { + ths.push_back(std::thread(runId, ths_id)); + } + + auto loop = 0; + // All threads can be prehandled. + while (kvs.getOngoingPrehandleTaskCount() != 3) + { + loop += 1; + ASSERT(loop < 30); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + ASSERT_EQ(kvs.prehandling_trace.ongoing_prehandle_subtask_count.load(), 3); + DB::FailPointHelper::disableFailPoint(DB::FailPoints::pause_before_prehandle_subtask); + for (auto && t : ths) + { + t.join(); + } + ASSERT_EQ(kvs.getOngoingPrehandleTaskCount(), 0); + } +} +CATCH + } // namespace tests } // namespace DB diff --git a/metrics/grafana/tiflash_summary.json b/metrics/grafana/tiflash_summary.json index b51c44fc702..95d2d4173de 100644 --- a/metrics/grafana/tiflash_summary.json +++ b/metrics/grafana/tiflash_summary.json @@ -11782,6 +11782,14 @@ "intervalFactor": 1, "legendFormat": "PrehandleSubtasks-{{instance}}", "refId": "A" + }, + { + "expr": "sum(tiflash_system_current_metric_RaftNumParallelPrehandlingTasks{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}) by (instance)", + "format": "time_series", + "hide": false, + "intervalFactor": 1, + "legendFormat": "ParallelTasks-{{instance}}", + "refId": "C" } ], "thresholds": [],