diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index a23bf77eb75..6d94848d9ed 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -104,8 +104,8 @@ namespace DB F(type_exchange_partition, {"type", "exchange_partition"})) \ M(tiflash_schema_apply_duration_seconds, "Bucketed histogram of ddl apply duration", Histogram, \ F(type_ddl_apply_duration, {{"req", "ddl_apply_duration"}}, ExpBuckets{0.001, 2, 20})) \ - M(tiflash_raft_read_index_count, "Total number of raft read index", Counter) \ - M(tiflash_stale_read_count, "Total number of stale read", Counter) \ + M(tiflash_raft_read_index_count, "Total number of raft read index", Counter) \ + M(tiflash_stale_read_count, "Total number of stale read", Counter) \ M(tiflash_raft_read_index_duration_seconds, "Bucketed histogram of raft read index duration", Histogram, \ F(type_raft_read_index_duration, {{"type", "tmt_raft_read_index_duration"}}, ExpBuckets{0.001, 2, 20})) \ M(tiflash_raft_wait_index_duration_seconds, "Bucketed histogram of raft wait index duration", Histogram, \ diff --git a/dbms/src/Debug/MockComputeServerManager.cpp b/dbms/src/Debug/MockComputeServerManager.cpp index 1568369db52..7b71846e63b 100644 --- a/dbms/src/Debug/MockComputeServerManager.cpp +++ b/dbms/src/Debug/MockComputeServerManager.cpp @@ -117,14 +117,14 @@ void MockComputeServerManager::addServer(size_t partition_id, std::unique_ptrset_query_ts(query_ts); - meta->set_local_query_id(local_query_id_); - meta->set_server_id(server_id); + meta->set_query_ts(query_id.query_ts); + meta->set_local_query_id(query_id.local_query_id); + meta->set_server_id(query_id.server_id); + meta->set_start_ts(query_id.start_ts); mpp::CancelTaskResponse response; for (const auto & server : server_map) server.second->flashService()->cancelMPPTaskForTest(&req, &response); diff --git a/dbms/src/Debug/MockComputeServerManager.h b/dbms/src/Debug/MockComputeServerManager.h index 06563212988..6642388659f 100644 --- a/dbms/src/Debug/MockComputeServerManager.h +++ b/dbms/src/Debug/MockComputeServerManager.h @@ -48,7 +48,7 @@ class MockComputeServerManager : public ext::Singleton void resetMockMPPServerInfo(size_t partition_num); - void cancelQuery(String query_id); + void cancelQuery(const MPPQueryId & query_id); static String queryInfo(); diff --git a/dbms/src/Flash/FlashService.cpp b/dbms/src/Flash/FlashService.cpp index 906bd75633c..69b4107a704 100644 --- a/dbms/src/Flash/FlashService.cpp +++ b/dbms/src/Flash/FlashService.cpp @@ -341,7 +341,7 @@ grpc::Status FlashService::CancelMPPTask( auto & tmt_context = context->getTMTContext(); auto task_manager = tmt_context.getMPPTaskManager(); - task_manager->abortMPPQuery(MPPTaskId::generateQueryID(request->meta().query_ts(), request->meta().local_query_id(), request->meta().server_id(), request->meta().start_ts()), "Receive cancel request from TiDB", AbortType::ONCANCELLATION); + task_manager->abortMPPQuery(MPPQueryId(request->meta().query_ts(), request->meta().local_query_id(), request->meta().server_id(), request->meta().start_ts()), "Receive cancel request from TiDB", AbortType::ONCANCELLATION); return grpc::Status::OK; } @@ -383,7 +383,7 @@ ::grpc::Status FlashService::cancelMPPTaskForTest(const ::mpp::CancelTaskRequest } auto & tmt_context = context->getTMTContext(); auto task_manager = tmt_context.getMPPTaskManager(); - task_manager->abortMPPQuery(MPPTaskId::generateQueryID(request->meta().query_ts(), request->meta().local_query_id(), request->meta().server_id(), request->meta().start_ts()), "Receive cancel request from GTest", AbortType::ONCANCELLATION); + task_manager->abortMPPQuery(MPPQueryId(request->meta().query_ts(), request->meta().local_query_id(), request->meta().server_id(), request->meta().start_ts()), "Receive cancel request from GTest", AbortType::ONCANCELLATION); return grpc::Status::OK; } diff --git a/dbms/src/Flash/Mpp/MPPTaskId.cpp b/dbms/src/Flash/Mpp/MPPTaskId.cpp index a9dd93eca90..8b1ce324644 100644 --- a/dbms/src/Flash/Mpp/MPPTaskId.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskId.cpp @@ -13,39 +13,50 @@ // limitations under the License. #include -#include #include namespace DB { -String MPPTaskId::toString() const +bool MPPQueryId::operator<(const MPPQueryId & mpp_query_id) const { - return isUnknown() ? "MPP" : fmt::format("MPP", query_id, start_ts, query_ts, task_id, server_id, local_query_id); + if (query_ts == 0 && local_query_id == 0 && server_id == 0) + { + return start_ts < mpp_query_id.start_ts; + } + return (query_ts < mpp_query_id.query_ts) || (local_query_id < mpp_query_id.local_query_id) || (server_id < mpp_query_id.server_id); +} +bool MPPQueryId::operator==(const MPPQueryId & rid) const +{ + return query_ts == rid.query_ts && local_query_id == rid.local_query_id && server_id == rid.server_id && start_ts == rid.start_ts; +} +bool MPPQueryId::operator!=(const MPPQueryId & rid) const +{ + return !(*this == rid); +} +bool MPPQueryId::operator<=(const MPPQueryId & rid) const +{ + return *this < rid || *this == rid; } -const MPPTaskId MPPTaskId::unknown_mpp_task_id = MPPTaskId{}; - -constexpr UInt64 MAX_UINT64 = std::numeric_limits::max(); -const String MPPTaskId::Max_Query_Id = generateQueryID(MAX_UINT64, MAX_UINT64, MAX_UINT64, MAX_UINT64); - -String MPPTaskId::generateQueryID(UInt64 query_ts_, UInt64 local_query_id_, UInt64 server_id_, UInt64 start_ts_) +size_t MPPQueryIdHash::operator()(MPPQueryId const & mpp_query_id) const noexcept { - if (local_query_id_ == 0) + if (mpp_query_id.query_ts == 0 && mpp_query_id.local_query_id == 0 && mpp_query_id.server_id == 0) { - return intToHex(0) + intToHex(0) + intToHex(start_ts_); + return std::hash()(mpp_query_id.start_ts); } - return intToHex(query_ts_) + intToHex(local_query_id_) + intToHex(server_id_); + return std::hash()(mpp_query_id.query_ts) ^ std::hash()(mpp_query_id.local_query_id) ^ std::hash()(mpp_query_id.server_id); } -std::tuple MPPTaskId::decodeQueryID(String query_id_str) +String MPPTaskId::toString() const { - UInt64 decode_query_ts = std::stoull(query_id_str.substr(0, sizeof(Int64) * 2), 0, 16); - UInt64 decode_local_query_id = std::stoull(query_id_str.substr(sizeof(Int64) * 2, sizeof(Int64) * 2), 0, 16); - UInt64 decode_server_id = std::stoull(query_id_str.substr(sizeof(Int64) * 2 * 2, sizeof(UInt64) * 2), 0, 16); - LOG_DEBUG(Logger::get(__FUNCTION__), "query_ts_={}, local_query_id_={}, server_id_={}", decode_query_ts, decode_local_query_id, decode_server_id); - return {decode_query_ts, decode_local_query_id, decode_server_id}; + return isUnknown() ? "MPP" : fmt::format("MPP", query_id.toDebugString(), start_ts, task_id); } +const MPPTaskId MPPTaskId::unknown_mpp_task_id = MPPTaskId{}; + +constexpr UInt64 MAX_UINT64 = std::numeric_limits::max(); +const MPPQueryId MPPTaskId::Max_Query_Id = MPPQueryId(MAX_UINT64, MAX_UINT64, MAX_UINT64, MAX_UINT64); + bool operator==(const MPPTaskId & lid, const MPPTaskId & rid) { return lid.query_id == rid.query_id && lid.task_id == rid.task_id; diff --git a/dbms/src/Flash/Mpp/MPPTaskId.h b/dbms/src/Flash/Mpp/MPPTaskId.h index 0c411e11f9c..e2690effd87 100644 --- a/dbms/src/Flash/Mpp/MPPTaskId.h +++ b/dbms/src/Flash/Mpp/MPPTaskId.h @@ -15,53 +15,64 @@ #pragma once #include +#include namespace DB { +// global unique MPP query id. +struct MPPQueryId +{ + UInt64 query_ts; + UInt64 local_query_id; + UInt64 server_id; + UInt64 start_ts; + MPPQueryId(UInt64 query_ts, UInt64 local_query_id, UInt64 server_id, UInt64 start_ts) + : query_ts(query_ts) + , local_query_id(local_query_id) + , server_id(server_id) + , start_ts(start_ts) + {} + bool operator<(const MPPQueryId & mpp_query_id) const; + bool operator==(const MPPQueryId & rid) const; + bool operator!=(const MPPQueryId & rid) const; + bool operator<=(const MPPQueryId & rid) const; + String toDebugString() const + { + return fmt::format("query_ts:{}, local_query_id:{}, server_id:{}, start_ts:{}", query_ts, local_query_id, server_id, start_ts); + } +}; + +struct MPPQueryIdHash +{ + size_t operator()(MPPQueryId const & mpp_query_id) const noexcept; +}; + // Identify a mpp task. struct MPPTaskId { MPPTaskId() : start_ts(0) - , server_id(0) - , query_ts(0) - , task_id(unknown_task_id){}; + , task_id(unknown_task_id) + , query_id({0, 0, 0, 0}){}; - MPPTaskId(UInt64 start_ts_, Int64 task_id_, UInt64 server_id_, UInt64 query_ts_, UInt64 local_query_id_) + MPPTaskId(UInt64 start_ts_, Int64 task_id_, UInt64 server_id, UInt64 query_ts, UInt64 local_query_id) : start_ts(start_ts_) - , server_id(server_id_) - , query_ts(query_ts_) , task_id(task_id_) - , local_query_id(local_query_id_) - { - query_id = generateQueryID(query_ts, local_query_id, server_id, start_ts_); - } + , query_id(query_ts, local_query_id, server_id, start_ts_) + {} UInt64 start_ts; - UInt64 server_id; - UInt64 query_ts; Int64 task_id; - UInt64 local_query_id; - String query_id; + MPPQueryId query_id; bool isUnknown() const { return task_id == unknown_task_id; } String toString() const; static const MPPTaskId unknown_mpp_task_id; - static const String Max_Query_Id; - static String generateQueryID(UInt64 query_ts_, UInt64 local_query_id_, UInt64 server_id_, UInt64 start_ts_); - static std::tuple decodeQueryID(String query_id_str); + static const MPPQueryId Max_Query_Id; private: static constexpr Int64 unknown_task_id = -1; - - static String intToHex(UInt64 i) - { - std::stringstream stream; - stream << std::setfill('0') << std::setw(sizeof(UInt64) * 2) - << std::hex << i; - return stream.str(); - } }; bool operator==(const MPPTaskId & lid, const MPPTaskId & rid); @@ -75,7 +86,7 @@ class hash public: size_t operator()(const DB::MPPTaskId & id) const { - return hash()(id.query_id); + return DB::MPPQueryIdHash()(id.query_id) ^ hash()(id.task_id); } }; } // namespace std \ No newline at end of file diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.cpp b/dbms/src/Flash/Mpp/MPPTaskManager.cpp index ed3096db163..41637cc0237 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskManager.cpp @@ -35,7 +35,7 @@ MPPTaskManager::MPPTaskManager(MPPTaskSchedulerPtr scheduler_) , log(Logger::get()) {} -MPPQueryTaskSetPtr MPPTaskManager::addMPPQueryTaskSet(String query_id) +MPPQueryTaskSetPtr MPPTaskManager::addMPPQueryTaskSet(const MPPQueryId & query_id) { auto ptr = std::make_shared(); mpp_query_map.insert({query_id, ptr}); @@ -43,7 +43,7 @@ MPPQueryTaskSetPtr MPPTaskManager::addMPPQueryTaskSet(String query_id) return ptr; } -void MPPTaskManager::removeMPPQueryTaskSet(String query_id, bool on_abort) +void MPPTaskManager::removeMPPQueryTaskSet(const MPPQueryId & query_id, bool on_abort) { scheduler->deleteQuery(query_id, *this, on_abort); mpp_query_map.erase(query_id); @@ -62,7 +62,7 @@ std::pair MPPTaskManager::findAsyncTunnel(const ::mpp::Est if (query_it != mpp_query_map.end() && !query_it->second->isInNormalState()) { /// if the query is aborted, return the error message - LOG_WARNING(log, fmt::format("Query {} is aborted, all its tasks are invalid.", id.query_id)); + LOG_WARNING(log, fmt::format("Query {} is aborted, all its tasks are invalid.", id.query_id.toDebugString())); /// meet error return {nullptr, query_it->second->error_message}; } @@ -100,7 +100,7 @@ std::pair MPPTaskManager::findAsyncTunnel(const ::mpp::Est cv.notify_all(); } } - return {nullptr, fmt::format("Can't find task [{},{},{}] within 10 s.", id.query_id, id.start_ts, id.task_id)}; + return {nullptr, fmt::format("Can't find task [{},{},{}] within 10 s.", id.query_id.toDebugString(), id.start_ts, id.task_id)}; } } /// don't need to delete the alarm here because registerMPPTask will delete all the related alarm @@ -127,7 +127,7 @@ std::pair MPPTaskManager::findTunnelWithTimeout(const ::mp else if (!query_it->second->isInNormalState()) { /// if the query is aborted, return true to stop waiting timeout. - LOG_WARNING(log, fmt::format("Query {} is aborted, all its tasks are invalid.", id.query_id)); + LOG_WARNING(log, fmt::format("Query {} is aborted, all its tasks are invalid.", id.query_id.toDebugString())); cancelled = true; error_message = query_it->second->error_message; return true; @@ -147,9 +147,9 @@ std::pair MPPTaskManager::findTunnelWithTimeout(const ::mp return it->second->getTunnel(request); } -void MPPTaskManager::abortMPPQuery(String query_id, const String & reason, AbortType abort_type) +void MPPTaskManager::abortMPPQuery(const MPPQueryId & query_id, const String & reason, AbortType abort_type) { - LOG_WARNING(log, fmt::format("Begin to abort query: {}, abort type: {}, reason: {}", query_id, magic_enum::enum_name(abort_type), reason)); + LOG_WARNING(log, fmt::format("Begin to abort query: {}, abort type: {}, reason: {}", query_id.toDebugString(), magic_enum::enum_name(abort_type), reason)); MPPQueryTaskSetPtr task_set; { /// abort task may take a long time, so first @@ -159,12 +159,12 @@ void MPPTaskManager::abortMPPQuery(String query_id, const String & reason, Abort auto it = mpp_query_map.find(query_id); if (it == mpp_query_map.end()) { - LOG_WARNING(log, fmt::format("{} does not found in task manager, skip abort", query_id)); + LOG_WARNING(log, fmt::format("{} does not found in task manager, skip abort", query_id.toDebugString())); return; } else if (!it->second->isInNormalState()) { - LOG_WARNING(log, fmt::format("{} already in abort process, skip abort", query_id)); + LOG_WARNING(log, fmt::format("{} already in abort process, skip abort", query_id.toDebugString())); return; } it->second->state = MPPQueryTaskSet::Aborting; @@ -178,7 +178,7 @@ void MPPTaskManager::abortMPPQuery(String query_id, const String & reason, Abort it->second->alarms.clear(); if (it->second->task_map.empty()) { - LOG_INFO(log, fmt::format("There is no mpp task for {}, finish abort", query_id)); + LOG_INFO(log, fmt::format("There is no mpp task for {}, finish abort", query_id.toDebugString())); removeMPPQueryTaskSet(query_id, true); cv.notify_all(); return; @@ -189,7 +189,7 @@ void MPPTaskManager::abortMPPQuery(String query_id, const String & reason, Abort } FmtBuffer fmt_buf; - fmt_buf.fmtAppend("Remaining task in query {} are: ", query_id); + fmt_buf.fmtAppend("Remaining task in query {} are: ", query_id.toDebugString()); for (auto & it : task_set->task_map) fmt_buf.fmtAppend("{} ", it.first.toString()); LOG_WARNING(log, fmt_buf.toString()); @@ -200,11 +200,11 @@ void MPPTaskManager::abortMPPQuery(String query_id, const String & reason, Abort { std::lock_guard lock(mu); auto it = mpp_query_map.find(query_id); - RUNTIME_ASSERT(it != mpp_query_map.end(), log, "MPPTaskQuerySet {} should remaining in MPPTaskManager", query_id); + RUNTIME_ASSERT(it != mpp_query_map.end(), log, "MPPTaskQuerySet {} should remaining in MPPTaskManager", query_id.toDebugString()); it->second->state = MPPQueryTaskSet::Aborted; cv.notify_all(); } - LOG_WARNING(log, "Finish abort query: " + query_id); + LOG_WARNING(log, "Finish abort query: " + query_id.toDebugString()); } std::pair MPPTaskManager::registerTask(MPPTaskPtr task) @@ -282,13 +282,13 @@ String MPPTaskManager::toString() return res + ")"; } -MPPQueryTaskSetPtr MPPTaskManager::getQueryTaskSetWithoutLock(String query_id) +MPPQueryTaskSetPtr MPPTaskManager::getQueryTaskSetWithoutLock(const MPPQueryId & query_id) { auto it = mpp_query_map.find(query_id); return it == mpp_query_map.end() ? nullptr : it->second; } -MPPQueryTaskSetPtr MPPTaskManager::getQueryTaskSet(String query_id) +MPPQueryTaskSetPtr MPPTaskManager::getQueryTaskSet(const MPPQueryId & query_id) { std::lock_guard lock(mu); return getQueryTaskSetWithoutLock(query_id); diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.h b/dbms/src/Flash/Mpp/MPPTaskManager.h index ac7ce2fb24e..b986d8ae866 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.h +++ b/dbms/src/Flash/Mpp/MPPTaskManager.h @@ -57,7 +57,7 @@ using MPPQueryTaskSetPtr = std::shared_ptr; /// a map from the mpp query id to mpp query task set, we use /// the query_ts + local_query_id + serverID as the query id, because TiDB can't guarantee /// the uniqueness of the start ts when stale read or set snapshot -using MPPQueryMap = std::unordered_map; +using MPPQueryMap = std::unordered_map; // MPPTaskManger holds all running mpp tasks. It's a single instance holden in Context. class MPPTaskManager : private boost::noncopyable @@ -77,9 +77,9 @@ class MPPTaskManager : private boost::noncopyable ~MPPTaskManager() = default; - MPPQueryTaskSetPtr getQueryTaskSetWithoutLock(String query_id); + MPPQueryTaskSetPtr getQueryTaskSetWithoutLock(const MPPQueryId & query_id); - MPPQueryTaskSetPtr getQueryTaskSet(String query_id); + MPPQueryTaskSetPtr getQueryTaskSet(const MPPQueryId & query_id); std::pair registerTask(MPPTaskPtr task); @@ -93,13 +93,13 @@ class MPPTaskManager : private boost::noncopyable std::pair findAsyncTunnel(const ::mpp::EstablishMPPConnectionRequest * request, EstablishCallData * call_data, grpc::CompletionQueue * cq); - void abortMPPQuery(String query_id, const String & reason, AbortType abort_type); + void abortMPPQuery(const MPPQueryId & query_id, const String & reason, AbortType abort_type); String toString(); private: - MPPQueryTaskSetPtr addMPPQueryTaskSet(String query_id); - void removeMPPQueryTaskSet(String query_id, bool on_abort); + MPPQueryTaskSetPtr addMPPQueryTaskSet(const MPPQueryId & query_id); + void removeMPPQueryTaskSet(const MPPQueryId & query_id, bool on_abort); }; } // namespace DB diff --git a/dbms/src/Flash/Mpp/MPPTaskStatistics.cpp b/dbms/src/Flash/Mpp/MPPTaskStatistics.cpp index a4437ba10b6..1bef6568b05 100644 --- a/dbms/src/Flash/Mpp/MPPTaskStatistics.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskStatistics.cpp @@ -96,17 +96,15 @@ void MPPTaskStatistics::logTracingJson() { LOG_INFO( logger, - R"({{"query_tso":{},"query_ts":{},"task_id":{},"local_query_id":{},"server_id":{},"is_root":{},"sender_executor_id":"{}","executors":{},"host":"{}")" + R"({{"query_tso":{},"task_id":{},"query_id":{},"is_root":{},"sender_executor_id":"{}","executors":{},"host":"{}")" R"(,"task_init_timestamp":{},"task_start_timestamp":{},"task_end_timestamp":{})" R"(,"compile_start_timestamp":{},"compile_end_timestamp":{})" R"(,"read_wait_index_start_timestamp":{},"read_wait_index_end_timestamp":{})" R"(,"local_input_bytes":{},"remote_input_bytes":{},"output_bytes":{})" R"(,"status":"{}","error_message":"{}","working_time":{},"memory_peak":{}}})", id.start_ts, - id.query_ts, id.task_id, - id.local_query_id, - id.server_id, + id.query_id.toDebugString(), is_root, sender_executor_id, executor_statistics_collector.resToJson(), diff --git a/dbms/src/Flash/Mpp/MinTSOScheduler.cpp b/dbms/src/Flash/Mpp/MinTSOScheduler.cpp index e5e4feb24d5..e91cc75dc10 100644 --- a/dbms/src/Flash/Mpp/MinTSOScheduler.cpp +++ b/dbms/src/Flash/Mpp/MinTSOScheduler.cpp @@ -56,8 +56,7 @@ MinTSOScheduler::MinTSOScheduler(UInt64 soft_limit, UInt64 hard_limit, UInt64 ac { LOG_INFO(log, "thread_hard_limit is {}, thread_soft_limit is {}, and active_set_soft_limit is {} in MinTSOScheduler.", thread_hard_limit, thread_soft_limit, active_set_soft_limit); } - auto [query_ts, local_query_id, server_id] = MPPTaskId::decodeQueryID(min_query_id); - GET_METRIC(tiflash_task_scheduler, type_min_query_ts).Set(query_ts); + GET_METRIC(tiflash_task_scheduler, type_min_query_ts).Set(min_query_id.query_ts); GET_METRIC(tiflash_task_scheduler, type_thread_soft_limit).Set(thread_soft_limit); GET_METRIC(tiflash_task_scheduler, type_thread_hard_limit).Set(thread_hard_limit); GET_METRIC(tiflash_task_scheduler, type_estimated_thread_usage).Set(estimated_thread_usage); @@ -89,14 +88,14 @@ bool MinTSOScheduler::tryToSchedule(MPPTaskScheduleEntry & schedule_entry, MPPTa /// after finishing the query, there would be no threads released soon, so the updated min-query-id query with waiting tasks should be scheduled. /// the cancelled query maybe hang, so trigger scheduling as needed when deleting cancelled query. -void MinTSOScheduler::deleteQuery(const String query_id, MPPTaskManager & task_manager, const bool is_cancelled) +void MinTSOScheduler::deleteQuery(const MPPQueryId & query_id, MPPTaskManager & task_manager, const bool is_cancelled) { if (isDisabled()) { return; } - LOG_DEBUG(log, "{} query {} (is min = {}) is deleted from active set {} left {} or waiting set {} left {}.", is_cancelled ? "Cancelled" : "Finished", query_id, query_id == min_query_id, active_set.find(query_id) != active_set.end(), active_set.size(), waiting_set.find(query_id) != waiting_set.end(), waiting_set.size()); + LOG_DEBUG(log, "{} query {} (is min = {}) is deleted from active set {} left {} or waiting set {} left {}.", is_cancelled ? "Cancelled" : "Finished", query_id.toDebugString(), query_id == min_query_id, active_set.find(query_id) != active_set.end(), active_set.size(), waiting_set.find(query_id) != waiting_set.end(), waiting_set.size()); active_set.erase(query_id); waiting_set.erase(query_id); GET_METRIC(tiflash_task_scheduler, type_waiting_queries_count).Set(waiting_set.size()); @@ -152,7 +151,7 @@ void MinTSOScheduler::scheduleWaitingQueries(MPPTaskManager & task_manager) auto query_task_set = task_manager.getQueryTaskSetWithoutLock(current_query_id); if (nullptr == query_task_set) /// silently solve this rare case { - LOG_ERROR(log, "the waiting query {} is not in the task manager.", current_query_id); + LOG_ERROR(log, "the waiting query {} is not in the task manager.", current_query_id.toDebugString()); updateMinQueryId(current_query_id, true, "as it is not in the task manager."); active_set.erase(current_query_id); waiting_set.erase(current_query_id); @@ -161,7 +160,7 @@ void MinTSOScheduler::scheduleWaitingQueries(MPPTaskManager & task_manager) continue; } - LOG_DEBUG(log, "query {} (is min = {}) with {} tasks is to be scheduled from waiting set (size = {}).", current_query_id, current_query_id == min_query_id, query_task_set->waiting_tasks.size(), waiting_set.size()); + LOG_DEBUG(log, "query {} (is min = {}) with {} tasks is to be scheduled from waiting set (size = {}).", current_query_id.toDebugString(), current_query_id == min_query_id, query_task_set->waiting_tasks.size(), waiting_set.size()); /// schedule tasks one by one while (!query_task_set->waiting_tasks.empty()) { @@ -179,14 +178,14 @@ void MinTSOScheduler::scheduleWaitingQueries(MPPTaskManager & task_manager) query_task_set->waiting_tasks.pop(); GET_METRIC(tiflash_task_scheduler, type_waiting_tasks_count).Decrement(); } - LOG_DEBUG(log, "query {} (is min = {}) is scheduled from waiting set (size = {}).", current_query_id, current_query_id == min_query_id, waiting_set.size()); + LOG_DEBUG(log, "query {} (is min = {}) is scheduled from waiting set (size = {}).", current_query_id.toDebugString(), current_query_id == min_query_id, waiting_set.size()); waiting_set.erase(current_query_id); /// all waiting tasks of this query are fully active GET_METRIC(tiflash_task_scheduler, type_waiting_queries_count).Set(waiting_set.size()); } } /// [directly schedule, from waiting set] * [is min_query_id query, not] * [can schedule, can't] totally 8 cases. -bool MinTSOScheduler::scheduleImp(const String query_id, const MPPQueryTaskSetPtr & query_task_set, MPPTaskScheduleEntry & schedule_entry, const bool isWaiting, bool & has_error) +bool MinTSOScheduler::scheduleImp(const MPPQueryId & query_id, const MPPQueryTaskSetPtr & query_task_set, MPPTaskScheduleEntry & schedule_entry, const bool isWaiting, bool & has_error) { auto needed_threads = schedule_entry.getNeededThreads(); auto check_for_new_min_tso = query_id <= min_query_id && estimated_thread_usage + needed_threads <= thread_hard_limit; @@ -212,7 +211,7 @@ bool MinTSOScheduler::scheduleImp(const String query_id, const MPPQueryTaskSetPt if (is_query_id_min) /// the min_query_id query should fully run, otherwise throw errors here. { has_error = true; - auto msg = fmt::format("threads are unavailable for the query {} ({} min_query_id {}) {}, need {}, but used {} of the thread hard limit {}, {} active and {} waiting queries.", query_id, query_id == min_query_id ? "is" : "is newer than", min_query_id, isWaiting ? "from the waiting set" : "when directly schedule it", needed_threads, estimated_thread_usage, thread_hard_limit, active_set.size(), waiting_set.size()); + auto msg = fmt::format("threads are unavailable for the query {} ({} min_query_id {}) {}, need {}, but used {} of the thread hard limit {}, {} active and {} waiting queries.", query_id.toDebugString(), query_id == min_query_id ? "is" : "is newer than", min_query_id.toDebugString(), isWaiting ? "from the waiting set" : "when directly schedule it", needed_threads, estimated_thread_usage, thread_hard_limit, active_set.size(), waiting_set.size()); LOG_ERROR(log, "{}", msg); GET_METRIC(tiflash_task_scheduler, type_hard_limit_exceeded_count).Increment(); if (isWaiting) @@ -234,13 +233,13 @@ bool MinTSOScheduler::scheduleImp(const String query_id, const MPPQueryTaskSetPt GET_METRIC(tiflash_task_scheduler, type_waiting_queries_count).Set(waiting_set.size()); GET_METRIC(tiflash_task_scheduler, type_waiting_tasks_count).Increment(); } - LOG_INFO(log, "threads are unavailable for the query {} or active set is full (size = {}), need {}, but used {} of the thread soft limit {},{} waiting set size = {}", query_id, active_set.size(), needed_threads, estimated_thread_usage, thread_soft_limit, isWaiting ? "" : " put into", waiting_set.size()); + LOG_INFO(log, "threads are unavailable for the query {} or active set is full (size = {}), need {}, but used {} of the thread soft limit {},{} waiting set size = {}", query_id.toDebugString(), active_set.size(), needed_threads, estimated_thread_usage, thread_soft_limit, isWaiting ? "" : " put into", waiting_set.size()); return false; } } /// if return true, then need to schedule the waiting tasks of the min_query_id. -bool MinTSOScheduler::updateMinQueryId(const String query_id, const bool retired, const String & msg) +bool MinTSOScheduler::updateMinQueryId(const MPPQueryId & query_id, const bool retired, const String & msg) { auto old_min_query_id = min_query_id; bool force_scheduling = false; @@ -259,13 +258,8 @@ bool MinTSOScheduler::updateMinQueryId(const String query_id, const bool retired } if (min_query_id != old_min_query_id) /// if min_query_id == MPPTaskId::Max_Query_Id and the query_id is not to be cancelled, the used_threads, active_set.size() and waiting_set.size() must be 0. { - auto [query_ts, local_query_id, server_id] = MPPTaskId::decodeQueryID(min_query_id); - if (query_ts == 0) - { - query_ts = server_id; - } - GET_METRIC(tiflash_task_scheduler, type_min_query_ts).Set(query_ts); - LOG_INFO(log, "min_query_id query is updated from {} to {} {}, used threads = {}, {} active and {} waiting queries.", old_min_query_id, min_query_id, msg, estimated_thread_usage, active_set.size(), waiting_set.size()); + GET_METRIC(tiflash_task_scheduler, type_min_query_ts).Set(min_query_id.query_ts == 0 ? min_query_id.start_ts : min_query_id.query_ts); + LOG_INFO(log, "min_query_id query is updated from {} to {} {}, used threads = {}, {} active and {} waiting queries.", old_min_query_id.toDebugString(), min_query_id.toDebugString(), msg, estimated_thread_usage, active_set.size(), waiting_set.size()); } return force_scheduling; } diff --git a/dbms/src/Flash/Mpp/MinTSOScheduler.h b/dbms/src/Flash/Mpp/MinTSOScheduler.h index e202f22a11c..dfe2921d1fb 100644 --- a/dbms/src/Flash/Mpp/MinTSOScheduler.h +++ b/dbms/src/Flash/Mpp/MinTSOScheduler.h @@ -43,22 +43,22 @@ class MinTSOScheduler : private boost::noncopyable /// delete this to-be cancelled/finished query from scheduler and update min_query_id if needed, so that there aren't cancelled/finished queries in the scheduler. /// NOTE: call deleteQuery under the lock protection of MPPTaskManager - void deleteQuery(const String query_id, MPPTaskManager & task_manager, const bool is_cancelled); + void deleteQuery(const MPPQueryId & query_id, MPPTaskManager & task_manager, const bool is_cancelled); /// all scheduled tasks should finally call this function to release threads and schedule new tasks void releaseThreadsThenSchedule(const int needed_threads, MPPTaskManager & task_manager); private: - bool scheduleImp(const String query_id, const MPPQueryTaskSetPtr & query_task_set, MPPTaskScheduleEntry & schedule_entry, const bool isWaiting, bool & has_error); - bool updateMinQueryId(const String query_id, const bool retired, const String & msg); + bool scheduleImp(const MPPQueryId & query_id, const MPPQueryTaskSetPtr & query_task_set, MPPTaskScheduleEntry & schedule_entry, const bool isWaiting, bool & has_error); + bool updateMinQueryId(const MPPQueryId & query_id, const bool retired, const String & msg); void scheduleWaitingQueries(MPPTaskManager & task_manager); bool isDisabled() { return thread_hard_limit == 0 && thread_soft_limit == 0; } - std::set waiting_set; - std::set active_set; - String min_query_id; + std::set waiting_set; + std::set active_set; + MPPQueryId min_query_id; UInt64 thread_soft_limit; UInt64 thread_hard_limit; UInt64 estimated_thread_usage; diff --git a/dbms/src/Flash/Mpp/tests/gtest_mpp_exchange_writer.cpp b/dbms/src/Flash/Mpp/tests/gtest_mpp_exchange_writer.cpp index bf4a72693de..fa3456d96eb 100644 --- a/dbms/src/Flash/Mpp/tests/gtest_mpp_exchange_writer.cpp +++ b/dbms/src/Flash/Mpp/tests/gtest_mpp_exchange_writer.cpp @@ -136,23 +136,6 @@ struct MockExchangeWriter uint16_t part_num; }; -TEST_F(TestMPPExchangeWriter, testQueryId) -try -{ - constexpr UInt64 MAX_UINT64 = std::numeric_limits::max(); - constexpr UInt64 MAX_INT64 = std::numeric_limits::max(); - Int64 test = std::stoull("7FFFFFFFFFFFFFFF", 0, 16); - EXPECT_EQ(test, MAX_INT64); - - String query_id = MPPTaskId::generateQueryID(MAX_INT64, 1, MAX_UINT64, MAX_UINT64); - EXPECT_EQ(query_id, "7fffffffffffffff0000000000000001ffffffffffffffff"); - auto [query_ts, local_query_id, server_id] = MPPTaskId::decodeQueryID(query_id); - EXPECT_EQ(query_ts, MAX_INT64); - EXPECT_EQ(server_id, MAX_UINT64); - EXPECT_EQ(double(local_query_id), 1); -} -CATCH - // Input block data is distributed uniform. // partition_num: 4 // fine_grained_shuffle_stream_count: 8 diff --git a/dbms/src/Flash/tests/gtest_compute_server.cpp b/dbms/src/Flash/tests/gtest_compute_server.cpp index 63ca54371e8..264db3ea876 100644 --- a/dbms/src/Flash/tests/gtest_compute_server.cpp +++ b/dbms/src/Flash/tests/gtest_compute_server.cpp @@ -363,7 +363,7 @@ try // start 10 queries { - std::vector>> queries; + std::vector>> queries; for (size_t i = 0; i < 10; ++i) { queries.push_back(prepareMPPStreams(context diff --git a/dbms/src/Storages/Transaction/LearnerRead.cpp b/dbms/src/Storages/Transaction/LearnerRead.cpp index 3453cd6f52e..8db5c5d5715 100644 --- a/dbms/src/Storages/Transaction/LearnerRead.cpp +++ b/dbms/src/Storages/Transaction/LearnerRead.cpp @@ -237,7 +237,7 @@ LearnerReadSnapshot doLearnerRead( else { batch_read_index_result.emplace(region_id, kvrpcpb::ReadIndexResponse()); - stale_read_count++; + ++stale_read_count; } } } diff --git a/dbms/src/TestUtils/MPPTaskTestUtils.cpp b/dbms/src/TestUtils/MPPTaskTestUtils.cpp index 19e0083dd3e..5f26f41a065 100644 --- a/dbms/src/TestUtils/MPPTaskTestUtils.cpp +++ b/dbms/src/TestUtils/MPPTaskTestUtils.cpp @@ -69,7 +69,7 @@ size_t MPPTaskTestUtils::serverNum() return server_num; } -std::tuple> MPPTaskTestUtils::prepareMPPStreams(DAGRequestBuilder builder) +std::tuple> MPPTaskTestUtils::prepareMPPStreams(DAGRequestBuilder builder) { auto properties = DB::tests::getDAGPropertiesForTest(serverNum()); auto tasks = builder.buildMPPTasks(context, properties); @@ -77,7 +77,7 @@ std::tuple> MPPTaskTestUtils::prepareMP TiFlashTestEnv::getGlobalContext(i).setCancelTest(); MockComputeServerManager::instance().setMockStorage(context.mockStorage()); auto res = executeMPPQueryWithMultipleContext(properties, tasks, MockComputeServerManager::instance().getServerConfigMap()); - return {MPPTaskId::generateQueryID(properties.query_ts, properties.local_query_id, properties.server_id, properties.start_ts), res}; + return {MPPQueryId(properties.query_ts, properties.local_query_id, properties.server_id, properties.start_ts), res}; } ColumnsWithTypeAndName MPPTaskTestUtils::exeucteMPPTasks(QueryTasks & tasks, const DAGProperties & properties, std::unordered_map & server_config_map) @@ -134,7 +134,7 @@ String MPPTaskTestUtils::queryInfo(size_t server_id) return buf.toString(); } -::testing::AssertionResult MPPTaskTestUtils::assertQueryCancelled(String query_id) +::testing::AssertionResult MPPTaskTestUtils::assertQueryCancelled(const MPPQueryId & query_id) { auto seconds = std::chrono::seconds(1); auto retry_times = 0; @@ -155,13 +155,13 @@ ::testing::AssertionResult MPPTaskTestUtils::assertQueryCancelled(String query_i return ::testing::AssertionSuccess(); } -::testing::AssertionResult MPPTaskTestUtils::assertQueryActive(String query_id) +::testing::AssertionResult MPPTaskTestUtils::assertQueryActive(const MPPQueryId & query_id) { for (int i = test_meta.context_idx; i < TiFlashTestEnv::globalContextSize(); ++i) { if (TiFlashTestEnv::getGlobalContext(i).getTMTContext().getMPPTaskManager()->getQueryTaskSet(query_id) == nullptr) { - return ::testing::AssertionFailure() << "Query " << query_id << "not active" << std::endl; + return ::testing::AssertionFailure() << "Query " << query_id.toDebugString() << "not active" << std::endl; } } return ::testing::AssertionSuccess(); diff --git a/dbms/src/TestUtils/MPPTaskTestUtils.h b/dbms/src/TestUtils/MPPTaskTestUtils.h index d8aa64592d3..cb0e84a2a14 100644 --- a/dbms/src/TestUtils/MPPTaskTestUtils.h +++ b/dbms/src/TestUtils/MPPTaskTestUtils.h @@ -80,14 +80,14 @@ class MPPTaskTestUtils : public ExecutorTest static size_t serverNum(); // run mpp tasks which are ready to cancel, the return value is the start_ts of query. - std::tuple> prepareMPPStreams(DAGRequestBuilder builder); + std::tuple> prepareMPPStreams(DAGRequestBuilder builder); ColumnsWithTypeAndName exeucteMPPTasks(QueryTasks & tasks, const DAGProperties & properties, std::unordered_map & server_config_map); ColumnsWithTypeAndName executeCoprocessorTask(std::shared_ptr & dag_request); - static ::testing::AssertionResult assertQueryCancelled(String query_id); - static ::testing::AssertionResult assertQueryActive(String query_id); + static ::testing::AssertionResult assertQueryCancelled(const MPPQueryId & query_id); + static ::testing::AssertionResult assertQueryActive(const MPPQueryId & query_id); static String queryInfo(size_t server_id); protected: