diff --git a/dbms/src/Debug/DBGInvoker.cpp b/dbms/src/Debug/DBGInvoker.cpp index 67158dfbf43..aa9f300cacc 100644 --- a/dbms/src/Debug/DBGInvoker.cpp +++ b/dbms/src/Debug/DBGInvoker.cpp @@ -128,7 +128,7 @@ DBGInvoker::DBGInvoker() regSchemalessFunc("read_index_stress_test", ReadIndexStressTest::dbgFuncStressTest); - regSchemalessFunc("get_active_threads_in_dynamic_thread_pool", dbgFuncActiveThreadsInDynamicThreadPool); + regSchemalessFunc("wait_until_no_temp_active_threads_in_dynamic_thread_pool", dbgFuncWaitUntilNoTempActiveThreadsInDynamicThreadPool); } void replaceSubstr(std::string & str, const std::string & target, const std::string & replacement) diff --git a/dbms/src/Debug/dbgFuncMisc.cpp b/dbms/src/Debug/dbgFuncMisc.cpp index 352d9fa6a2c..4e80071fc04 100644 --- a/dbms/src/Debug/dbgFuncMisc.cpp +++ b/dbms/src/Debug/dbgFuncMisc.cpp @@ -94,7 +94,7 @@ void dbgFuncSearchLogForKey(Context & context, const ASTs & args, DBGInvoker::Pr return; } String target_line; - for (auto iter = key_line_candidates.rbegin(); iter != key_line_candidates.rend(); iter++) + for (auto iter = key_line_candidates.rbegin(); iter != key_line_candidates.rend(); iter++) // NOLINT { if (getReadTSOForLog(*iter) == target_read_tso) { @@ -132,12 +132,32 @@ void dbgFuncTriggerGlobalPageStorageGC(Context & context, const ASTs & /*args*/, } } -void dbgFuncActiveThreadsInDynamicThreadPool(Context &, const ASTs &, DBGInvoker::Printer output) +void dbgFuncWaitUntilNoTempActiveThreadsInDynamicThreadPool(Context &, const ASTs & args, DBGInvoker::Printer output) { if (DynamicThreadPool::global_instance) { - auto value = GET_METRIC(tiflash_thread_count, type_active_threads_of_thdpool).Value(); - output(std::to_string(static_cast(value))); + static const UInt64 MAX_WAIT_TIME = 10; + auto wait_time = safeGet(typeid_cast(*args[0]).value); + wait_time = std::min(wait_time, MAX_WAIT_TIME); + /// should update the value when there is long running threads using dynamic thread pool + static const int expected_value = 0; + + while (wait_time > 0) + { + if (GET_METRIC(tiflash_thread_count, type_active_threads_of_thdpool).Value() == expected_value) + { + output("0"); + return; + } + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + --wait_time; + } + if (GET_METRIC(tiflash_thread_count, type_active_threads_of_thdpool).Value() == expected_value) + { + output("0"); + return; + } + output("1"); } else { diff --git a/dbms/src/Debug/dbgFuncMisc.h b/dbms/src/Debug/dbgFuncMisc.h index dcb5d2adfe3..9e43e8e0145 100644 --- a/dbms/src/Debug/dbgFuncMisc.h +++ b/dbms/src/Debug/dbgFuncMisc.h @@ -32,7 +32,7 @@ void dbgFuncSearchLogForKey(Context & context, const ASTs & args, DBGInvoker::Pr // ./storage-client.sh "DBGInvoke trigger_global_storage_pool_gc()" void dbgFuncTriggerGlobalPageStorageGC(Context & context, const ASTs & args, DBGInvoker::Printer output); -// Get active threads in dynamic thread pool, if dynamic thread pool is disabled, return 0 -void dbgFuncActiveThreadsInDynamicThreadPool(Context & context, const ASTs & /*args*/, DBGInvoker::Printer /*output*/); +// Wait until no active threads in dynamic thread pool finish, if timeout, return 1, else return 0 +void dbgFuncWaitUntilNoTempActiveThreadsInDynamicThreadPool(Context & context, const ASTs & /*args*/, DBGInvoker::Printer /*output*/); } // namespace DB diff --git a/tests/fullstack-test/mpp/mpp_fail.test b/tests/fullstack-test/mpp/mpp_fail.test index e411278c286..320b1ea3f50 100644 --- a/tests/fullstack-test/mpp/mpp_fail.test +++ b/tests/fullstack-test/mpp/mpp_fail.test @@ -31,11 +31,10 @@ mysql> analyze table test.t1 # Data. -## make sure the there is no active threads before mpp_fail.test run -=> DBGInvoke get_active_threads_in_dynamic_thread_pool() -┌─get_active_threads_in_dynamic_thread_pool()─┐ -│ 0 │ -└─────────────────────────────────────────────┘ +=> DBGInvoke wait_until_no_temp_active_threads_in_dynamic_thread_pool(5) +┌─wait_until_no_temp_active_threads_in_dynamic_thread_pool(5)─┐ +│ 0 │ +└─────────────────────────────────────────────────────────────┘ ## exception during mpp hash probe ## mysql> desc format='brief' select * from t1 left join t t2 on t1.id = t2.id; ## +-----------------------------------+---------+--------------+---------------+----------------------------------------------------------------------------------------------+ @@ -56,19 +55,11 @@ mysql> analyze table test.t1 mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; set @@tidb_broadcast_join_threshold_count=0; set @@tidb_broadcast_join_threshold_size=0; select * from t1 left join t t2 on t1.id = t2.id; {#REGEXP}.*Fail point FailPoints::exception_mpp_hash_probe is triggered.* => DBGInvoke __disable_fail_point(exception_mpp_hash_probe) -## sleep 2 seconds to make sure all the running compute threads are finished -mysql> select sleep(2) -+----------+ -| sleep(2) | -+----------+ -| 0 | -+----------+ -## note 1. this test only works if dynamic thread pool is enabled, the result may be false negative if dynamic thread pool is not enabled, it works now because dynamic thread pool is enabled by default -## 2. currently, there are no long live threads that use the dynamic thread pool, so the expected value is 0, need to update the reference if someday some long live threads are using dynamic thread pool -=> DBGInvoke get_active_threads_in_dynamic_thread_pool() -┌─get_active_threads_in_dynamic_thread_pool()─┐ -│ 0 │ -└─────────────────────────────────────────────┘ +## note: this test only works if dynamic thread pool is enabled, the result may be false negative if dynamic thread pool is not enabled, it works now because dynamic thread pool is enabled by default +=> DBGInvoke wait_until_no_temp_active_threads_in_dynamic_thread_pool(5) +┌─wait_until_no_temp_active_threads_in_dynamic_thread_pool(5)─┐ +│ 0 │ +└─────────────────────────────────────────────────────────────┘ ## exception before mpp register non root mpp task => DBGInvoke __enable_fail_point(exception_before_mpp_register_non_root_mpp_task)