diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index 6b15cb3db6c..af93d4c9cd3 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -67,6 +67,7 @@ std::unordered_map> FailPointHelper::f M(exception_in_creating_set_input_stream) \ M(exception_when_read_from_log) \ M(exception_mpp_hash_build) \ + M(exception_mpp_hash_probe) \ M(exception_before_drop_segment) \ M(exception_after_drop_segment) \ M(exception_between_schema_change_in_the_same_diff) \ diff --git a/dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp b/dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp index 5ce4a8a799d..9376549204f 100644 --- a/dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp +++ b/dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp @@ -30,7 +30,6 @@ namespace DB namespace FailPoints { extern const char exception_in_creating_set_input_stream[]; -extern const char exception_mpp_hash_build[]; } // namespace FailPoints namespace ErrorCodes { @@ -120,7 +119,7 @@ void CreatingSetsBlockInputStream::createAll() for (auto & elem : subqueries_for_sets) { if (elem.second.join) - elem.second.join->setBuildTableState(Join::BuildTableState::WAITING); + elem.second.join->setInitActiveBuildConcurrency(); } } Stopwatch watch; @@ -238,13 +237,6 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery) } } - - if (subquery.join) - { - FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_mpp_hash_build); - subquery.join->setBuildTableState(Join::BuildTableState::SUCCEED); - } - if (table_out) table_out->writeSuffix(); @@ -294,7 +286,7 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery) std::unique_lock lock(exception_mutex); exception_from_workers.push_back(std::current_exception()); if (subquery.join) - subquery.join->setBuildTableState(Join::BuildTableState::FAILED); + subquery.join->meetError(); LOG_ERROR(log, "{} throw exception: {} In {} sec. ", gen_log_msg(), getCurrentExceptionMessage(false, true), watch.elapsedSeconds()); } } diff --git a/dbms/src/DataStreams/HashJoinBuildBlockInputStream.cpp b/dbms/src/DataStreams/HashJoinBuildBlockInputStream.cpp index 61808b48c50..cf8520a6fdd 100644 --- a/dbms/src/DataStreams/HashJoinBuildBlockInputStream.cpp +++ b/dbms/src/DataStreams/HashJoinBuildBlockInputStream.cpp @@ -19,11 +19,22 @@ namespace DB { Block HashJoinBuildBlockInputStream::readImpl() { - Block block = children.back()->read(); - if (!block) + try + { + Block block = children.back()->read(); + if (!block) + { + join->finishOneBuild(); + return block; + } + join->insertFromBlock(block, concurrency_build_index); return block; - join->insertFromBlock(block, concurrency_build_index); - return block; + } + catch (...) + { + join->meetError(); + throw; + } } void HashJoinBuildBlockInputStream::appendInfo(FmtBuffer & buffer) const diff --git a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp index c68e032ea16..283883fee13 100644 --- a/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp +++ b/dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp @@ -83,19 +83,27 @@ void HashJoinProbeBlockInputStream::cancel(bool kill) Block HashJoinProbeBlockInputStream::readImpl() { - // if join finished, return {} directly. - if (squashing_transform.isJoinFinished()) + try { - return Block{}; - } + // if join finished, return {} directly. + if (squashing_transform.isJoinFinished()) + { + return Block{}; + } - while (squashing_transform.needAppendBlock()) + while (squashing_transform.needAppendBlock()) + { + Block result_block = getOutputBlock(); + squashing_transform.appendBlock(result_block); + } + auto ret = squashing_transform.getFinalOutputBlock(); + return ret; + } + catch (...) { - Block result_block = getOutputBlock(); - squashing_transform.appendBlock(result_block); + join->meetError(); + throw; } - auto ret = squashing_transform.getFinalOutputBlock(); - return ret; } void HashJoinProbeBlockInputStream::readSuffixImpl() diff --git a/dbms/src/Debug/DBGInvoker.cpp b/dbms/src/Debug/DBGInvoker.cpp index 29f2eae43ac..67158dfbf43 100644 --- a/dbms/src/Debug/DBGInvoker.cpp +++ b/dbms/src/Debug/DBGInvoker.cpp @@ -127,6 +127,8 @@ DBGInvoker::DBGInvoker() regSchemalessFunc("gc_global_storage_pool", dbgFuncTriggerGlobalPageStorageGC); regSchemalessFunc("read_index_stress_test", ReadIndexStressTest::dbgFuncStressTest); + + regSchemalessFunc("get_active_threads_in_dynamic_thread_pool", dbgFuncActiveThreadsInDynamicThreadPool); } void replaceSubstr(std::string & str, const std::string & target, const std::string & replacement) diff --git a/dbms/src/Debug/dbgFuncMisc.cpp b/dbms/src/Debug/dbgFuncMisc.cpp index cbdf4629cb5..352d9fa6a2c 100644 --- a/dbms/src/Debug/dbgFuncMisc.cpp +++ b/dbms/src/Debug/dbgFuncMisc.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -130,4 +131,17 @@ void dbgFuncTriggerGlobalPageStorageGC(Context & context, const ASTs & /*args*/, global_storage_pool->gc(); } } + +void dbgFuncActiveThreadsInDynamicThreadPool(Context &, const ASTs &, DBGInvoker::Printer output) +{ + if (DynamicThreadPool::global_instance) + { + auto value = GET_METRIC(tiflash_thread_count, type_active_threads_of_thdpool).Value(); + output(std::to_string(static_cast(value))); + } + else + { + output("0"); + } +} } // namespace DB diff --git a/dbms/src/Debug/dbgFuncMisc.h b/dbms/src/Debug/dbgFuncMisc.h index c256e6b41c0..dcb5d2adfe3 100644 --- a/dbms/src/Debug/dbgFuncMisc.h +++ b/dbms/src/Debug/dbgFuncMisc.h @@ -18,7 +18,6 @@ namespace DB { - class Context; // Find the last occurence of `key` in log file and extract the first number follow the key. @@ -33,4 +32,7 @@ void dbgFuncSearchLogForKey(Context & context, const ASTs & args, DBGInvoker::Pr // ./storage-client.sh "DBGInvoke trigger_global_storage_pool_gc()" void dbgFuncTriggerGlobalPageStorageGC(Context & context, const ASTs & args, DBGInvoker::Printer output); +// Get active threads in dynamic thread pool, if dynamic thread pool is disabled, return 0 +void dbgFuncActiveThreadsInDynamicThreadPool(Context & context, const ASTs & /*args*/, DBGInvoker::Printer /*output*/); + } // namespace DB diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index de0a3a6713e..42234a2d541 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -38,6 +38,8 @@ namespace FailPoints { extern const char random_join_build_failpoint[]; extern const char random_join_prob_failpoint[]; +extern const char exception_mpp_hash_build[]; +extern const char exception_mpp_hash_probe[]; } // namespace FailPoints namespace ErrorCodes @@ -135,6 +137,7 @@ Join::Join( , key_names_left(key_names_left_) , key_names_right(key_names_right_) , build_concurrency(0) + , active_build_concurrency(0) , probe_concurrency(0) , active_probe_concurrency(0) , collators(collators_) @@ -145,7 +148,6 @@ Join::Join( , other_condition_ptr(other_condition_ptr_) , original_strictness(strictness) , max_block_size_for_cross_join(max_block_size_) - , build_table_state(BuildTableState::SUCCEED) , log(Logger::get(req_id)) , enable_fine_grained_shuffle(enable_fine_grained_shuffle_) , fine_grained_shuffle_count(fine_grained_shuffle_count_) @@ -165,11 +167,14 @@ Join::Join( LOG_INFO(log, "FineGrainedShuffle flag {}, stream count {}", enable_fine_grained_shuffle, fine_grained_shuffle_count); } -void Join::setBuildTableState(BuildTableState state_) +void Join::meetError() { - std::lock_guard lk(build_table_mutex); - build_table_state = state_; - build_table_cv.notify_all(); + std::lock_guard lk(build_probe_mutex); + if (meet_error) + return; + meet_error = true; + build_cv.notify_all(); + probe_cv.notify_all(); } bool CanAsColumnString(const IColumn * column) @@ -465,6 +470,8 @@ void Join::setBuildConcurrencyAndInitPool(size_t build_concurrency_) { if (unlikely(build_concurrency > 0)) throw Exception("Logical error: `setBuildConcurrencyAndInitPool` shouldn't be called more than once", ErrorCodes::LOGICAL_ERROR); + /// do not set active_build_concurrency because in compile stage, `joinBlock` will be called to get generate header, if active_build_concurrency + /// is set here, `joinBlock` will hang when used to get header build_concurrency = std::max(1, build_concurrency_); for (size_t i = 0; i < getBuildConcurrencyInternal(); ++i) @@ -1992,16 +1999,52 @@ void Join::checkTypesOfKeys(const Block & block_left, const Block & block_right) } } -Block Join::joinBlock(ProbeProcessInfo & probe_process_info) const +void Join::finishOneProbe() { - // ck will use this function to generate header, that's why here is a check. + std::unique_lock lock(build_probe_mutex); + if (active_probe_concurrency == 1) { - std::unique_lock lk(build_table_mutex); - - build_table_cv.wait(lk, [&]() { return build_table_state != BuildTableState::WAITING; }); - if (build_table_state == BuildTableState::FAILED) /// throw this exception once failed to build the hash table - throw Exception("Build failed before join probe!"); + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_mpp_hash_probe); } + --active_probe_concurrency; + if (active_probe_concurrency == 0) + probe_cv.notify_all(); +} +void Join::finishOneBuild() +{ + std::unique_lock lock(build_probe_mutex); + if (active_build_concurrency == 1) + { + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_mpp_hash_build); + } + --active_build_concurrency; + if (active_build_concurrency == 0) + build_cv.notify_all(); +} + +void Join::waitUntilAllProbeFinished() const +{ + std::unique_lock lock(build_probe_mutex); + probe_cv.wait(lock, [&]() { + return meet_error || active_probe_concurrency == 0; + }); + if (meet_error) + throw Exception("Join meet error before all probe finished!"); +} + +void Join::waitUntilAllBuildFinished() const +{ + std::unique_lock lock(build_probe_mutex); + build_cv.wait(lock, [&]() { + return meet_error || active_build_concurrency == 0; + }); + if (meet_error) + throw Exception("Build failed before join probe!"); +} + +Block Join::joinBlock(ProbeProcessInfo & probe_process_info) const +{ + waitUntilAllBuildFinished(); std::shared_lock lock(rwlock); diff --git a/dbms/src/Interpreters/Join.h b/dbms/src/Interpreters/Join.h index 5c5b89a52d3..7cdab351e0a 100644 --- a/dbms/src/Interpreters/Join.h +++ b/dbms/src/Interpreters/Join.h @@ -146,31 +146,27 @@ class Join const Names & getLeftJoinKeys() const { return key_names_left; } + void setInitActiveBuildConcurrency() + { + std::unique_lock lock(build_probe_mutex); + active_build_concurrency = getBuildConcurrencyInternal(); + } + void finishOneBuild(); + void waitUntilAllBuildFinished() const; + size_t getProbeConcurrency() const { - std::unique_lock lock(probe_mutex); + std::unique_lock lock(build_probe_mutex); return probe_concurrency; } void setProbeConcurrency(size_t concurrency) { - std::unique_lock lock(probe_mutex); + std::unique_lock lock(build_probe_mutex); probe_concurrency = concurrency; active_probe_concurrency = probe_concurrency; } - void finishOneProbe() - { - std::unique_lock lock(probe_mutex); - active_probe_concurrency--; - if (active_probe_concurrency == 0) - probe_cv.notify_all(); - } - void waitUntilAllProbeFinished() - { - std::unique_lock lock(probe_mutex); - probe_cv.wait(lock, [&]() { - return active_probe_concurrency == 0; - }); - } + void finishOneProbe(); + void waitUntilAllProbeFinished() const; size_t getBuildConcurrency() const { @@ -178,13 +174,7 @@ class Join return getBuildConcurrencyInternal(); } - enum BuildTableState - { - WAITING, - FAILED, - SUCCEED - }; - void setBuildTableState(BuildTableState state_); + void meetError(); /// Reference to the row in block. struct RowRef @@ -303,13 +293,18 @@ class Join /// Names of key columns (columns for equi-JOIN) in "right" table (in the order they appear in USING clause). const Names key_names_right; + mutable std::mutex build_probe_mutex; + + mutable std::condition_variable build_cv; size_t build_concurrency; + size_t active_build_concurrency; - mutable std::mutex probe_mutex; - std::condition_variable probe_cv; + mutable std::condition_variable probe_cv; size_t probe_concurrency; size_t active_probe_concurrency; + bool meet_error = false; + private: /// collators for the join key const TiDB::TiDBCollators collators; @@ -355,10 +350,6 @@ class Join /// Block with key columns in the same order they appear in the right-side table. Block sample_block_with_keys; - mutable std::mutex build_table_mutex; - mutable std::condition_variable build_table_cv; - BuildTableState build_table_state; - const LoggerPtr log; Block totals; diff --git a/tests/fullstack-test/mpp/mpp_fail.test b/tests/fullstack-test/mpp/mpp_fail.test index ef2eb2286c7..1e580b160f8 100644 --- a/tests/fullstack-test/mpp/mpp_fail.test +++ b/tests/fullstack-test/mpp/mpp_fail.test @@ -16,12 +16,18 @@ => DBGInvoke __init_fail_point() mysql> drop table if exists test.t +mysql> drop table if exists test.t1 mysql> create table test.t (id int, value varchar(64)) mysql> insert into test.t values(1,'a'),(2,'b'),(3,'c') +mysql> create table test.t1 (id int, value varchar(64)) +mysql> insert into test.t1 values(4,'d') mysql> alter table test.t set tiflash replica 1 +mysql> alter table test.t1 set tiflash replica 1 func> wait_table test t +func> wait_table test t1 mysql> analyze table test.t +mysql> analyze table test.t1 # Data. @@ -112,5 +118,40 @@ mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_m {#REGEXP}.*Fail point FailPoints::exception_mpp_hash_build is triggered.* => DBGInvoke __disable_fail_point(exception_mpp_hash_build) +## exception during mpp hash probe +## mysql> desc format='brief' select * from t1 left join t t2 on t1.id = t2.id; +## +-----------------------------------+---------+--------------+---------------+----------------------------------------------------------------------------------------------+ +## | id | estRows | task | access object | operator info | +## +-----------------------------------+---------+--------------+---------------+----------------------------------------------------------------------------------------------+ +## | TableReader | 1.00 | root | | data:ExchangeSender | +## | └─ExchangeSender | 1.00 | mpp[tiflash] | | ExchangeType: PassThrough | +## | └─HashJoin | 1.00 | mpp[tiflash] | | left outer join, equal:[eq(test.t1.id, test.t.id)], stream_count: 8 | +## | ├─ExchangeReceiver(Build) | 1.00 | mpp[tiflash] | | stream_count: 8 | +## | │ └─ExchangeSender | 1.00 | mpp[tiflash] | | ExchangeType: HashPartition, Hash Cols: [name: test.t1.id, collate: binary], stream_count: 8 | +## | │ └─TableFullScan | 1.00 | mpp[tiflash] | table:t1 | keep order:false | +## | └─ExchangeReceiver(Probe) | 5.99 | mpp[tiflash] | | | +## | └─ExchangeSender | 5.99 | mpp[tiflash] | | ExchangeType: HashPartition, Hash Cols: [name: test.t.id, collate: binary] | +## | └─Selection | 5.99 | mpp[tiflash] | | not(isnull(test.t.id)) | +## | └─TableFullScan | 6.00 | mpp[tiflash] | table:t2 | keep order:false, stats:pseudo | +## +-----------------------------------+---------+--------------+---------------+----------------------------------------------------------------------------------------------+ +=> DBGInvoke __enable_fail_point(exception_mpp_hash_probe) +mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; set @@tidb_broadcast_join_threshold_count=0; set @@tidb_broadcast_join_threshold_size=0; select * from t1 left join t t2 on t1.id = t2.id; +{#REGEXP}.*Fail point FailPoints::exception_mpp_hash_probe is triggered.* +=> DBGInvoke __disable_fail_point(exception_mpp_hash_probe) +## sleep 2 seconds to make sure all the running compute threads are finished +mysql> select sleep(2) ++----------+ +| sleep(2) | ++----------+ +| 0 | ++----------+ +## note 1. this test only works if dynamic thread pool is enabled, the result may be false negative if dynamic thread pool is not enabled, it works now because dynamic thread pool is enabled by default +## 2. currently, there are no long live threads that use the dynamic thread pool, so the expected value is 0, need to update the reference if someday some long live threads are using dynamic thread pool +=> DBGInvoke get_active_threads_in_dynamic_thread_pool() +┌─get_active_threads_in_dynamic_thread_pool()─┐ +│ 0 │ +└─────────────────────────────────────────────┘ + # Clean up. -mysql> drop table if exists test.t +# mysql> drop table if exists test.t +# mysql> drop table if exists test.t1