diff --git a/contrib/kvproto b/contrib/kvproto index 26e28e6a281..9ccc6beaf0a 160000 --- a/contrib/kvproto +++ b/contrib/kvproto @@ -1 +1 @@ -Subproject commit 26e28e6a281abb927f91ef992eb8f93b39698ffa +Subproject commit 9ccc6beaf0aa9b0a4adad43b497348898ba653cf diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 2ef8fd6cb01..a23bf77eb75 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -104,7 +104,8 @@ namespace DB F(type_exchange_partition, {"type", "exchange_partition"})) \ M(tiflash_schema_apply_duration_seconds, "Bucketed histogram of ddl apply duration", Histogram, \ F(type_ddl_apply_duration, {{"req", "ddl_apply_duration"}}, ExpBuckets{0.001, 2, 20})) \ - M(tiflash_raft_read_index_count, "Total number of raft read index", Counter) \ + M(tiflash_raft_read_index_count, "Total number of raft read index", Counter) \ + M(tiflash_stale_read_count, "Total number of stale read", Counter) \ M(tiflash_raft_read_index_duration_seconds, "Bucketed histogram of raft read index duration", Histogram, \ F(type_raft_read_index_duration, {{"type", "tmt_raft_read_index_duration"}}, ExpBuckets{0.001, 2, 20})) \ M(tiflash_raft_wait_index_duration_seconds, "Bucketed histogram of raft wait index duration", Histogram, \ @@ -232,7 +233,7 @@ namespace DB F(type_active_rpc_async_worker, {"type", "rpc_async_worker_active"}), \ F(type_total_rpc_async_worker, {"type", "rpc_async_worker_total"})) \ M(tiflash_task_scheduler, "Min-tso task scheduler", Gauge, \ - F(type_min_tso, {"type", "min_tso"}), \ + F(type_min_query_ts, {"type", "min_query_ts"}), \ F(type_waiting_queries_count, {"type", "waiting_queries_count"}), \ F(type_active_queries_count, {"type", "active_queries_count"}), \ F(type_waiting_tasks_count, {"type", "waiting_tasks_count"}), \ diff --git a/dbms/src/Debug/DAGProperties.h b/dbms/src/Debug/DAGProperties.h index bcb4170c9ac..c4bfcc9b863 100644 --- a/dbms/src/Debug/DAGProperties.h +++ b/dbms/src/Debug/DAGProperties.h @@ -29,6 +29,11 @@ struct DAGProperties bool use_broadcast_join = false; Int32 mpp_partition_num = 1; Timestamp start_ts = DEFAULT_MAX_READ_TSO; + UInt64 query_ts = 0; + UInt64 server_id = 1; + UInt64 local_query_id = 1; + Int64 task_id = 1; + Int32 mpp_timeout = 10; }; } // namespace DB \ No newline at end of file diff --git a/dbms/src/Debug/MockComputeServerManager.cpp b/dbms/src/Debug/MockComputeServerManager.cpp index 118316bf1cf..1568369db52 100644 --- a/dbms/src/Debug/MockComputeServerManager.cpp +++ b/dbms/src/Debug/MockComputeServerManager.cpp @@ -117,11 +117,14 @@ void MockComputeServerManager::addServer(size_t partition_id, std::unique_ptrset_start_ts(start_ts); + auto [query_ts, local_query_id_, server_id] = MPPTaskId::decodeQueryID(query_id); + meta->set_query_ts(query_ts); + meta->set_local_query_id(local_query_id_); + meta->set_server_id(server_id); mpp::CancelTaskResponse response; for (const auto & server : server_map) server.second->flashService()->cancelMPPTaskForTest(&req, &response); diff --git a/dbms/src/Debug/MockComputeServerManager.h b/dbms/src/Debug/MockComputeServerManager.h index 7e05224572b..06563212988 100644 --- a/dbms/src/Debug/MockComputeServerManager.h +++ b/dbms/src/Debug/MockComputeServerManager.h @@ -22,7 +22,6 @@ namespace DB::tests { - /** Hold Mock Compute Server to manage the lifetime of them. * Maintains Mock Compute Server info. 
*/ @@ -49,7 +48,7 @@ class MockComputeServerManager : public ext::Singleton void resetMockMPPServerInfo(size_t partition_num); - void cancelQuery(size_t start_ts); + void cancelQuery(String query_id); static String queryInfo(); diff --git a/dbms/src/Debug/MockExecutor/AstToPB.h b/dbms/src/Debug/MockExecutor/AstToPB.h index 518d04f89b9..c1560c90355 100644 --- a/dbms/src/Debug/MockExecutor/AstToPB.h +++ b/dbms/src/Debug/MockExecutor/AstToPB.h @@ -57,6 +57,9 @@ using MPPCtxPtr = std::shared_ptr; struct MPPInfo { Timestamp start_ts; + UInt64 query_ts; + UInt64 server_id; + UInt64 local_query_id; Int64 partition_id; Int64 task_id; const std::vector sender_target_task_ids; @@ -64,11 +67,17 @@ struct MPPInfo MPPInfo( Timestamp start_ts_, + UInt64 query_ts_, + UInt64 server_id_, + UInt64 local_query_id_, Int64 partition_id_, Int64 task_id_, const std::vector & sender_target_task_ids_, const std::unordered_map> & receiver_source_task_ids_map_) : start_ts(start_ts_) + , query_ts(query_ts_) + , server_id(server_id_) + , local_query_id(local_query_id_) , partition_id(partition_id_) , task_id(task_id_) , sender_target_task_ids(sender_target_task_ids_) diff --git a/dbms/src/Debug/MockExecutor/ExchangeReceiverBinder.cpp b/dbms/src/Debug/MockExecutor/ExchangeReceiverBinder.cpp index e7f0491b74f..706624856c0 100644 --- a/dbms/src/Debug/MockExecutor/ExchangeReceiverBinder.cpp +++ b/dbms/src/Debug/MockExecutor/ExchangeReceiverBinder.cpp @@ -42,6 +42,9 @@ bool ExchangeReceiverBinder::toTiPBExecutor(tipb::Executor * tipb_executor, int3 { mpp::TaskMeta meta; meta.set_start_ts(mpp_info.start_ts); + meta.set_query_ts(mpp_info.query_ts); + meta.set_server_id(mpp_info.server_id); + meta.set_local_query_id(mpp_info.local_query_id); meta.set_task_id(it->second[i]); meta.set_partition_id(i); auto addr = context.isMPPTest() ? tests::MockComputeServerManager::instance().getServerConfigMap()[i].addr : Debug::LOCAL_HOST; diff --git a/dbms/src/Debug/MockExecutor/ExchangeSenderBinder.cpp b/dbms/src/Debug/MockExecutor/ExchangeSenderBinder.cpp index 065d983cb60..aaba39868e1 100644 --- a/dbms/src/Debug/MockExecutor/ExchangeSenderBinder.cpp +++ b/dbms/src/Debug/MockExecutor/ExchangeSenderBinder.cpp @@ -41,6 +41,9 @@ bool ExchangeSenderBinder::toTiPBExecutor(tipb::Executor * tipb_executor, int32_ { mpp::TaskMeta meta; meta.set_start_ts(mpp_info.start_ts); + meta.set_query_ts(mpp_info.query_ts); + meta.set_server_id(mpp_info.server_id); + meta.set_local_query_id(mpp_info.local_query_id); meta.set_task_id(task_id); meta.set_partition_id(i); auto addr = context.isMPPTest() ? 
tests::MockComputeServerManager::instance().getServerConfigMap()[i++].addr : Debug::LOCAL_HOST; diff --git a/dbms/src/Debug/dbgQueryCompiler.h b/dbms/src/Debug/dbgQueryCompiler.h index 2ab86df6dad..748b14d41e8 100644 --- a/dbms/src/Debug/dbgQueryCompiler.h +++ b/dbms/src/Debug/dbgQueryCompiler.h @@ -132,6 +132,9 @@ struct QueryFragment { MPPInfo mpp_info( properties.start_ts, + properties.query_ts, + properties.server_id, + properties.local_query_id, partition_id, task_ids[partition_id], sender_target_task_ids, @@ -141,7 +144,7 @@ struct QueryFragment } else { - MPPInfo mpp_info(properties.start_ts, /*partition_id*/ -1, /*task_id*/ -1, /*sender_target_task_ids*/ {}, /*receiver_source_task_ids_map*/ {}); + MPPInfo mpp_info(properties.start_ts, properties.query_ts, properties.server_id, properties.local_query_id, /*partition_id*/ -1, /*task_id*/ -1, /*sender_target_task_ids*/ {}, /*receiver_source_task_ids_map*/ {}); ret.push_back(toQueryTask(properties, mpp_info, context)); } return ret; diff --git a/dbms/src/Debug/dbgQueryExecutor.cpp b/dbms/src/Debug/dbgQueryExecutor.cpp index 579c9adbacb..65d1de67c29 100644 --- a/dbms/src/Debug/dbgQueryExecutor.cpp +++ b/dbms/src/Debug/dbgQueryExecutor.cpp @@ -42,6 +42,9 @@ BlockInputStreamPtr constructExchangeReceiverStream(Context & context, tipb::Exc mpp::TaskMeta root_tm; root_tm.set_start_ts(properties.start_ts); + root_tm.set_query_ts(properties.query_ts); + root_tm.set_local_query_id(properties.local_query_id); + root_tm.set_server_id(properties.server_id); root_tm.set_address(root_addr); root_tm.set_task_id(-1); root_tm.set_partition_id(-1); @@ -71,6 +74,9 @@ BlockInputStreamPtr prepareRootExchangeReceiver(Context & context, const DAGProp { mpp::TaskMeta tm; tm.set_start_ts(properties.start_ts); + tm.set_query_ts(properties.query_ts); + tm.set_local_query_id(properties.local_query_id); + tm.set_server_id(properties.server_id); tm.set_address(Debug::LOCAL_HOST); tm.set_task_id(root_task_id); tm.set_partition_id(-1); @@ -84,6 +90,9 @@ void prepareExchangeReceiverMetaWithMultipleContext(tipb::ExchangeReceiver & tip { mpp::TaskMeta tm; tm.set_start_ts(properties.start_ts); + tm.set_query_ts(properties.query_ts); + tm.set_local_query_id(properties.local_query_id); + tm.set_server_id(properties.server_id); tm.set_address(addr); tm.set_task_id(task_id); tm.set_partition_id(-1); @@ -109,6 +118,9 @@ void prepareDispatchTaskRequest(QueryTask & task, std::shared_ptrmutable_meta(); tm->set_start_ts(properties.start_ts); + tm->set_query_ts(properties.query_ts); + tm->set_local_query_id(properties.local_query_id); + tm->set_server_id(properties.server_id); tm->set_partition_id(task.partition_id); tm->set_address(addr); tm->set_task_id(task.task_id); @@ -128,6 +140,9 @@ void prepareDispatchTaskRequestWithMultipleContext(QueryTask & task, std::shared } auto * tm = req->mutable_meta(); tm->set_start_ts(properties.start_ts); + tm->set_query_ts(properties.query_ts); + tm->set_local_query_id(properties.local_query_id); + tm->set_server_id(properties.server_id); tm->set_partition_id(task.partition_id); tm->set_address(addr); tm->set_task_id(task.task_id); diff --git a/dbms/src/Flash/Coprocessor/DAGContext.h b/dbms/src/Flash/Coprocessor/DAGContext.h index 1aaf583db14..c714eeaa17e 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.h +++ b/dbms/src/Flash/Coprocessor/DAGContext.h @@ -158,7 +158,7 @@ class DAGContext , flags(dag_request->flags()) , sql_mode(dag_request->sql_mode()) , mpp_task_meta(meta_) - , mpp_task_id(mpp_task_meta.start_ts(), mpp_task_meta.task_id()) + 
, mpp_task_id(mpp_task_meta.start_ts(), mpp_task_meta.task_id(), mpp_task_meta.server_id(), mpp_task_meta.query_ts(), mpp_task_meta.local_query_id()) , max_recorded_error_count(getMaxErrorCount(*dag_request)) , warnings(max_recorded_error_count) , warning_count(0) diff --git a/dbms/src/Flash/FlashService.cpp b/dbms/src/Flash/FlashService.cpp index cb62855d6c1..906bd75633c 100644 --- a/dbms/src/Flash/FlashService.cpp +++ b/dbms/src/Flash/FlashService.cpp @@ -341,7 +341,7 @@ grpc::Status FlashService::CancelMPPTask( auto & tmt_context = context->getTMTContext(); auto task_manager = tmt_context.getMPPTaskManager(); - task_manager->abortMPPQuery(request->meta().start_ts(), "Receive cancel request from TiDB", AbortType::ONCANCELLATION); + task_manager->abortMPPQuery(MPPTaskId::generateQueryID(request->meta().query_ts(), request->meta().local_query_id(), request->meta().server_id(), request->meta().start_ts()), "Receive cancel request from TiDB", AbortType::ONCANCELLATION); return grpc::Status::OK; } @@ -383,7 +383,7 @@ ::grpc::Status FlashService::cancelMPPTaskForTest(const ::mpp::CancelTaskRequest } auto & tmt_context = context->getTMTContext(); auto task_manager = tmt_context.getMPPTaskManager(); - task_manager->abortMPPQuery(request->meta().start_ts(), "Receive cancel request from GTest", AbortType::ONCANCELLATION); + task_manager->abortMPPQuery(MPPTaskId::generateQueryID(request->meta().query_ts(), request->meta().local_query_id(), request->meta().server_id(), request->meta().start_ts()), "Receive cancel request from GTest", AbortType::ONCANCELLATION); return grpc::Status::OK; } diff --git a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp index f8f1829545e..1d907a7b902 100644 --- a/dbms/src/Flash/Mpp/MPPTask.cpp +++ b/dbms/src/Flash/Mpp/MPPTask.cpp @@ -54,7 +54,7 @@ extern const char force_no_local_region_for_mpp_task[]; MPPTask::MPPTask(const mpp::TaskMeta & meta_, const ContextPtr & context_) : meta(meta_) - , id(meta.start_ts(), meta.task_id()) + , id(meta.start_ts(), meta.task_id(), meta.server_id(), meta.query_ts(), meta.local_query_id()) , context(context_) , manager(context_->getTMTContext().getMPPTaskManager().get()) , schedule_entry(manager, id) @@ -139,7 +139,7 @@ void MPPTask::registerTunnels(const mpp::DispatchTaskRequest & task_request) LOG_DEBUG(log, "begin to register the tunnel {}, is_local: {}, is_async: {}", tunnel->id(), is_local, is_async); if (status != INITIALIZING) throw Exception(fmt::format("The tunnel {} can not be registered, because the task is not in initializing state", tunnel->id())); - tunnel_set_local->registerTunnel(MPPTaskId{task_meta.start_ts(), task_meta.task_id()}, tunnel); + tunnel_set_local->registerTunnel(MPPTaskId{task_meta.start_ts(), task_meta.task_id(), task_meta.server_id(), task_meta.query_ts(), task_meta.local_query_id()}, tunnel); if (!dag_context->isRootMPPTask()) { FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_during_mpp_register_tunnel_for_non_root_mpp_task); @@ -204,7 +204,7 @@ std::pair MPPTask::getTunnel(const ::mpp::EstablishMPPConn return {nullptr, err_msg}; } - MPPTaskId receiver_id{request->receiver_meta().start_ts(), request->receiver_meta().task_id()}; + MPPTaskId receiver_id{request->receiver_meta().start_ts(), request->receiver_meta().task_id(), request->receiver_meta().server_id(), request->receiver_meta().query_ts(), request->receiver_meta().local_query_id()}; RUNTIME_ASSERT(tunnel_set != nullptr, log, "mpp task without tunnel set"); auto tunnel_ptr = tunnel_set->getTunnelByReceiverTaskId(receiver_id); if 
(tunnel_ptr == nullptr) @@ -449,7 +449,7 @@ void MPPTask::runImpl() void MPPTask::handleError(const String & error_msg) { auto updated_msg = fmt::format("From {}: {}", id.toString(), error_msg); - manager->abortMPPQuery(id.start_ts, updated_msg, AbortType::ONERROR); + manager->abortMPPQuery(id.query_id, updated_msg, AbortType::ONERROR); if (!registered) // if the task is not registered, need to cancel it explicitly abort(error_msg, AbortType::ONERROR); diff --git a/dbms/src/Flash/Mpp/MPPTaskId.cpp b/dbms/src/Flash/Mpp/MPPTaskId.cpp index bb025e6c24d..b5c2456e4ba 100644 --- a/dbms/src/Flash/Mpp/MPPTaskId.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskId.cpp @@ -13,19 +13,42 @@ // limitations under the License. #include +#include #include namespace DB { String MPPTaskId::toString() const { - return isUnknown() ? "MPP" : fmt::format("MPP", start_ts, task_id); + return isUnknown() ? "MPP" : fmt::format("MPP", query_id, start_ts, query_ts, task_id, server_id, local_query_id); } const MPPTaskId MPPTaskId::unknown_mpp_task_id = MPPTaskId{}; +constexpr UInt64 MAX_UINT64 = std::numeric_limits::max(); +constexpr UInt64 MAX_INT64 = std::numeric_limits::max(); +const String MPPTaskId::Max_Query_Id = generateQueryID(MAX_INT64, MAX_INT64, MAX_UINT64, MAX_UINT64); + +String MPPTaskId::generateQueryID(UInt64 query_ts_, UInt64 local_query_id_, UInt64 server_id_, UInt64 start_ts_) +{ + if (local_query_id_ == 0) + { + return intToHex(0) + intToHex(0) + intToHex(start_ts_); + } + return intToHex(query_ts_) + intToHex(local_query_id_) + intToHex(server_id_); +} + +std::tuple MPPTaskId::decodeQueryID(String query_id_str) +{ + UInt64 decode_query_ts = std::stoull(query_id_str.substr(0, sizeof(Int64) * 2), 0, 16); + UInt64 decode_local_query_id = std::stoull(query_id_str.substr(sizeof(Int64) * 2, sizeof(Int64) * 2), 0, 16); + UInt64 decode_server_id = std::stoull(query_id_str.substr(sizeof(Int64) * 2 * 2, sizeof(UInt64) * 2), 0, 16); + LOG_DEBUG(Logger::get(__FUNCTION__), "query_ts_={}, local_query_id_={}, server_id_={}", decode_query_ts, decode_local_query_id, decode_server_id); + return {decode_query_ts, decode_local_query_id, decode_server_id}; +} + bool operator==(const MPPTaskId & lid, const MPPTaskId & rid) { - return lid.start_ts == rid.start_ts && lid.task_id == rid.task_id; + return lid.query_id == rid.query_id && lid.task_id == rid.task_id; } } // namespace DB diff --git a/dbms/src/Flash/Mpp/MPPTaskId.h b/dbms/src/Flash/Mpp/MPPTaskId.h index f1bee057206..0c411e11f9c 100644 --- a/dbms/src/Flash/Mpp/MPPTaskId.h +++ b/dbms/src/Flash/Mpp/MPPTaskId.h @@ -23,23 +23,45 @@ struct MPPTaskId { MPPTaskId() : start_ts(0) + , server_id(0) + , query_ts(0) , task_id(unknown_task_id){}; - MPPTaskId(UInt64 start_ts_, Int64 task_id_) + MPPTaskId(UInt64 start_ts_, Int64 task_id_, UInt64 server_id_, UInt64 query_ts_, UInt64 local_query_id_) : start_ts(start_ts_) - , task_id(task_id_){}; + , server_id(server_id_) + , query_ts(query_ts_) + , task_id(task_id_) + , local_query_id(local_query_id_) + { + query_id = generateQueryID(query_ts, local_query_id, server_id, start_ts_); + } UInt64 start_ts; + UInt64 server_id; + UInt64 query_ts; Int64 task_id; + UInt64 local_query_id; + String query_id; bool isUnknown() const { return task_id == unknown_task_id; } String toString() const; - static const MPPTaskId unknown_mpp_task_id; + static const String Max_Query_Id; + static String generateQueryID(UInt64 query_ts_, UInt64 local_query_id_, UInt64 server_id_, UInt64 start_ts_); + static std::tuple decodeQueryID(String query_id_str); private: 
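Because every field is rendered as zero-padded 16-digit lowercase hex and the three segments are simply concatenated, all query ids have the same length, so plain lexicographic string comparison orders them exactly like the numeric tuple (query_ts, local_query_id, server_id); this is what lets MinTSOScheduler further down keep using <= and std::min on query ids where it previously compared raw TSOs. A minimal standalone sketch of the same encode/decode round trip (helper names here are illustrative, not part of the patch):

#include <cstdint>
#include <cstdio>
#include <string>
#include <tuple>

// Zero-padded 16-digit lowercase hex, mirroring MPPTaskId::intToHex above.
static std::string toHex16(uint64_t v)
{
    char buf[17];
    std::snprintf(buf, sizeof(buf), "%016llx", static_cast<unsigned long long>(v));
    return std::string(buf, 16);
}

// Same layout as MPPTaskId::generateQueryID: query_ts | local_query_id | server_id,
// falling back to 0 | 0 | start_ts when local_query_id == 0 (requests from older clients).
static std::string encodeQueryId(uint64_t query_ts, uint64_t local_query_id, uint64_t server_id, uint64_t start_ts)
{
    if (local_query_id == 0)
        return toHex16(0) + toHex16(0) + toHex16(start_ts);
    return toHex16(query_ts) + toHex16(local_query_id) + toHex16(server_id);
}

// Inverse of the encoding: three fixed-width fields parsed back as unsigned 64-bit values.
static std::tuple<uint64_t, uint64_t, uint64_t> decodeQueryId(const std::string & s)
{
    auto field = [&s](size_t i) { return std::stoull(s.substr(i * 16, 16), nullptr, 16); };
    return {field(0), field(1), field(2)};
}

// e.g. encodeQueryId(0x7fffffffffffffff, 1, 0xffffffffffffffff, 0xffffffffffffffff)
//   == "7fffffffffffffff0000000000000001ffffffffffffffff",
//   the same expectation checked by testQueryId in gtest_mpp_exchange_writer.cpp below.

The cancel paths rely on the same symmetry: FlashService::CancelMPPTask folds the request meta into a query id with generateQueryID, while MockComputeServerManager::cancelQuery goes the other way and uses decodeQueryID to rebuild the TaskMeta fields from the string.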
static constexpr Int64 unknown_task_id = -1; + + static String intToHex(UInt64 i) + { + std::stringstream stream; + stream << std::setfill('0') << std::setw(sizeof(UInt64) * 2) + << std::hex << i; + return stream.str(); + } }; bool operator==(const MPPTaskId & lid, const MPPTaskId & rid); @@ -53,7 +75,7 @@ class hash public: size_t operator()(const DB::MPPTaskId & id) const { - return hash()(id.start_ts) ^ hash()(id.task_id); + return hash()(id.query_id); } }; } // namespace std \ No newline at end of file diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.cpp b/dbms/src/Flash/Mpp/MPPTaskManager.cpp index bc54c5db8fc..ed3096db163 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskManager.cpp @@ -35,7 +35,7 @@ MPPTaskManager::MPPTaskManager(MPPTaskSchedulerPtr scheduler_) , log(Logger::get()) {} -MPPQueryTaskSetPtr MPPTaskManager::addMPPQueryTaskSet(UInt64 query_id) +MPPQueryTaskSetPtr MPPTaskManager::addMPPQueryTaskSet(String query_id) { auto ptr = std::make_shared(); mpp_query_map.insert({query_id, ptr}); @@ -43,7 +43,7 @@ MPPQueryTaskSetPtr MPPTaskManager::addMPPQueryTaskSet(UInt64 query_id) return ptr; } -void MPPTaskManager::removeMPPQueryTaskSet(UInt64 query_id, bool on_abort) +void MPPTaskManager::removeMPPQueryTaskSet(String query_id, bool on_abort) { scheduler->deleteQuery(query_id, *this, on_abort); mpp_query_map.erase(query_id); @@ -53,16 +53,16 @@ void MPPTaskManager::removeMPPQueryTaskSet(UInt64 query_id, bool on_abort) std::pair MPPTaskManager::findAsyncTunnel(const ::mpp::EstablishMPPConnectionRequest * request, EstablishCallData * call_data, grpc::CompletionQueue * cq) { const auto & meta = request->sender_meta(); - MPPTaskId id{meta.start_ts(), meta.task_id()}; + MPPTaskId id{meta.start_ts(), meta.task_id(), meta.server_id(), meta.query_ts(), meta.local_query_id()}; Int64 sender_task_id = meta.task_id(); Int64 receiver_task_id = request->receiver_meta().task_id(); std::unique_lock lock(mu); - auto query_it = mpp_query_map.find(id.start_ts); + auto query_it = mpp_query_map.find(id.query_id); if (query_it != mpp_query_map.end() && !query_it->second->isInNormalState()) { /// if the query is aborted, return the error message - LOG_WARNING(log, fmt::format("Query {} is aborted, all its tasks are invalid.", id.start_ts)); + LOG_WARNING(log, fmt::format("Query {} is aborted, all its tasks are invalid.", id.query_id)); /// meet error return {nullptr, query_it->second->error_message}; } @@ -73,7 +73,7 @@ std::pair MPPTaskManager::findAsyncTunnel(const ::mpp::Est if (!call_data->isWaitingTunnelState()) { /// if call_data is in new_request state, put it to waiting tunnel state - auto query_set = query_it == mpp_query_map.end() ? addMPPQueryTaskSet(id.start_ts) : query_it->second; + auto query_set = query_it == mpp_query_map.end() ? 
addMPPQueryTaskSet(id.query_id) : query_it->second; auto & alarm = query_set->alarms[sender_task_id][receiver_task_id]; call_data->setToWaitingTunnelState(); alarm.Set(cq, Clock::now() + std::chrono::seconds(10), call_data); @@ -96,11 +96,11 @@ std::pair MPPTaskManager::findAsyncTunnel(const ::mpp::Est { /// if the query task set has no mpp task, it has to be removed if there is no alarms left, /// otherwise the query task set itself may be left in MPPTaskManager forever - removeMPPQueryTaskSet(id.start_ts, false); + removeMPPQueryTaskSet(id.query_id, false); cv.notify_all(); } } - return {nullptr, fmt::format("Can't find task [{},{}] within 10 s.", id.start_ts, id.task_id)}; + return {nullptr, fmt::format("Can't find task [{},{},{}] within 10 s.", id.query_id, id.start_ts, id.task_id)}; } } /// don't need to delete the alarm here because registerMPPTask will delete all the related alarm @@ -112,13 +112,13 @@ std::pair MPPTaskManager::findAsyncTunnel(const ::mpp::Est std::pair MPPTaskManager::findTunnelWithTimeout(const ::mpp::EstablishMPPConnectionRequest * request, std::chrono::seconds timeout) { const auto & meta = request->sender_meta(); - MPPTaskId id{meta.start_ts(), meta.task_id()}; + MPPTaskId id{meta.start_ts(), meta.task_id(), meta.server_id(), meta.query_ts(), meta.local_query_id()}; std::unordered_map::iterator it; bool cancelled = false; String error_message; std::unique_lock lock(mu); auto ret = cv.wait_for(lock, timeout, [&] { - auto query_it = mpp_query_map.find(id.start_ts); + auto query_it = mpp_query_map.find(id.query_id); // TODO: how about the query has been cancelled in advance? if (query_it == mpp_query_map.end()) { @@ -127,7 +127,7 @@ std::pair MPPTaskManager::findTunnelWithTimeout(const ::mp else if (!query_it->second->isInNormalState()) { /// if the query is aborted, return true to stop waiting timeout. 
- LOG_WARNING(log, fmt::format("Query {} is aborted, all its tasks are invalid.", id.start_ts)); + LOG_WARNING(log, fmt::format("Query {} is aborted, all its tasks are invalid.", id.query_id)); cancelled = true; error_message = query_it->second->error_message; return true; @@ -147,7 +147,7 @@ std::pair MPPTaskManager::findTunnelWithTimeout(const ::mp return it->second->getTunnel(request); } -void MPPTaskManager::abortMPPQuery(UInt64 query_id, const String & reason, AbortType abort_type) +void MPPTaskManager::abortMPPQuery(String query_id, const String & reason, AbortType abort_type) { LOG_WARNING(log, fmt::format("Begin to abort query: {}, abort type: {}, reason: {}", query_id, magic_enum::enum_name(abort_type), reason)); MPPQueryTaskSetPtr task_set; @@ -204,7 +204,7 @@ void MPPTaskManager::abortMPPQuery(UInt64 query_id, const String & reason, Abort it->second->state = MPPQueryTaskSet::Aborted; cv.notify_all(); } - LOG_WARNING(log, "Finish abort query: " + std::to_string(query_id)); + LOG_WARNING(log, "Finish abort query: " + query_id); } std::pair MPPTaskManager::registerTask(MPPTaskPtr task) @@ -214,7 +214,7 @@ std::pair MPPTaskManager::registerTask(MPPTaskPtr task) FAIL_POINT_PAUSE(FailPoints::pause_before_register_non_root_mpp_task); } std::unique_lock lock(mu); - const auto & it = mpp_query_map.find(task->id.start_ts); + const auto & it = mpp_query_map.find(task->id.query_id); if (it != mpp_query_map.end() && !it->second->isInNormalState()) { return {false, fmt::format("query is being aborted, error message = {}", it->second->error_message)}; @@ -226,7 +226,7 @@ std::pair MPPTaskManager::registerTask(MPPTaskPtr task) MPPQueryTaskSetPtr query_set; if (it == mpp_query_map.end()) /// the first one { - query_set = addMPPQueryTaskSet(task->id.start_ts); + query_set = addMPPQueryTaskSet(task->id.query_id); } else { @@ -251,7 +251,7 @@ std::pair MPPTaskManager::unregisterTask(const MPPTaskId & id) std::unique_lock lock(mu); auto it = mpp_query_map.end(); cv.wait(lock, [&] { - it = mpp_query_map.find(id.start_ts); + it = mpp_query_map.find(id.query_id); return it == mpp_query_map.end() || it->second->allowUnregisterTask(); }); if (it != mpp_query_map.end()) @@ -261,7 +261,7 @@ std::pair MPPTaskManager::unregisterTask(const MPPTaskId & id) { it->second->task_map.erase(task_it); if (it->second->task_map.empty() && it->second->alarms.empty()) - removeMPPQueryTaskSet(id.start_ts, false); + removeMPPQueryTaskSet(id.query_id, false); cv.notify_all(); return {true, ""}; } @@ -282,13 +282,13 @@ String MPPTaskManager::toString() return res + ")"; } -MPPQueryTaskSetPtr MPPTaskManager::getQueryTaskSetWithoutLock(UInt64 query_id) +MPPQueryTaskSetPtr MPPTaskManager::getQueryTaskSetWithoutLock(String query_id) { auto it = mpp_query_map.find(query_id); return it == mpp_query_map.end() ? 
nullptr : it->second; } -MPPQueryTaskSetPtr MPPTaskManager::getQueryTaskSet(UInt64 query_id) +MPPQueryTaskSetPtr MPPTaskManager::getQueryTaskSet(String query_id) { std::lock_guard lock(mu); return getQueryTaskSetWithoutLock(query_id); diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.h b/dbms/src/Flash/Mpp/MPPTaskManager.h index 018a8631880..ac7ce2fb24e 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.h +++ b/dbms/src/Flash/Mpp/MPPTaskManager.h @@ -55,9 +55,9 @@ struct MPPQueryTaskSet using MPPQueryTaskSetPtr = std::shared_ptr; /// a map from the mpp query id to mpp query task set, we use -/// the start ts of a query as the query id as TiDB will guarantee -/// the uniqueness of the start ts -using MPPQueryMap = std::unordered_map; +/// the query_ts + local_query_id + serverID as the query id, because TiDB can't guarantee +/// the uniqueness of the start ts when stale read or set snapshot +using MPPQueryMap = std::unordered_map; // MPPTaskManger holds all running mpp tasks. It's a single instance holden in Context. class MPPTaskManager : private boost::noncopyable @@ -77,9 +77,9 @@ class MPPTaskManager : private boost::noncopyable ~MPPTaskManager() = default; - MPPQueryTaskSetPtr getQueryTaskSetWithoutLock(UInt64 query_id); + MPPQueryTaskSetPtr getQueryTaskSetWithoutLock(String query_id); - MPPQueryTaskSetPtr getQueryTaskSet(UInt64 query_id); + MPPQueryTaskSetPtr getQueryTaskSet(String query_id); std::pair registerTask(MPPTaskPtr task); @@ -93,13 +93,13 @@ class MPPTaskManager : private boost::noncopyable std::pair findAsyncTunnel(const ::mpp::EstablishMPPConnectionRequest * request, EstablishCallData * call_data, grpc::CompletionQueue * cq); - void abortMPPQuery(UInt64 query_id, const String & reason, AbortType abort_type); + void abortMPPQuery(String query_id, const String & reason, AbortType abort_type); String toString(); private: - MPPQueryTaskSetPtr addMPPQueryTaskSet(UInt64 query_id); - void removeMPPQueryTaskSet(UInt64 query_id, bool on_abort); + MPPQueryTaskSetPtr addMPPQueryTaskSet(String query_id); + void removeMPPQueryTaskSet(String query_id, bool on_abort); }; } // namespace DB diff --git a/dbms/src/Flash/Mpp/MPPTaskStatistics.cpp b/dbms/src/Flash/Mpp/MPPTaskStatistics.cpp index 523103611ca..a4437ba10b6 100644 --- a/dbms/src/Flash/Mpp/MPPTaskStatistics.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskStatistics.cpp @@ -96,14 +96,17 @@ void MPPTaskStatistics::logTracingJson() { LOG_INFO( logger, - R"({{"query_tso":{},"task_id":{},"is_root":{},"sender_executor_id":"{}","executors":{},"host":"{}")" + R"({{"query_tso":{},"query_ts":{},"task_id":{},"local_query_id":{},"server_id":{},"is_root":{},"sender_executor_id":"{}","executors":{},"host":"{}")" R"(,"task_init_timestamp":{},"task_start_timestamp":{},"task_end_timestamp":{})" R"(,"compile_start_timestamp":{},"compile_end_timestamp":{})" R"(,"read_wait_index_start_timestamp":{},"read_wait_index_end_timestamp":{})" R"(,"local_input_bytes":{},"remote_input_bytes":{},"output_bytes":{})" R"(,"status":"{}","error_message":"{}","working_time":{},"memory_peak":{}}})", id.start_ts, + id.query_ts, id.task_id, + id.local_query_id, + id.server_id, is_root, sender_executor_id, executor_statistics_collector.resToJson(), diff --git a/dbms/src/Flash/Mpp/MinTSOScheduler.cpp b/dbms/src/Flash/Mpp/MinTSOScheduler.cpp index 32816d90cd3..59e148aa549 100644 --- a/dbms/src/Flash/Mpp/MinTSOScheduler.cpp +++ b/dbms/src/Flash/Mpp/MinTSOScheduler.cpp @@ -24,11 +24,10 @@ namespace FailPoints extern const char random_min_tso_scheduler_failpoint[]; } // namespace FailPoints 
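The scheduler below keeps its previous structure; only the key type changes from a UInt64 TSO to the query-id string. MPPTaskId::Max_Query_Id plays the role the old MAX_UINT64 sentinel played: it encodes a maximal (query_ts, local_query_id, server_id) triple that no in-flight query id should exceed. A rough sketch, under those assumptions, of the election updateMinTSO performs when the current minimum retires:

#include <algorithm>
#include <set>
#include <string>

// Illustrative only: elect the new minimum query id from the ordered active and
// waiting sets, falling back to the Max_Query_Id sentinel when both are empty.
// std::set<std::string> iterates in lexicographic order, and because query ids are
// fixed-width hex strings this is also (query_ts, local_query_id, server_id) order.
static std::string electMinQueryId(const std::set<std::string> & active_set,
                                   const std::set<std::string> & waiting_set,
                                   const std::string & max_query_id_sentinel)
{
    std::string min_query_id = active_set.empty() ? max_query_id_sentinel : *active_set.begin();
    if (!waiting_set.empty())
        min_query_id = std::min(*waiting_set.begin(), min_query_id);
    return min_query_id;
}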
-constexpr UInt64 MAX_UINT64 = std::numeric_limits::max(); constexpr UInt64 OS_THREAD_SOFT_LIMIT = 100000; MinTSOScheduler::MinTSOScheduler(UInt64 soft_limit, UInt64 hard_limit, UInt64 active_set_soft_limit_) - : min_tso(MAX_UINT64) + : min_query_id(MPPTaskId::Max_Query_Id) , thread_soft_limit(soft_limit) , thread_hard_limit(hard_limit) , estimated_thread_usage(0) @@ -57,7 +56,8 @@ MinTSOScheduler::MinTSOScheduler(UInt64 soft_limit, UInt64 hard_limit, UInt64 ac { LOG_INFO(log, "thread_hard_limit is {}, thread_soft_limit is {}, and active_set_soft_limit is {} in MinTSOScheduler.", thread_hard_limit, thread_soft_limit, active_set_soft_limit); } - GET_METRIC(tiflash_task_scheduler, type_min_tso).Set(min_tso); + auto [query_ts, local_query_id, server_id] = MPPTaskId::decodeQueryID(min_query_id); + GET_METRIC(tiflash_task_scheduler, type_min_query_ts).Set(query_ts); GET_METRIC(tiflash_task_scheduler, type_thread_soft_limit).Set(thread_soft_limit); GET_METRIC(tiflash_task_scheduler, type_thread_hard_limit).Set(thread_hard_limit); GET_METRIC(tiflash_task_scheduler, type_estimated_thread_usage).Set(estimated_thread_usage); @@ -77,34 +77,34 @@ bool MinTSOScheduler::tryToSchedule(MPPTaskScheduleEntry & schedule_entry, MPPTa return true; } const auto & id = schedule_entry.getMPPTaskId(); - auto query_task_set = task_manager.getQueryTaskSetWithoutLock(id.start_ts); + auto query_task_set = task_manager.getQueryTaskSetWithoutLock(id.query_id); if (nullptr == query_task_set || !query_task_set->isInNormalState()) { LOG_WARNING(log, "{} is scheduled with miss or abort.", id.toString()); return true; } bool has_error = false; - return scheduleImp(id.start_ts, query_task_set, schedule_entry, false, has_error); + return scheduleImp(id.query_id, query_task_set, schedule_entry, false, has_error); } -/// after finishing the query, there would be no threads released soon, so the updated min-tso query with waiting tasks should be scheduled. +/// after finishing the query, there would be no threads released soon, so the updated min-query-id query with waiting tasks should be scheduled. /// the cancelled query maybe hang, so trigger scheduling as needed when deleting cancelled query. -void MinTSOScheduler::deleteQuery(const UInt64 tso, MPPTaskManager & task_manager, const bool is_cancelled) +void MinTSOScheduler::deleteQuery(const String query_id, MPPTaskManager & task_manager, const bool is_cancelled) { if (isDisabled()) { return; } - LOG_DEBUG(log, "{} query {} (is min = {}) is deleted from active set {} left {} or waiting set {} left {}.", is_cancelled ? "Cancelled" : "Finished", tso, tso == min_tso, active_set.find(tso) != active_set.end(), active_set.size(), waiting_set.find(tso) != waiting_set.end(), waiting_set.size()); - active_set.erase(tso); - waiting_set.erase(tso); + LOG_DEBUG(log, "{} query {} (is min = {}) is deleted from active set {} left {} or waiting set {} left {}.", is_cancelled ? "Cancelled" : "Finished", query_id, query_id == min_query_id, active_set.find(query_id) != active_set.end(), active_set.size(), waiting_set.find(query_id) != waiting_set.end(), waiting_set.size()); + active_set.erase(query_id); + waiting_set.erase(query_id); GET_METRIC(tiflash_task_scheduler, type_waiting_queries_count).Set(waiting_set.size()); GET_METRIC(tiflash_task_scheduler, type_active_queries_count).Set(active_set.size()); if (is_cancelled) /// cancelled queries may have waiting tasks, and finished queries haven't. 
{ - auto query_task_set = task_manager.getQueryTaskSetWithoutLock(tso); + auto query_task_set = task_manager.getQueryTaskSetWithoutLock(query_id); if (query_task_set) /// release all waiting tasks { while (!query_task_set->waiting_tasks.empty()) @@ -118,8 +118,8 @@ void MinTSOScheduler::deleteQuery(const UInt64 tso, MPPTaskManager & task_manage } } - /// NOTE: if updated min_tso query has waiting tasks, they should be scheduled, especially when the soft-limited threads are amost used and active tasks are in resources deadlock which cannot release threads soon. - if (updateMinTSO(tso, true, is_cancelled ? "when cancelling it" : "as finishing it")) + /// NOTE: if updated min_query_id query has waiting tasks, they should be scheduled, especially when the soft-limited threads are amost used and active tasks are in resources deadlock which cannot release threads soon. + if (updateMinTSO(query_id, true, is_cancelled ? "when cancelling it" : "as finishing it")) { scheduleWaitingQueries(task_manager); } @@ -161,7 +161,7 @@ void MinTSOScheduler::scheduleWaitingQueries(MPPTaskManager & task_manager) continue; } - LOG_DEBUG(log, "query {} (is min = {}) with {} tasks is to be scheduled from waiting set (size = {}).", current_query_id, current_query_id == min_tso, query_task_set->waiting_tasks.size(), waiting_set.size()); + LOG_DEBUG(log, "query {} (is min = {}) with {} tasks is to be scheduled from waiting set (size = {}).", current_query_id, current_query_id == min_query_id, query_task_set->waiting_tasks.size(), waiting_set.size()); /// schedule tasks one by one while (!query_task_set->waiting_tasks.empty()) { @@ -179,22 +179,22 @@ void MinTSOScheduler::scheduleWaitingQueries(MPPTaskManager & task_manager) query_task_set->waiting_tasks.pop(); GET_METRIC(tiflash_task_scheduler, type_waiting_tasks_count).Decrement(); } - LOG_DEBUG(log, "query {} (is min = {}) is scheduled from waiting set (size = {}).", current_query_id, current_query_id == min_tso, waiting_set.size()); + LOG_DEBUG(log, "query {} (is min = {}) is scheduled from waiting set (size = {}).", current_query_id, current_query_id == min_query_id, waiting_set.size()); waiting_set.erase(current_query_id); /// all waiting tasks of this query are fully active GET_METRIC(tiflash_task_scheduler, type_waiting_queries_count).Set(waiting_set.size()); } } -/// [directly schedule, from waiting set] * [is min_tso query, not] * [can schedule, can't] totally 8 cases. -bool MinTSOScheduler::scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & query_task_set, MPPTaskScheduleEntry & schedule_entry, const bool isWaiting, bool & has_error) +/// [directly schedule, from waiting set] * [is min_query_id query, not] * [can schedule, can't] totally 8 cases. 
+bool MinTSOScheduler::scheduleImp(const String query_id, const MPPQueryTaskSetPtr & query_task_set, MPPTaskScheduleEntry & schedule_entry, const bool isWaiting, bool & has_error) { auto needed_threads = schedule_entry.getNeededThreads(); - auto check_for_new_min_tso = tso <= min_tso && estimated_thread_usage + needed_threads <= thread_hard_limit; - auto check_for_not_min_tso = (active_set.size() < active_set_soft_limit || tso <= *active_set.rbegin()) && (estimated_thread_usage + needed_threads <= thread_soft_limit); + auto check_for_new_min_tso = query_id <= min_query_id && estimated_thread_usage + needed_threads <= thread_hard_limit; + auto check_for_not_min_tso = (active_set.size() < active_set_soft_limit || query_id <= *active_set.rbegin()) && (estimated_thread_usage + needed_threads <= thread_soft_limit); if (check_for_new_min_tso || check_for_not_min_tso) { - updateMinTSO(tso, false, isWaiting ? "from the waiting set" : "when directly schedule it"); - active_set.insert(tso); + updateMinTSO(query_id, false, isWaiting ? "from the waiting set" : "when directly schedule it"); + active_set.insert(query_id); if (schedule_entry.schedule(ScheduleState::SCHEDULED)) { estimated_thread_usage += needed_threads; @@ -202,24 +202,24 @@ bool MinTSOScheduler::scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & q } GET_METRIC(tiflash_task_scheduler, type_active_queries_count).Set(active_set.size()); GET_METRIC(tiflash_task_scheduler, type_estimated_thread_usage).Set(estimated_thread_usage); - LOG_INFO(log, "{} is scheduled (active set size = {}) due to available threads {}, after applied for {} threads, used {} of the thread {} limit {}.", schedule_entry.getMPPTaskId().toString(), active_set.size(), isWaiting ? "from the waiting set" : "directly", needed_threads, estimated_thread_usage, min_tso == tso ? "hard" : "soft", min_tso == tso ? thread_hard_limit : thread_soft_limit); + LOG_INFO(log, "{} is scheduled (active set size = {}) due to available threads {}, after applied for {} threads, used {} of the thread {} limit {}.", schedule_entry.getMPPTaskId().toString(), active_set.size(), isWaiting ? "from the waiting set" : "directly", needed_threads, estimated_thread_usage, min_query_id == query_id ? "hard" : "soft", min_query_id == query_id ? thread_hard_limit : thread_soft_limit); return true; } else { - bool is_tso_min = tso <= min_tso; - fiu_do_on(FailPoints::random_min_tso_scheduler_failpoint, is_tso_min = true;); - if (is_tso_min) /// the min_tso query should fully run, otherwise throw errors here. + bool is_query_id_min = query_id <= min_query_id; + fiu_do_on(FailPoints::random_min_tso_scheduler_failpoint, is_query_id_min = true;); + if (is_query_id_min) /// the min_query_id query should fully run, otherwise throw errors here. { has_error = true; - auto msg = fmt::format("threads are unavailable for the query {} ({} min_tso {}) {}, need {}, but used {} of the thread hard limit {}, {} active and {} waiting queries.", tso, tso == min_tso ? "is" : "is newer than", min_tso, isWaiting ? "from the waiting set" : "when directly schedule it", needed_threads, estimated_thread_usage, thread_hard_limit, active_set.size(), waiting_set.size()); + auto msg = fmt::format("threads are unavailable for the query {} ({} min_query_id {}) {}, need {}, but used {} of the thread hard limit {}, {} active and {} waiting queries.", query_id, query_id == min_query_id ? "is" : "is newer than", min_query_id, isWaiting ? 
"from the waiting set" : "when directly schedule it", needed_threads, estimated_thread_usage, thread_hard_limit, active_set.size(), waiting_set.size()); LOG_ERROR(log, "{}", msg); GET_METRIC(tiflash_task_scheduler, type_hard_limit_exceeded_count).Increment(); if (isWaiting) { - /// set this task be failed to schedule, and the task will throw exception, then TiDB will finally notify this tiflash node canceling all tasks of this tso and update metrics. + /// set this task be failed to schedule, and the task will throw exception, then TiDB will finally notify this tiflash node canceling all tasks of this query_id and update metrics. schedule_entry.schedule(ScheduleState::EXCEEDED); - waiting_set.erase(tso); /// avoid the left waiting tasks of this query reaching here many times. + waiting_set.erase(query_id); /// avoid the left waiting tasks of this query reaching here many times. } else { @@ -229,38 +229,43 @@ bool MinTSOScheduler::scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & q } if (!isWaiting) { - waiting_set.insert(tso); + waiting_set.insert(query_id); query_task_set->waiting_tasks.push(schedule_entry.getMPPTaskId()); GET_METRIC(tiflash_task_scheduler, type_waiting_queries_count).Set(waiting_set.size()); GET_METRIC(tiflash_task_scheduler, type_waiting_tasks_count).Increment(); } - LOG_INFO(log, "threads are unavailable for the query {} or active set is full (size = {}), need {}, but used {} of the thread soft limit {},{} waiting set size = {}", tso, active_set.size(), needed_threads, estimated_thread_usage, thread_soft_limit, isWaiting ? "" : " put into", waiting_set.size()); + LOG_INFO(log, "threads are unavailable for the query {} or active set is full (size = {}), need {}, but used {} of the thread soft limit {},{} waiting set size = {}", query_id, active_set.size(), needed_threads, estimated_thread_usage, thread_soft_limit, isWaiting ? "" : " put into", waiting_set.size()); return false; } } -/// if return true, then need to schedule the waiting tasks of the min_tso. -bool MinTSOScheduler::updateMinTSO(const UInt64 tso, const bool retired, const String & msg) +/// if return true, then need to schedule the waiting tasks of the min_query_id. +bool MinTSOScheduler::updateMinTSO(const String tso, const bool retired, const String & msg) { - auto old_min_tso = min_tso; + auto old_min_tso = min_query_id; bool force_scheduling = false; if (retired) { - if (tso == min_tso) /// elect a new min_tso from all queries. + if (tso == min_query_id) /// elect a new min_query_id from all queries. { - min_tso = active_set.empty() ? MAX_UINT64 : *active_set.begin(); - min_tso = waiting_set.empty() ? min_tso : std::min(*waiting_set.begin(), min_tso); - force_scheduling = waiting_set.find(min_tso) != waiting_set.end(); /// if this min_tso has waiting tasks, these tasks should force being scheduled. + min_query_id = active_set.empty() ? MPPTaskId::Max_Query_Id : *active_set.begin(); + min_query_id = waiting_set.empty() ? min_query_id : std::min(*waiting_set.begin(), min_query_id); + force_scheduling = waiting_set.find(min_query_id) != waiting_set.end(); /// if this min_query_id has waiting tasks, these tasks should force being scheduled. } } else { - min_tso = std::min(tso, min_tso); + min_query_id = std::min(tso, min_query_id); } - if (min_tso != old_min_tso) /// if min_tso == MAX_UINT64 and the query tso is not to be cancelled, the used_threads, active_set.size() and waiting_set.size() must be 0. 
+ if (min_query_id != old_min_tso) /// if min_query_id == MPPTaskId::Max_Query_Id and the query tso is not to be cancelled, the used_threads, active_set.size() and waiting_set.size() must be 0. { - GET_METRIC(tiflash_task_scheduler, type_min_tso).Set(min_tso); - LOG_INFO(log, "min_tso query is updated from {} to {} {}, used threads = {}, {} active and {} waiting queries.", old_min_tso, min_tso, msg, estimated_thread_usage, active_set.size(), waiting_set.size()); + auto [query_ts, local_query_id, server_id] = MPPTaskId::decodeQueryID(min_query_id); + if (query_ts == 0) + { + query_ts = server_id; + } + GET_METRIC(tiflash_task_scheduler, type_min_query_ts).Set(query_ts); + LOG_INFO(log, "min_query_id query is updated from {} to {} {}, used threads = {}, {} active and {} waiting queries.", old_min_tso, min_query_id, msg, estimated_thread_usage, active_set.size(), waiting_set.size()); } return force_scheduling; } diff --git a/dbms/src/Flash/Mpp/MinTSOScheduler.h b/dbms/src/Flash/Mpp/MinTSOScheduler.h index 25abb770e44..e14ca3a9a64 100644 --- a/dbms/src/Flash/Mpp/MinTSOScheduler.h +++ b/dbms/src/Flash/Mpp/MinTSOScheduler.h @@ -28,8 +28,8 @@ using MPPTaskManagerPtr = std::shared_ptr; struct MPPQueryTaskSet; using MPPQueryTaskSetPtr = std::shared_ptr; -/// scheduling tasks in the set according to the tso order under the soft limit of threads, but allow the min_tso query to preempt threads under the hard limit of threads. -/// The min_tso query avoids the deadlock resulted from threads competition among nodes. +/// scheduling tasks in the set according to the tso order under the soft limit of threads, but allow the min_query_id query to preempt threads under the hard limit of threads. +/// The min_query_id query avoids the deadlock resulted from threads competition among nodes. /// schedule tasks under the lock protection of the task manager. /// NOTE: if the updated min-tso query has waiting tasks, necessarily scheduling them, otherwise the query would hang. class MinTSOScheduler : private boost::noncopyable @@ -37,28 +37,28 @@ class MinTSOScheduler : private boost::noncopyable public: MinTSOScheduler(UInt64 soft_limit, UInt64 hard_limit, UInt64 active_set_soft_limit_); ~MinTSOScheduler() = default; - /// try to schedule this task if it is the min_tso query or there are enough threads, otherwise put it into the waiting set. + /// try to schedule this task if it is the min_query_id query or there are enough threads, otherwise put it into the waiting set. /// NOTE: call tryToSchedule under the lock protection of MPPTaskManager bool tryToSchedule(MPPTaskScheduleEntry & schedule_entry, MPPTaskManager & task_manager); - /// delete this to-be cancelled/finished query from scheduler and update min_tso if needed, so that there aren't cancelled/finished queries in the scheduler. + /// delete this to-be cancelled/finished query from scheduler and update min_query_id if needed, so that there aren't cancelled/finished queries in the scheduler. 
/// NOTE: call deleteQuery under the lock protection of MPPTaskManager - void deleteQuery(const UInt64 tso, MPPTaskManager & task_manager, const bool is_cancelled); + void deleteQuery(const String query_id, MPPTaskManager & task_manager, const bool is_cancelled); /// all scheduled tasks should finally call this function to release threads and schedule new tasks void releaseThreadsThenSchedule(const int needed_threads, MPPTaskManager & task_manager); private: - bool scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & query_task_set, MPPTaskScheduleEntry & schedule_entry, const bool isWaiting, bool & has_error); - bool updateMinTSO(const UInt64 tso, const bool retired, const String & msg); + bool scheduleImp(const String query_id, const MPPQueryTaskSetPtr & query_task_set, MPPTaskScheduleEntry & schedule_entry, const bool isWaiting, bool & has_error); + bool updateMinTSO(const String query_id, const bool retired, const String & msg); void scheduleWaitingQueries(MPPTaskManager & task_manager); bool isDisabled() { return thread_hard_limit == 0 && thread_soft_limit == 0; } - std::set waiting_set; - std::set active_set; - UInt64 min_tso; + std::set waiting_set; + std::set active_set; + String min_query_id; UInt64 thread_soft_limit; UInt64 thread_hard_limit; UInt64 estimated_thread_usage; diff --git a/dbms/src/Flash/Mpp/tests/gtest_mpp_exchange_writer.cpp b/dbms/src/Flash/Mpp/tests/gtest_mpp_exchange_writer.cpp index fa3456d96eb..bf4a72693de 100644 --- a/dbms/src/Flash/Mpp/tests/gtest_mpp_exchange_writer.cpp +++ b/dbms/src/Flash/Mpp/tests/gtest_mpp_exchange_writer.cpp @@ -136,6 +136,23 @@ struct MockExchangeWriter uint16_t part_num; }; +TEST_F(TestMPPExchangeWriter, testQueryId) +try +{ + constexpr UInt64 MAX_UINT64 = std::numeric_limits::max(); + constexpr UInt64 MAX_INT64 = std::numeric_limits::max(); + Int64 test = std::stoull("7FFFFFFFFFFFFFFF", 0, 16); + EXPECT_EQ(test, MAX_INT64); + + String query_id = MPPTaskId::generateQueryID(MAX_INT64, 1, MAX_UINT64, MAX_UINT64); + EXPECT_EQ(query_id, "7fffffffffffffff0000000000000001ffffffffffffffff"); + auto [query_ts, local_query_id, server_id] = MPPTaskId::decodeQueryID(query_id); + EXPECT_EQ(query_ts, MAX_INT64); + EXPECT_EQ(server_id, MAX_UINT64); + EXPECT_EQ(double(local_query_id), 1); +} +CATCH + // Input block data is distributed uniform. 
// partition_num: 4 // fine_grained_shuffle_stream_count: 8 diff --git a/dbms/src/Flash/tests/bench_exchange.cpp b/dbms/src/Flash/tests/bench_exchange.cpp index 3454242f2a7..6691a3b74a4 100644 --- a/dbms/src/Flash/tests/bench_exchange.cpp +++ b/dbms/src/Flash/tests/bench_exchange.cpp @@ -28,7 +28,6 @@ namespace DB { namespace tests { - std::random_device rd; MockBlockInputStream::MockBlockInputStream(const std::vector & blocks_, StopFlag & stop_flag_) @@ -167,6 +166,9 @@ ReceiverHelper::ReceiverHelper(int concurrency_, int source_num_, uint32_t fine_ { mpp::TaskMeta task; task.set_start_ts(0); + task.set_query_ts(i); + task.set_server_id(i); + task.set_local_query_id(i); task.set_task_id(i); task.set_partition_id(i); task.set_address(""); diff --git a/dbms/src/Flash/tests/bench_window.cpp b/dbms/src/Flash/tests/bench_window.cpp index 9f68ba9beb9..eb14e12f314 100644 --- a/dbms/src/Flash/tests/bench_window.cpp +++ b/dbms/src/Flash/tests/bench_window.cpp @@ -47,7 +47,7 @@ class WindowFunctionBench : public ExchangeBench buildDefaultRowsFrame(), fine_grained_shuffle_stream_count); tipb::DAGRequest req; - MPPInfo mpp_info(0, -1, -1, {}, std::unordered_map>{}); + MPPInfo mpp_info(0, 0, 0, 0, -1, -1, {}, std::unordered_map>{}); builder.getRoot()->toTiPBExecutor(req.mutable_root_executor(), /*collator_id=*/0, mpp_info, TiFlashTestEnv::getContext()); assert(req.root_executor().tp() == tipb::TypeWindow); window = req.root_executor().window(); diff --git a/dbms/src/Flash/tests/gtest_compute_server.cpp b/dbms/src/Flash/tests/gtest_compute_server.cpp index 43da894ec64..63ca54371e8 100644 --- a/dbms/src/Flash/tests/gtest_compute_server.cpp +++ b/dbms/src/Flash/tests/gtest_compute_server.cpp @@ -18,7 +18,6 @@ namespace DB { namespace tests { - LoggerPtr MPPTaskTestUtils::log_ptr = nullptr; size_t MPPTaskTestUtils::server_num = 0; MPPTestMeta MPPTaskTestUtils::test_meta = {}; @@ -296,13 +295,13 @@ try { startServers(4); { - auto [start_ts, res] = prepareMPPStreams(context + auto [query_id, res] = prepareMPPStreams(context .scan("test_db", "test_table_1") .aggregation({Max(col("s1"))}, {col("s2"), col("s3")}) .project({"max(s1)"})); - EXPECT_TRUE(assertQueryActive(start_ts)); - MockComputeServerManager::instance().cancelQuery(start_ts); - EXPECT_TRUE(assertQueryCancelled(start_ts)); + EXPECT_TRUE(assertQueryActive(query_id)); + MockComputeServerManager::instance().cancelQuery(query_id); + EXPECT_TRUE(assertQueryCancelled(query_id)); } } CATCH @@ -312,12 +311,12 @@ try { startServers(4); { - auto [start_ts, res] = prepareMPPStreams(context + auto [query_id, res] = prepareMPPStreams(context .scan("test_db", "l_table") .join(context.scan("test_db", "r_table"), tipb::JoinType::TypeLeftOuterJoin, {col("join_c")})); - EXPECT_TRUE(assertQueryActive(start_ts)); - MockComputeServerManager::instance().cancelQuery(start_ts); - EXPECT_TRUE(assertQueryCancelled(start_ts)); + EXPECT_TRUE(assertQueryActive(query_id)); + MockComputeServerManager::instance().cancelQuery(query_id); + EXPECT_TRUE(assertQueryCancelled(query_id)); } } CATCH @@ -327,14 +326,14 @@ try { startServers(4); { - auto [start_ts, _] = prepareMPPStreams(context + auto [query_id, _] = prepareMPPStreams(context .scan("test_db", "l_table") .join(context.scan("test_db", "r_table"), tipb::JoinType::TypeLeftOuterJoin, {col("join_c")}) .aggregation({Max(col("l_table.s"))}, {col("l_table.s")}) .project({col("max(l_table.s)"), col("l_table.s")})); - EXPECT_TRUE(assertQueryActive(start_ts)); - MockComputeServerManager::instance().cancelQuery(start_ts); - 
EXPECT_TRUE(assertQueryCancelled(start_ts)); + EXPECT_TRUE(assertQueryActive(query_id)); + MockComputeServerManager::instance().cancelQuery(query_id); + EXPECT_TRUE(assertQueryCancelled(query_id)); } } CATCH @@ -344,27 +343,27 @@ try { startServers(4); { - auto [start_ts1, res1] = prepareMPPStreams(context + auto [query_id1, res1] = prepareMPPStreams(context .scan("test_db", "l_table") .join(context.scan("test_db", "r_table"), tipb::JoinType::TypeLeftOuterJoin, {col("join_c")})); - auto [start_ts2, res2] = prepareMPPStreams(context + auto [query_id2, res2] = prepareMPPStreams(context .scan("test_db", "l_table") .join(context.scan("test_db", "r_table"), tipb::JoinType::TypeLeftOuterJoin, {col("join_c")}) .aggregation({Max(col("l_table.s"))}, {col("l_table.s")}) .project({col("max(l_table.s)"), col("l_table.s")})); - EXPECT_TRUE(assertQueryActive(start_ts1)); - MockComputeServerManager::instance().cancelQuery(start_ts1); - EXPECT_TRUE(assertQueryCancelled(start_ts1)); + EXPECT_TRUE(assertQueryActive(query_id1)); + MockComputeServerManager::instance().cancelQuery(query_id1); + EXPECT_TRUE(assertQueryCancelled(query_id1)); - EXPECT_TRUE(assertQueryActive(start_ts2)); - MockComputeServerManager::instance().cancelQuery(start_ts2); - EXPECT_TRUE(assertQueryCancelled(start_ts2)); + EXPECT_TRUE(assertQueryActive(query_id2)); + MockComputeServerManager::instance().cancelQuery(query_id2); + EXPECT_TRUE(assertQueryCancelled(query_id2)); } // start 10 queries { - std::vector>> queries; + std::vector>> queries; for (size_t i = 0; i < 10; ++i) { queries.push_back(prepareMPPStreams(context @@ -373,10 +372,10 @@ try } for (size_t i = 0; i < 10; ++i) { - auto start_ts = std::get<0>(queries[i]); - EXPECT_TRUE(assertQueryActive(start_ts)); - MockComputeServerManager::instance().cancelQuery(start_ts); - EXPECT_TRUE(assertQueryCancelled(start_ts)); + auto query_id = std::get<0>(queries[i]); + EXPECT_TRUE(assertQueryActive(query_id)); + MockComputeServerManager::instance().cancelQuery(query_id); + EXPECT_TRUE(assertQueryCancelled(query_id)); } } } diff --git a/dbms/src/Storages/Transaction/LearnerRead.cpp b/dbms/src/Storages/Transaction/LearnerRead.cpp index 0a52118eed4..3453cd6f52e 100644 --- a/dbms/src/Storages/Transaction/LearnerRead.cpp +++ b/dbms/src/Storages/Transaction/LearnerRead.cpp @@ -208,29 +208,40 @@ LearnerReadSnapshot doLearnerRead( std::vector batch_read_index_req; batch_read_index_req.reserve(ori_batch_region_size); + size_t stale_read_count = 0; { // If using `std::numeric_limits::max()`, set `start-ts` 0 to get the latest index but let read-index-worker do not record as history. auto read_index_tso = mvcc_query_info->read_tso == std::numeric_limits::max() ? 
0 : mvcc_query_info->read_tso; - + RegionTable & region_table = tmt.getRegionTable(); for (size_t region_idx = region_begin_idx; region_idx < region_end_idx; ++region_idx) { const auto & region_to_query = regions_info[region_idx]; const RegionID region_id = region_to_query.region_id; - if (auto ori_read_index = mvcc_query_info.getReadIndexRes(region_id); ori_read_index) + UInt64 physical_tso = read_index_tso >> TsoPhysicalShiftBits; + bool can_stale_read = physical_tso < region_table.getSelfSafeTS(region_id); + if (!can_stale_read) { - auto resp = kvrpcpb::ReadIndexResponse(); - resp.set_read_index(ori_read_index); - batch_read_index_result.emplace(region_id, std::move(resp)); + if (auto ori_read_index = mvcc_query_info.getReadIndexRes(region_id); ori_read_index) + { + auto resp = kvrpcpb::ReadIndexResponse(); + resp.set_read_index(ori_read_index); + batch_read_index_result.emplace(region_id, std::move(resp)); + } + else + { + auto & region = regions_snapshot.find(region_id)->second; + batch_read_index_req.emplace_back(GenRegionReadIndexReq(*region, read_index_tso)); + } } else { - auto & region = regions_snapshot.find(region_id)->second; - batch_read_index_req.emplace_back(GenRegionReadIndexReq(*region, read_index_tso)); + batch_read_index_result.emplace(region_id, kvrpcpb::ReadIndexResponse()); + stale_read_count++; } } } - + GET_METRIC(tiflash_stale_read_count).Increment(stale_read_count); GET_METRIC(tiflash_raft_read_index_count).Increment(batch_read_index_req.size()); const auto & make_default_batch_read_index_result = [&](bool with_region_error) { diff --git a/dbms/src/Storages/Transaction/RegionTable.cpp b/dbms/src/Storages/Transaction/RegionTable.cpp index a82fef8b1ad..956ca227051 100644 --- a/dbms/src/Storages/Transaction/RegionTable.cpp +++ b/dbms/src/Storages/Transaction/RegionTable.cpp @@ -505,6 +505,17 @@ bool RegionTable::isSafeTSLag(UInt64 region_id, UInt64 * leader_safe_ts, UInt64 return (*leader_safe_ts > *self_safe_ts) && (*leader_safe_ts - *self_safe_ts > SafeTsDiffThreshold); } +UInt64 RegionTable::getSelfSafeTS(UInt64 region_id) +{ + std::shared_lock lock(rw_lock); + auto it = safe_ts_map.find(region_id); + if (it == safe_ts_map.end()) + { + return 0; + } + return it->second->self_safe_ts.load(std::memory_order_relaxed); +} + void RegionTable::updateSafeTS(UInt64 region_id, UInt64 leader_safe_ts, UInt64 self_safe_ts) { { diff --git a/dbms/src/Storages/Transaction/RegionTable.h b/dbms/src/Storages/Transaction/RegionTable.h index 9b4fe1a4286..5e2154823d7 100644 --- a/dbms/src/Storages/Transaction/RegionTable.h +++ b/dbms/src/Storages/Transaction/RegionTable.h @@ -59,7 +59,8 @@ using RegionScanFilterPtr = std::shared_ptr; using SafeTS = UInt64; enum : SafeTS { - InvalidSafeTS = std::numeric_limits::max() + TsoPhysicalShiftBits = 18, + InvalidSafeTS = std::numeric_limits::max(), }; class RegionTable : private boost::noncopyable { @@ -189,6 +190,7 @@ class RegionTable : private boost::noncopyable static const UInt64 SafeTsDiffThreshold = 2 * 60 * 1000; bool isSafeTSLag(UInt64 region_id, UInt64 * leader_safe_ts, UInt64 * self_safe_ts); + UInt64 getSelfSafeTS(UInt64 region_id); private: friend class MockTiDB; diff --git a/dbms/src/TestUtils/MPPTaskTestUtils.cpp b/dbms/src/TestUtils/MPPTaskTestUtils.cpp index d86ce0befd8..19e0083dd3e 100644 --- a/dbms/src/TestUtils/MPPTaskTestUtils.cpp +++ b/dbms/src/TestUtils/MPPTaskTestUtils.cpp @@ -23,6 +23,7 @@ DAGProperties getDAGPropertiesForTest(int server_num) properties.is_mpp_query = true; properties.mpp_partition_num = server_num; 
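The stale-read fast path added in LearnerRead.cpp above compares the physical half of the read TSO against the region's self_safe_ts: TiDB TSOs keep the physical millisecond timestamp in the high bits and an 18-bit logical counter below, so shifting right by TsoPhysicalShiftBits yields the physical time. Regions whose self_safe_ts has already passed that point can serve the read without a read-index request, which is what the empty ReadIndexResponse and the new tiflash_stale_read_count metric record. A condensed sketch of that decision, assuming self_safe_ts is tracked in the same physical-time domain as the comparison above implies:

#include <cstdint>

// TiDB TSO layout: physical milliseconds in the high bits, an 18-bit logical counter below.
constexpr uint64_t kTsoPhysicalShiftBits = 18; // mirrors TsoPhysicalShiftBits in RegionTable.h

// Illustrative check only: a region can serve a stale read when its self_safe_ts has
// already reached the physical time of the read TSO; otherwise fall back to read index.
inline bool canStaleRead(uint64_t read_tso, uint64_t region_self_safe_ts)
{
    const uint64_t physical_ms = read_tso >> kTsoPhysicalShiftBits;
    return physical_ms < region_self_safe_ts;
}

Note that getSelfSafeTS returns 0 for regions without a recorded safe_ts, so those regions always take the read-index path.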
properties.start_ts = MockTimeStampGenerator::instance().nextTs(); + properties.local_query_id = properties.start_ts; return properties; } @@ -68,7 +69,7 @@ size_t MPPTaskTestUtils::serverNum() return server_num; } -std::tuple> MPPTaskTestUtils::prepareMPPStreams(DAGRequestBuilder builder) +std::tuple> MPPTaskTestUtils::prepareMPPStreams(DAGRequestBuilder builder) { auto properties = DB::tests::getDAGPropertiesForTest(serverNum()); auto tasks = builder.buildMPPTasks(context, properties); @@ -76,7 +77,7 @@ std::tuple> MPPTaskTestUtils::prepareMP TiFlashTestEnv::getGlobalContext(i).setCancelTest(); MockComputeServerManager::instance().setMockStorage(context.mockStorage()); auto res = executeMPPQueryWithMultipleContext(properties, tasks, MockComputeServerManager::instance().getServerConfigMap()); - return {properties.start_ts, res}; + return {MPPTaskId::generateQueryID(properties.query_ts, properties.local_query_id, properties.server_id, properties.start_ts), res}; } ColumnsWithTypeAndName MPPTaskTestUtils::exeucteMPPTasks(QueryTasks & tasks, const DAGProperties & properties, std::unordered_map & server_config_map) @@ -133,14 +134,14 @@ String MPPTaskTestUtils::queryInfo(size_t server_id) return buf.toString(); } -::testing::AssertionResult MPPTaskTestUtils::assertQueryCancelled(size_t start_ts) +::testing::AssertionResult MPPTaskTestUtils::assertQueryCancelled(String query_id) { auto seconds = std::chrono::seconds(1); auto retry_times = 0; for (int i = test_meta.context_idx; i < TiFlashTestEnv::globalContextSize(); ++i) { // wait until the task is empty for - while (TiFlashTestEnv::getGlobalContext(i).getTMTContext().getMPPTaskManager()->getQueryTaskSet(start_ts) != nullptr) + while (TiFlashTestEnv::getGlobalContext(i).getTMTContext().getMPPTaskManager()->getQueryTaskSet(query_id) != nullptr) { std::this_thread::sleep_for(seconds); retry_times++; @@ -154,13 +155,13 @@ ::testing::AssertionResult MPPTaskTestUtils::assertQueryCancelled(size_t start_t return ::testing::AssertionSuccess(); } -::testing::AssertionResult MPPTaskTestUtils::assertQueryActive(size_t start_ts) +::testing::AssertionResult MPPTaskTestUtils::assertQueryActive(String query_id) { for (int i = test_meta.context_idx; i < TiFlashTestEnv::globalContextSize(); ++i) { - if (TiFlashTestEnv::getGlobalContext(i).getTMTContext().getMPPTaskManager()->getQueryTaskSet(start_ts) == nullptr) + if (TiFlashTestEnv::getGlobalContext(i).getTMTContext().getMPPTaskManager()->getQueryTaskSet(query_id) == nullptr) { - return ::testing::AssertionFailure() << "Query " << start_ts << "not active" << std::endl; + return ::testing::AssertionFailure() << "Query " << query_id << "not active" << std::endl; } } return ::testing::AssertionSuccess(); diff --git a/dbms/src/TestUtils/MPPTaskTestUtils.h b/dbms/src/TestUtils/MPPTaskTestUtils.h index ab147864492..d8aa64592d3 100644 --- a/dbms/src/TestUtils/MPPTaskTestUtils.h +++ b/dbms/src/TestUtils/MPPTaskTestUtils.h @@ -23,7 +23,6 @@ namespace DB::tests { - DAGProperties getDAGPropertiesForTest(int server_num); class MockTimeStampGenerator : public ext::Singleton { @@ -81,14 +80,14 @@ class MPPTaskTestUtils : public ExecutorTest static size_t serverNum(); // run mpp tasks which are ready to cancel, the return value is the start_ts of query. 
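With the DAGProperties defaults above (query_ts = 0, server_id = 1) and getDAGPropertiesForTest copying start_ts into local_query_id, the value these test helpers hand back is now the full query id rather than the raw start_ts, and it is deterministic; a sketch, assuming nextTs() never returns 0:

// query_ts = 0, server_id = 1, local_query_id = start_ts != 0, so prepareMPPStreams returns
//   MPPTaskId::generateQueryID(0, start_ts, 1, start_ts)
//     == String("0000000000000000") /* query_ts */
//      + intToHex(start_ts)         /* local_query_id */
//      + "0000000000000001"         /* server_id */
// and this is the string assertQueryActive / assertQueryCancelled look up in MPPTaskManager.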
-    std::tuple> prepareMPPStreams(DAGRequestBuilder builder);
+    std::tuple> prepareMPPStreams(DAGRequestBuilder builder);
 
     ColumnsWithTypeAndName exeucteMPPTasks(QueryTasks & tasks, const DAGProperties & properties, std::unordered_map & server_config_map);
     ColumnsWithTypeAndName executeCoprocessorTask(std::shared_ptr<tipb::DAGRequest> & dag_request);
 
-    static ::testing::AssertionResult assertQueryCancelled(size_t start_ts);
-    static ::testing::AssertionResult assertQueryActive(size_t start_ts);
+    static ::testing::AssertionResult assertQueryCancelled(String query_id);
+    static ::testing::AssertionResult assertQueryActive(String query_id);
     static String queryInfo(size_t server_id);
 
 protected:
diff --git a/dbms/src/TestUtils/mockExecutor.cpp b/dbms/src/TestUtils/mockExecutor.cpp
index 2ed1de79057..e6c9a82a231 100644
--- a/dbms/src/TestUtils/mockExecutor.cpp
+++ b/dbms/src/TestUtils/mockExecutor.cpp
@@ -90,7 +90,7 @@ void DAGRequestBuilder::initDAGRequest(tipb::DAGRequest & dag_request)
 std::shared_ptr<tipb::DAGRequest> DAGRequestBuilder::build(MockDAGRequestContext & mock_context, DAGRequestType type)
 {
     // build tree struct base executor
-    MPPInfo mpp_info(properties.start_ts, -1, -1, {}, mock_context.receiver_source_task_ids_map);
+    MPPInfo mpp_info(properties.start_ts, properties.query_ts, properties.server_id, properties.local_query_id, -1, -1, {}, mock_context.receiver_source_task_ids_map);
     std::shared_ptr<tipb::DAGRequest> dag_request_ptr = std::make_shared<tipb::DAGRequest>();
     tipb::DAGRequest & dag_request = *dag_request_ptr;
     initDAGRequest(dag_request);
diff --git a/metrics/grafana/tiflash_summary.json b/metrics/grafana/tiflash_summary.json
index 9d69c8ec6d8..cf1cd15ea5b 100644
--- a/metrics/grafana/tiflash_summary.json
+++ b/metrics/grafana/tiflash_summary.json
@@ -4029,9 +4029,9 @@
       "targets": [
         {
           "exemplar": true,
-          "expr": "max(tiflash_task_scheduler{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", type=\"min_tso\"}) by (instance)",
+          "expr": "max(tiflash_task_scheduler{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", type=\"min_query_ts\"}) by (instance,type)",
           "interval": "",
-          "legendFormat": "{{instance}}",
+          "legendFormat": "{{instance}}-{{type}}",
           "queryType": "randomWalk",
           "refId": "A"
         }
@@ -8668,10 +8668,10 @@
         "h": 7,
         "w": 12,
         "x": 0,
-        "y": 107
+        "y": 9
       },
       "hiddenSeries": false,
-      "id": 35,
+      "id": 166,
       "legend": {
         "alignAsTable": false,
         "avg": false,
@@ -8701,8 +8701,10 @@
       "steppedLine": false,
       "targets": [
         {
-          "expr": "sum(rate(tiflash_raft_read_index_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (instance)",
+          "exemplar": true,
+          "expr": "sum(rate(tiflash_stale_read_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (instance)",
           "format": "time_series",
+          "interval": "",
           "intervalFactor": 1,
           "legendFormat": "{{instance}}",
           "refId": "A"
@@ -8712,7 +8714,7 @@
       "timeFrom": null,
       "timeRegions": [],
       "timeShift": null,
-      "title": "Raft Read Index OPS",
+      "title": "Stale Read OPS",
       "tooltip": {
         "shared": true,
         "sort": 0,
@@ -8728,6 +8730,7 @@
       },
       "yaxes": [
         {
+          "$$hashKey": "object:435",
          "decimals": null,
           "format": "ops",
           "label": null,
@@ -8737,6 +8740,7 @@
           "show": true
         },
         {
+          "$$hashKey": "object:436",
           "format": "none",
           "label": null,
           "logBase": 1,
           "max": null,
@@ -8760,16 +8764,16 @@
         "defaults": {},
         "overrides": []
       },
-      "fill": 1,
+      "fill": 0,
       "fillGradient": 0,
       "gridPos": {
         "h": 7,
         "w": 12,
         "x": 12,
-        "y": 107
+        "y": 9
       },
       "hiddenSeries": false,
-      "id": 36,
+      "id": 35,
       "legend": {
"alignAsTable": false, "avg": false, @@ -8799,39 +8803,18 @@ "steppedLine": false, "targets": [ { - "expr": "histogram_quantile(1.00, sum(rate(tiflash_raft_read_index_duration_seconds_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (le))", + "expr": "sum(rate(tiflash_raft_read_index_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (instance)", "format": "time_series", "intervalFactor": 1, - "legendFormat": "max", + "legendFormat": "{{instance}}", "refId": "A" - }, - { - "expr": "histogram_quantile(0.99, sum(rate(tiflash_raft_read_index_duration_seconds_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (le))", - "format": "time_series", - "intervalFactor": 1, - "legendFormat": "99", - "refId": "B" - }, - { - "expr": "histogram_quantile(0.95, sum(rate(tiflash_raft_read_index_duration_seconds_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (le))", - "format": "time_series", - "intervalFactor": 1, - "legendFormat": "95", - "refId": "C" - }, - { - "expr": "histogram_quantile(0.80, sum(rate(tiflash_raft_read_index_duration_seconds_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (le))", - "format": "time_series", - "intervalFactor": 1, - "legendFormat": "80", - "refId": "D" } ], "thresholds": [], "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "Raft Batch Read Index Duration", + "title": "Raft Read Index OPS", "tooltip": { "shared": true, "sort": 0, @@ -8847,7 +8830,8 @@ }, "yaxes": [ { - "format": "s", + "decimals": null, + "format": "ops", "label": null, "logBase": 1, "max": null, @@ -8855,7 +8839,7 @@ "show": true }, { - "format": "short", + "format": "none", "label": null, "logBase": 1, "max": null, @@ -8884,7 +8868,7 @@ "h": 7, "w": 12, "x": 0, - "y": 114 + "y": 16 }, "hiddenSeries": false, "id": 37, @@ -9018,7 +9002,126 @@ "h": 7, "w": 12, "x": 12, - "y": 114 + "y": 16 + }, + "hiddenSeries": false, + "id": 36, + "legend": { + "alignAsTable": false, + "avg": false, + "current": false, + "max": false, + "min": false, + "rightSide": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null as zero", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.11", + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "histogram_quantile(1.00, sum(rate(tiflash_raft_read_index_duration_seconds_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 1, + "legendFormat": "max", + "refId": "A" + }, + { + "expr": "histogram_quantile(0.99, sum(rate(tiflash_raft_read_index_duration_seconds_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 1, + "legendFormat": "99", + "refId": "B" + }, + { + "expr": "histogram_quantile(0.95, sum(rate(tiflash_raft_read_index_duration_seconds_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 1, + "legendFormat": "95", + "refId": "C" + }, + { + 
"expr": "histogram_quantile(0.80, sum(rate(tiflash_raft_read_index_duration_seconds_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (le))", + "format": "time_series", + "intervalFactor": 1, + "legendFormat": "80", + "refId": "D" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Raft Batch Read Index Duration", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": "0", + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The number of currently applying snapshots.", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 7, + "w": 24, + "x": 0, + "y": 23 }, "hiddenSeries": false, "id": 75, @@ -9122,7 +9225,7 @@ "h": 7, "w": 24, "x": 0, - "y": 121 + "y": 30 }, "hiddenSeries": false, "id": 82, @@ -9275,7 +9378,7 @@ "h": 7, "w": 12, "x": 0, - "y": 128 + "y": 37 }, "heatmap": {}, "hideZeroBuckets": true, @@ -9345,7 +9448,7 @@ "h": 7, "w": 12, "x": 12, - "y": 128 + "y": 37 }, "heatmap": {}, "hideZeroBuckets": true, @@ -9415,7 +9518,7 @@ "h": 7, "w": 12, "x": 0, - "y": 135 + "y": 44 }, "heatmap": {}, "hideZeroBuckets": true, @@ -9485,7 +9588,7 @@ "h": 7, "w": 12, "x": 12, - "y": 135 + "y": 44 }, "heatmap": {}, "hideZeroBuckets": true, @@ -9549,7 +9652,7 @@ "h": 7, "w": 24, "x": 0, - "y": 142 + "y": 51 }, "height": "", "hiddenSeries": false, @@ -9663,7 +9766,7 @@ "h": 7, "w": 12, "x": 0, - "y": 149 + "y": 58 }, "heatmap": {}, "hideZeroBuckets": true, @@ -9732,7 +9835,7 @@ "h": 7, "w": 12, "x": 12, - "y": 149 + "y": 58 }, "heatmap": {}, "hideZeroBuckets": true, @@ -9802,7 +9905,7 @@ "h": 7, "w": 12, "x": 0, - "y": 156 + "y": 65 }, "heatmap": {}, "hideZeroBuckets": true, @@ -9872,7 +9975,7 @@ "h": 7, "w": 12, "x": 12, - "y": 156 + "y": 65 }, "heatmap": {}, "hideZeroBuckets": true, @@ -9942,7 +10045,7 @@ "h": 7, "w": 12, "x": 0, - "y": 163 + "y": 72 }, "heatmap": {}, "hideZeroBuckets": true, @@ -10008,7 +10111,7 @@ "h": 7, "w": 12, "x": 12, - "y": 163 + "y": 72 }, "hiddenSeries": false, "id": 91,