From 8a5dc2963af649fc02d15a4034106a50aa9c093e Mon Sep 17 00:00:00 2001 From: yibin Date: Wed, 22 Jun 2022 17:50:37 +0800 Subject: [PATCH] Add random failpoint in critical paths (#4876) close pingcap/tiflash#4807 --- dbms/src/Common/FailPoint.cpp | 65 ++++++++++++++++++- dbms/src/Common/FailPoint.h | 20 +++++- dbms/src/Common/wrapInvocable.h | 1 - .../DataStreams/SharedQueryBlockInputStream.h | 7 ++ dbms/src/DataStreams/SizeLimits.cpp | 23 +++++-- dbms/src/Flash/EstablishCall.cpp | 7 ++ dbms/src/Flash/Mpp/ExchangeReceiver.cpp | 15 ++++- dbms/src/Flash/Mpp/MPPTask.cpp | 11 +++- dbms/src/Flash/Mpp/MPPTaskManager.cpp | 7 ++ dbms/src/Flash/Mpp/MPPTunnel.cpp | 2 + dbms/src/Flash/Mpp/MinTSOScheduler.cpp | 10 ++- dbms/src/Interpreters/Aggregator.cpp | 9 +++ dbms/src/Interpreters/Join.cpp | 12 +++- dbms/src/Interpreters/executeQuery.cpp | 7 +- dbms/src/Server/Server.cpp | 2 + 15 files changed, 181 insertions(+), 17 deletions(-) diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index 10d0a558a50..1dff46c273b 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -12,7 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include +#include +#include +#include +#include +#include #include #include @@ -21,7 +27,6 @@ namespace DB { std::unordered_map> FailPointHelper::fail_point_wait_channels; - #define APPLY_FOR_FAILPOINTS_ONCE(M) \ M(exception_between_drop_meta_and_data) \ M(exception_between_alter_data_and_meta) \ @@ -109,6 +114,22 @@ std::unordered_map> FailPointHelper::f M(pause_query_init) +#define APPLY_FOR_RANDOM_FAILPOINTS(M) \ + M(random_tunnel_wait_timeout_failpoint) \ + M(random_tunnel_init_rpc_failure_failpoint) \ + M(random_receiver_sync_msg_push_failure_failpoint) \ + M(random_receiver_async_msg_push_failure_failpoint) \ + M(random_limit_check_failpoint) \ + M(random_join_build_failpoint) \ + M(random_join_prob_failpoint) \ + M(random_aggregate_create_state_failpoint) \ + M(random_aggregate_merge_failpoint) \ + M(random_sharedquery_failpoint) \ + M(random_interpreter_failpoint) \ + M(random_task_lifecycle_failpoint) \ + M(random_task_manager_find_task_failure_failpoint) \ + M(random_min_tso_scheduler_failpoint) + namespace FailPoints { #define M(NAME) extern const char(NAME)[] = #NAME ""; @@ -116,6 +137,7 @@ APPLY_FOR_FAILPOINTS_ONCE(M) APPLY_FOR_FAILPOINTS(M) APPLY_FOR_PAUSEABLE_FAILPOINTS_ONCE(M) APPLY_FOR_PAUSEABLE_FAILPOINTS(M) +APPLY_FOR_RANDOM_FAILPOINTS(M) #undef M } // namespace FailPoints @@ -179,7 +201,7 @@ void FailPointHelper::enableFailPoint(const String & fail_point_name) #undef M #undef SUB_M - throw Exception("Cannot find fail point " + fail_point_name, ErrorCodes::FAIL_POINT_ERROR); + throw Exception(fmt::format("Cannot find fail point {}", fail_point_name), ErrorCodes::FAIL_POINT_ERROR); } void FailPointHelper::disableFailPoint(const String & fail_point_name) @@ -204,6 +226,41 @@ void FailPointHelper::wait(const String & fail_point_name) ptr->wait(); } } + +void FailPointHelper::initRandomFailPoints(Poco::Util::LayeredConfiguration & config, Poco::Logger * log) +{ + String random_fail_point_cfg = config.getString("flash.random_fail_points", ""); + if (random_fail_point_cfg.empty()) + return; + + Poco::StringTokenizer string_tokens(random_fail_point_cfg, ","); + for (const auto & string_token : string_tokens) + { + Poco::StringTokenizer pair_tokens(string_token, "-"); + RUNTIME_ASSERT((pair_tokens.count() == 2), log, "RandomFailPoints config should be FailPointA-RatioA,FailPointB-RatioB,... format"); + double rate = atof(pair_tokens[1].c_str()); //NOLINT(cert-err34-c): check conversion error manually + RUNTIME_ASSERT((0 <= rate && rate <= 1.0), log, "RandomFailPoint trigger rate should in [0,1], while {}", rate); + enableRandomFailPoint(pair_tokens[0], rate); + } + LOG_FMT_INFO(log, "Enable RandomFailPoints: {}", random_fail_point_cfg); +} + +void FailPointHelper::enableRandomFailPoint(const String & fail_point_name, double rate) +{ +#define SUB_M(NAME) \ + if (fail_point_name == FailPoints::NAME) \ + { \ + fiu_enable_random(FailPoints::NAME, 1, nullptr, 0, rate); \ + return; \ + } + +#define M(NAME) SUB_M(NAME) + APPLY_FOR_RANDOM_FAILPOINTS(M) +#undef M +#undef SUB_M + + throw Exception(fmt::format("Cannot find fail point {}", fail_point_name), ErrorCodes::FAIL_POINT_ERROR); +} #else class FailPointChannel { @@ -214,6 +271,10 @@ void FailPointHelper::enableFailPoint(const String &) {} void FailPointHelper::disableFailPoint(const String &) {} void FailPointHelper::wait(const String &) {} + +void FailPointHelper::initRandomFailPoints(Poco::Util::LayeredConfiguration & config, Poco::Logger * log) {} + +void FailPointHelper::enableRandomFailPoint(const String & fail_point_name, double rate) {} #endif } // namespace DB diff --git a/dbms/src/Common/FailPoint.h b/dbms/src/Common/FailPoint.h index 2cf40ad55e4..31df2dbdcd2 100644 --- a/dbms/src/Common/FailPoint.h +++ b/dbms/src/Common/FailPoint.h @@ -21,6 +21,15 @@ #include +namespace Poco +{ +class Logger; +namespace Util +{ +class LayeredConfiguration; +} +} // namespace Poco + namespace DB { namespace ErrorCodes @@ -35,7 +44,6 @@ extern const int FAIL_POINT_ERROR; // When `fail_point` is enabled, wait till it is disabled #define FAIL_POINT_PAUSE(fail_point) fiu_do_on(fail_point, FailPointHelper::wait(fail_point);) - class FailPointChannel; class FailPointHelper { @@ -46,6 +54,16 @@ class FailPointHelper static void wait(const String & fail_point_name); + /* + * For Server RandomFailPoint test usage. When FIU_ENABLE is defined, this function does the following work: + * 1. Return if TiFlash config has empty flash.random_fail_points cfg + * 2. Parse flash.random_fail_points, which expect to has "FailPointA-RatioA,FailPointB-RatioB,..." format + * 3. Call enableRandomFailPoint method with parsed FailPointName and Rate + */ + static void initRandomFailPoints(Poco::Util::LayeredConfiguration & config, Poco::Logger * log); + + static void enableRandomFailPoint(const String & fail_point_name, double rate); + private: static std::unordered_map> fail_point_wait_channels; }; diff --git a/dbms/src/Common/wrapInvocable.h b/dbms/src/Common/wrapInvocable.h index d6cee519835..1c93bb3e782 100644 --- a/dbms/src/Common/wrapInvocable.h +++ b/dbms/src/Common/wrapInvocable.h @@ -35,7 +35,6 @@ inline auto wrapInvocable(bool propagate_memory_tracker, Func && func, Args &&.. // run the task with the parameters provided return std::apply(std::move(func), std::move(args)); }; - return capture; } } // namespace DB diff --git a/dbms/src/DataStreams/SharedQueryBlockInputStream.h b/dbms/src/DataStreams/SharedQueryBlockInputStream.h index e7cece67f0b..d7c0707b5aa 100644 --- a/dbms/src/DataStreams/SharedQueryBlockInputStream.h +++ b/dbms/src/DataStreams/SharedQueryBlockInputStream.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include #include @@ -24,6 +25,11 @@ namespace DB { +namespace FailPoints +{ +extern const char random_sharedquery_failpoint[]; +} // namespace FailPoints + /** This block input stream is used by SharedQuery. * It enable multiple threads read from one stream. */ @@ -136,6 +142,7 @@ class SharedQueryBlockInputStream : public IProfilingBlockInputStream in->readPrefix(); while (true) { + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_sharedquery_failpoint); Block block = in->read(); // in is finished or queue is canceled if (!block || !queue.push(block)) diff --git a/dbms/src/DataStreams/SizeLimits.cpp b/dbms/src/DataStreams/SizeLimits.cpp index 7dd5e1524ba..4d1bfaae997 100644 --- a/dbms/src/DataStreams/SizeLimits.cpp +++ b/dbms/src/DataStreams/SizeLimits.cpp @@ -12,22 +12,30 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include -#include #include -#include +#include +#include +#include +#include namespace DB { +namespace FailPoints +{ +extern const char random_limit_check_failpoint[]; +} // namespace FailPoints bool SizeLimits::check(UInt64 rows, UInt64 bytes, const char * what, int exception_code) const { - if (max_rows && rows > max_rows) + bool rows_exceed_limit = max_rows && rows > max_rows; + fiu_do_on(FailPoints::random_limit_check_failpoint, rows_exceed_limit = true;); + if (rows_exceed_limit) { if (overflow_mode == OverflowMode::THROW) throw Exception("Limit for " + std::string(what) + " exceeded, max rows: " + formatReadableQuantity(max_rows) - + ", current rows: " + formatReadableQuantity(rows), exception_code); + + ", current rows: " + formatReadableQuantity(rows), + exception_code); else return false; } @@ -36,7 +44,8 @@ bool SizeLimits::check(UInt64 rows, UInt64 bytes, const char * what, int excepti { if (overflow_mode == OverflowMode::THROW) throw Exception("Limit for " + std::string(what) + " exceeded, max bytes: " + formatReadableSizeWithBinarySuffix(max_bytes) - + ", current bytes: " + formatReadableSizeWithBinarySuffix(bytes), exception_code); + + ", current bytes: " + formatReadableSizeWithBinarySuffix(bytes), + exception_code); else return false; } @@ -44,4 +53,4 @@ bool SizeLimits::check(UInt64 rows, UInt64 bytes, const char * what, int excepti return true; } -} +} // namespace DB diff --git a/dbms/src/Flash/EstablishCall.cpp b/dbms/src/Flash/EstablishCall.cpp index 8af81e30962..89857a2407e 100644 --- a/dbms/src/Flash/EstablishCall.cpp +++ b/dbms/src/Flash/EstablishCall.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -19,6 +20,11 @@ namespace DB { +namespace FailPoints +{ +extern const char random_tunnel_init_rpc_failure_failpoint[]; +} // namespace FailPoints + EstablishCallData::EstablishCallData(AsyncFlashService * service, grpc::ServerCompletionQueue * cq, grpc::ServerCompletionQueue * notify_cq, const std::shared_ptr> & is_shutdown) : service(service) , cq(cq) @@ -71,6 +77,7 @@ void EstablishCallData::initRpc() std::exception_ptr eptr = nullptr; try { + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_tunnel_init_rpc_failure_failpoint); service->establishMPPConnectionSyncOrAsync(&ctx, &request, nullptr, this); } catch (...) diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp index f194afee31f..ec8bde51469 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include #include #include @@ -22,6 +23,12 @@ namespace DB { +namespace FailPoints +{ +extern const char random_receiver_sync_msg_push_failure_failpoint[]; +extern const char random_receiver_async_msg_push_failure_failpoint[]; +} // namespace FailPoints + namespace { String getReceiverStateStr(const ExchangeReceiverState & s) @@ -257,7 +264,9 @@ class AsyncRequestHandler : public UnaryCallback recv_msg->packet = std::move(packet); recv_msg->source_index = request->source_index; recv_msg->req_info = req_info; - if (!msg_channel->push(std::move(recv_msg))) + bool push_success = msg_channel->push(std::move(recv_msg)); + fiu_do_on(FailPoints::random_receiver_async_msg_push_failure_failpoint, push_success = false;); + if (!push_success) return false; // can't reuse packet since it is sent to readers. packet = std::make_shared(); @@ -483,7 +492,9 @@ void ExchangeReceiverBase::readLoop(const Request & req) if (recv_msg->packet->has_error()) throw Exception("Exchange receiver meet error : " + recv_msg->packet->error().msg()); - if (!msg_channel.push(std::move(recv_msg))) + bool push_success = msg_channel.push(std::move(recv_msg)); + fiu_do_on(FailPoints::random_receiver_sync_msg_push_failure_failpoint, push_success = false;); + if (!push_success) { meet_error = true; auto local_state = getState(); diff --git a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp index 0381bbdfa04..ac084ba4550 100644 --- a/dbms/src/Flash/Mpp/MPPTask.cpp +++ b/dbms/src/Flash/Mpp/MPPTask.cpp @@ -51,6 +51,7 @@ extern const char exception_before_mpp_register_tunnel_for_root_mpp_task[]; extern const char exception_during_mpp_register_tunnel_for_non_root_mpp_task[]; extern const char exception_during_mpp_write_err_to_tunnel[]; extern const char force_no_local_region_for_mpp_task[]; +extern const char random_task_lifecycle_failpoint[]; } // namespace FailPoints MPPTask::MPPTask(const mpp::TaskMeta & meta_, const ContextPtr & context_) @@ -394,7 +395,15 @@ void MPPTask::runImpl() writeErrToAllTunnels(err_msg); } LOG_FMT_INFO(log, "task ends, time cost is {} ms.", stopwatch.elapsedMilliseconds()); - unregisterTask(); + // unregister flag is only for FailPoint usage, to produce the situation that MPPTask is destructed + // by grpc CancelMPPTask thread; + bool unregister = true; + fiu_do_on(FailPoints::random_task_lifecycle_failpoint, { + if (!err_msg.empty()) + unregister = false; + }); + if (unregister) + unregisterTask(); if (switchStatus(RUNNING, FINISHED)) LOG_INFO(log, "finish task"); diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.cpp b/dbms/src/Flash/Mpp/MPPTaskManager.cpp index 531f8f7a10d..3df4af5de5f 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskManager.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -22,6 +23,11 @@ namespace DB { +namespace FailPoints +{ +extern const char random_task_manager_find_task_failure_failpoint[]; +} // namespace FailPoints + MPPTaskManager::MPPTaskManager(MPPTaskSchedulerPtr scheduler_) : scheduler(std::move(scheduler_)) , log(&Poco::Logger::get("TaskManager")) @@ -50,6 +56,7 @@ MPPTaskPtr MPPTaskManager::findTaskWithTimeout(const mpp::TaskMeta & meta, std:: it = query_it->second->task_map.find(id); return it != query_it->second->task_map.end(); }); + fiu_do_on(FailPoints::random_task_manager_find_task_failure_failpoint, ret = false;); if (cancelled) { errMsg = fmt::format("Task [{},{}] has been cancelled.", meta.start_ts(), meta.task_id()); diff --git a/dbms/src/Flash/Mpp/MPPTunnel.cpp b/dbms/src/Flash/Mpp/MPPTunnel.cpp index 826e7fea88a..13a7eaad95e 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnel.cpp @@ -25,6 +25,7 @@ namespace DB namespace FailPoints { extern const char exception_during_mpp_close_tunnel[]; +extern const char random_tunnel_wait_timeout_failpoint[]; } // namespace FailPoints template @@ -322,6 +323,7 @@ void MPPTunnelBase::waitUntilConnectedOrFinished(std::unique_lock #include #include #include namespace DB { +namespace FailPoints +{ +extern const char random_min_tso_scheduler_failpoint[]; +} // namespace FailPoints + constexpr UInt64 MAX_UINT64 = std::numeric_limits::max(); constexpr UInt64 OS_THREAD_SOFT_LIMIT = 100000; @@ -193,7 +199,9 @@ bool MinTSOScheduler::scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & q } else { - if (tso <= min_tso) /// the min_tso query should fully run, otherwise throw errors here. + bool is_tso_min = tso <= min_tso; + fiu_do_on(FailPoints::random_min_tso_scheduler_failpoint, is_tso_min = true;); + if (is_tso_min) /// the min_tso query should fully run, otherwise throw errors here. { has_error = true; auto msg = fmt::format("threads are unavailable for the query {} ({} min_tso {}) {}, need {}, but used {} of the thread hard limit {}, {} active and {} waiting queries.", tso, tso == min_tso ? "is" : "is newer than", min_tso, isWaiting ? "from the waiting set" : "when directly schedule it", needed_threads, estimated_thread_usage, thread_hard_limit, active_set.size(), waiting_set.size()); diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 6a39bc333a8..6cb947a1bfa 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -48,6 +49,11 @@ extern const int CANNOT_MERGE_DIFFERENT_AGGREGATED_DATA_VARIANTS; extern const int LOGICAL_ERROR; } // namespace ErrorCodes +namespace FailPoints +{ +extern const char random_aggregate_create_state_failpoint[]; +extern const char random_aggregate_merge_failpoint[]; +} // namespace FailPoints AggregatedDataVariants::~AggregatedDataVariants() { @@ -317,6 +323,7 @@ void Aggregator::createAggregateStates(AggregateDataPtr & aggregate_data) const * In order that then everything is properly destroyed, we "roll back" some of the created states. * The code is not very convenient. */ + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_aggregate_create_state_failpoint); aggregate_functions[j]->create(aggregate_data + offsets_of_aggregate_states[j]); } catch (...) @@ -1504,6 +1511,8 @@ class MergingAndConvertingBlockInputStream : public IProfilingBlockInputStream if (current_bucket_num >= NUM_BUCKETS) return {}; + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_aggregate_merge_failpoint); + AggregatedDataVariantsPtr & first = data[0]; if (current_bucket_num == -1) diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index 820618a6e8b..181ebcaaa64 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -26,9 +27,17 @@ #include #include #include +#include + namespace DB { +namespace FailPoints +{ +extern const char random_join_build_failpoint[]; +extern const char random_join_prob_failpoint[]; +} // namespace FailPoints + namespace ErrorCodes { extern const int UNKNOWN_SET_DATA_VARIANT; @@ -621,6 +630,7 @@ void NO_INLINE insertFromBlockImplTypeCaseWithLock( } for (size_t insert_index = 0; insert_index < segment_index_info.size(); insert_index++) { + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_join_build_failpoint); size_t segment_index = (insert_index + stream_index) % segment_index_info.size(); if (segment_index == segment_size) { @@ -1513,7 +1523,7 @@ void Join::joinBlockImpl(Block & block, const Maps & maps) const default: throw Exception("Unknown JOIN keys variant.", ErrorCodes::UNKNOWN_SET_DATA_VARIANT); } - + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_join_prob_failpoint); for (size_t i = 0; i < num_columns_to_add; ++i) { const ColumnWithTypeAndName & sample_col = sample_block_with_columns_to_add.getByPosition(i); diff --git a/dbms/src/Interpreters/executeQuery.cpp b/dbms/src/Interpreters/executeQuery.cpp index 96cfc0a58ae..78ad4b41ce6 100644 --- a/dbms/src/Interpreters/executeQuery.cpp +++ b/dbms/src/Interpreters/executeQuery.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -53,7 +54,10 @@ extern const int LOGICAL_ERROR; extern const int QUERY_IS_TOO_LARGE; extern const int INTO_OUTFILE_NOT_ALLOWED; } // namespace ErrorCodes - +namespace FailPoints +{ +extern const char random_interpreter_failpoint[]; +} // namespace FailPoints namespace { void checkASTSizeLimits(const IAST & ast, const Settings & settings) @@ -226,6 +230,7 @@ std::tuple executeQueryImpl( context.setProcessListElement(&process_list_entry->get()); } + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::random_interpreter_failpoint); auto interpreter = query_src.interpreter(context, stage); res = interpreter->execute(); diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 1bb35e51866..571ba8fe3a5 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -977,6 +978,7 @@ int Server::main(const std::vector & /*args*/) Poco::Logger * log = &logger(); #ifdef FIU_ENABLE fiu_init(0); // init failpoint + FailPointHelper::initRandomFailPoints(config(), log); #endif UpdateMallocConfig(log);