diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 8daf42ff550..e6c86911422 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -234,7 +234,7 @@ namespace DB F(type_active_rpc_async_worker, {"type", "rpc_async_worker_active"}), \ F(type_total_rpc_async_worker, {"type", "rpc_async_worker_total"})) \ M(tiflash_task_scheduler, "Min-tso task scheduler", Gauge, \ - F(type_min_query_ts, {"type", "min_query_ts"}), \ + F(type_min_tso, {"type", "min_tso"}), \ F(type_waiting_queries_count, {"type", "waiting_queries_count"}), \ F(type_active_queries_count, {"type", "active_queries_count"}), \ F(type_waiting_tasks_count, {"type", "waiting_tasks_count"}), \ diff --git a/dbms/src/Debug/dbgFuncMisc.cpp b/dbms/src/Debug/dbgFuncMisc.cpp index 84486936da8..42c9461ffae 100644 --- a/dbms/src/Debug/dbgFuncMisc.cpp +++ b/dbms/src/Debug/dbgFuncMisc.cpp @@ -29,7 +29,7 @@ inline size_t getReadTSOForLog(const String & line) { std::regex rx(R"((0|[1-9][0-9]*))"); std::smatch m; - // Rely on that MPP task prefix "MPP" + // Rely on that MPP task prefix "MPP" auto pos = line.find(", start_ts:"); if (pos != std::string::npos && regex_search(line.cbegin() + pos, line.cend(), m, rx)) { diff --git a/dbms/src/Flash/FlashService.cpp b/dbms/src/Flash/FlashService.cpp index 43e84269356..db3d42f70a1 100644 --- a/dbms/src/Flash/FlashService.cpp +++ b/dbms/src/Flash/FlashService.cpp @@ -366,7 +366,7 @@ grpc::Status FlashService::CancelMPPTask( auto & tmt_context = context->getTMTContext(); auto task_manager = tmt_context.getMPPTaskManager(); - task_manager->abortMPPQuery(MPPQueryId(request->meta().query_ts(), request->meta().local_query_id(), request->meta().server_id(), request->meta().start_ts()), "Receive cancel request from TiDB", AbortType::ONCANCELLATION); + task_manager->abortMPPQuery(MPPQueryId(request->meta()), "Receive cancel request from TiDB", AbortType::ONCANCELLATION); return grpc::Status::OK; } @@ -408,7 +408,7 @@ ::grpc::Status FlashService::cancelMPPTaskForTest(const ::mpp::CancelTaskRequest } auto & tmt_context = context->getTMTContext(); auto task_manager = tmt_context.getMPPTaskManager(); - task_manager->abortMPPQuery(MPPQueryId(request->meta().query_ts(), request->meta().local_query_id(), request->meta().server_id(), request->meta().start_ts()), "Receive cancel request from GTest", AbortType::ONCANCELLATION); + task_manager->abortMPPQuery(MPPQueryId(request->meta()), "Receive cancel request from GTest", AbortType::ONCANCELLATION); return grpc::Status::OK; } diff --git a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp index 306fe8c833e..2452fa32b30 100644 --- a/dbms/src/Flash/Mpp/MPPTask.cpp +++ b/dbms/src/Flash/Mpp/MPPTask.cpp @@ -139,7 +139,7 @@ void MPPTask::registerTunnels(const mpp::DispatchTaskRequest & task_request) LOG_DEBUG(log, "begin to register the tunnel {}, is_local: {}, is_async: {}", tunnel->id(), is_local, is_async); if (status != INITIALIZING) throw Exception(fmt::format("The tunnel {} can not be registered, because the task is not in initializing state", tunnel->id())); - tunnel_set_local->registerTunnel(MPPTaskId{task_meta.start_ts(), task_meta.task_id(), task_meta.server_id(), task_meta.query_ts(), task_meta.local_query_id()}, tunnel); + tunnel_set_local->registerTunnel(MPPTaskId(task_meta), tunnel); if (!dag_context->isRootMPPTask()) { FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_during_mpp_register_tunnel_for_non_root_mpp_task); @@ -204,7 +204,7 @@ std::pair MPPTask::getTunnel(const ::mpp::EstablishMPPConn return {nullptr, err_msg}; } - MPPTaskId receiver_id{request->receiver_meta().start_ts(), request->receiver_meta().task_id(), request->receiver_meta().server_id(), request->receiver_meta().query_ts(), request->receiver_meta().local_query_id()}; + MPPTaskId receiver_id(request->receiver_meta()); RUNTIME_ASSERT(tunnel_set != nullptr, log, "mpp task without tunnel set"); auto tunnel_ptr = tunnel_set->getTunnelByReceiverTaskId(receiver_id); if (tunnel_ptr == nullptr) diff --git a/dbms/src/Flash/Mpp/MPPTaskId.cpp b/dbms/src/Flash/Mpp/MPPTaskId.cpp index 8b1ce324644..0872cb6e88a 100644 --- a/dbms/src/Flash/Mpp/MPPTaskId.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskId.cpp @@ -13,17 +13,56 @@ // limitations under the License. #include +#include #include namespace DB { +bool isOldVersion(const MPPQueryId & mpp_query_id) +{ + return mpp_query_id.query_ts == 0 && mpp_query_id.local_query_id == 0 && mpp_query_id.server_id == 0; +} + + bool MPPQueryId::operator<(const MPPQueryId & mpp_query_id) const { - if (query_ts == 0 && local_query_id == 0 && server_id == 0) + // compare with MPP query generated by TiDB that version less than v6.6 + bool left_old_version = isOldVersion(*this); + bool right_old_version = isOldVersion(mpp_query_id); + if (unlikely(left_old_version && right_old_version)) { return start_ts < mpp_query_id.start_ts; } - return (query_ts < mpp_query_id.query_ts) || (local_query_id < mpp_query_id.local_query_id) || (server_id < mpp_query_id.server_id); + if (unlikely(left_old_version)) + { + return true; + } + if (unlikely(right_old_version)) + { + return false; + } + // compare with MPP query generated by TiDB that version after v6.6 + if (query_ts != mpp_query_id.query_ts) + { + return query_ts < mpp_query_id.query_ts; + } + if (server_id == mpp_query_id.server_id) + { + return local_query_id < mpp_query_id.local_query_id; + } + // now we can't compare reasonably, just choose one randomly by hash. + auto lhash = MPPQueryIdHash()(*this); + auto rhash = MPPQueryIdHash()(mpp_query_id); + if (lhash != rhash) + { + return lhash < rhash; + } + // hash values are same, just compare the rest fields. + if (local_query_id != mpp_query_id.local_query_id) + { + return local_query_id < mpp_query_id.local_query_id; + } + return server_id < mpp_query_id.server_id; } bool MPPQueryId::operator==(const MPPQueryId & rid) const { @@ -40,7 +79,7 @@ bool MPPQueryId::operator<=(const MPPQueryId & rid) const size_t MPPQueryIdHash::operator()(MPPQueryId const & mpp_query_id) const noexcept { - if (mpp_query_id.query_ts == 0 && mpp_query_id.local_query_id == 0 && mpp_query_id.server_id == 0) + if (isOldVersion(mpp_query_id)) { return std::hash()(mpp_query_id.start_ts); } @@ -49,7 +88,7 @@ size_t MPPQueryIdHash::operator()(MPPQueryId const & mpp_query_id) const noexcep String MPPTaskId::toString() const { - return isUnknown() ? "MPP" : fmt::format("MPP", query_id.toDebugString(), start_ts, task_id); + return isUnknown() ? "MPP" : fmt::format("MPP", query_id.toString(), task_id); } const MPPTaskId MPPTaskId::unknown_mpp_task_id = MPPTaskId{}; diff --git a/dbms/src/Flash/Mpp/MPPTaskId.h b/dbms/src/Flash/Mpp/MPPTaskId.h index e2690effd87..59045759c1a 100644 --- a/dbms/src/Flash/Mpp/MPPTaskId.h +++ b/dbms/src/Flash/Mpp/MPPTaskId.h @@ -16,6 +16,7 @@ #include #include +#include namespace DB { @@ -32,11 +33,17 @@ struct MPPQueryId , server_id(server_id) , start_ts(start_ts) {} + MPPQueryId(const mpp::TaskMeta & task_meta) + : query_ts(task_meta.query_ts()) + , local_query_id(task_meta.local_query_id()) + , server_id(task_meta.server_id()) + , start_ts(task_meta.start_ts()) + {} bool operator<(const MPPQueryId & mpp_query_id) const; bool operator==(const MPPQueryId & rid) const; bool operator!=(const MPPQueryId & rid) const; bool operator<=(const MPPQueryId & rid) const; - String toDebugString() const + String toString() const { return fmt::format("query_ts:{}, local_query_id:{}, server_id:{}, start_ts:{}", query_ts, local_query_id, server_id, start_ts); } @@ -51,17 +58,19 @@ struct MPPQueryIdHash struct MPPTaskId { MPPTaskId() - : start_ts(0) - , task_id(unknown_task_id) + : task_id(unknown_task_id) , query_id({0, 0, 0, 0}){}; - MPPTaskId(UInt64 start_ts_, Int64 task_id_, UInt64 server_id, UInt64 query_ts, UInt64 local_query_id) - : start_ts(start_ts_) - , task_id(task_id_) - , query_id(query_ts, local_query_id, server_id, start_ts_) + MPPTaskId(UInt64 start_ts, Int64 task_id_, UInt64 server_id, UInt64 query_ts, UInt64 local_query_id) + : task_id(task_id_) + , query_id(query_ts, local_query_id, server_id, start_ts) + {} + + MPPTaskId(const mpp::TaskMeta & task_meta) + : task_id(task_meta.task_id()) + , query_id(task_meta) {} - UInt64 start_ts; Int64 task_id; MPPQueryId query_id; diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.cpp b/dbms/src/Flash/Mpp/MPPTaskManager.cpp index 41637cc0237..6f2eb552a2a 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskManager.cpp @@ -53,7 +53,7 @@ void MPPTaskManager::removeMPPQueryTaskSet(const MPPQueryId & query_id, bool on_ std::pair MPPTaskManager::findAsyncTunnel(const ::mpp::EstablishMPPConnectionRequest * request, EstablishCallData * call_data, grpc::CompletionQueue * cq) { const auto & meta = request->sender_meta(); - MPPTaskId id{meta.start_ts(), meta.task_id(), meta.server_id(), meta.query_ts(), meta.local_query_id()}; + MPPTaskId id{meta}; Int64 sender_task_id = meta.task_id(); Int64 receiver_task_id = request->receiver_meta().task_id(); @@ -62,7 +62,7 @@ std::pair MPPTaskManager::findAsyncTunnel(const ::mpp::Est if (query_it != mpp_query_map.end() && !query_it->second->isInNormalState()) { /// if the query is aborted, return the error message - LOG_WARNING(log, fmt::format("Query {} is aborted, all its tasks are invalid.", id.query_id.toDebugString())); + LOG_WARNING(log, fmt::format("Query {} is aborted, all its tasks are invalid.", id.query_id.toString())); /// meet error return {nullptr, query_it->second->error_message}; } @@ -100,7 +100,7 @@ std::pair MPPTaskManager::findAsyncTunnel(const ::mpp::Est cv.notify_all(); } } - return {nullptr, fmt::format("Can't find task [{},{},{}] within 10 s.", id.query_id.toDebugString(), id.start_ts, id.task_id)}; + return {nullptr, fmt::format("Can't find task [{},{}] within 10 s.", id.query_id.toString(), id.task_id)}; } } /// don't need to delete the alarm here because registerMPPTask will delete all the related alarm @@ -112,7 +112,7 @@ std::pair MPPTaskManager::findAsyncTunnel(const ::mpp::Est std::pair MPPTaskManager::findTunnelWithTimeout(const ::mpp::EstablishMPPConnectionRequest * request, std::chrono::seconds timeout) { const auto & meta = request->sender_meta(); - MPPTaskId id{meta.start_ts(), meta.task_id(), meta.server_id(), meta.query_ts(), meta.local_query_id()}; + MPPTaskId id{meta}; std::unordered_map::iterator it; bool cancelled = false; String error_message; @@ -127,7 +127,7 @@ std::pair MPPTaskManager::findTunnelWithTimeout(const ::mp else if (!query_it->second->isInNormalState()) { /// if the query is aborted, return true to stop waiting timeout. - LOG_WARNING(log, fmt::format("Query {} is aborted, all its tasks are invalid.", id.query_id.toDebugString())); + LOG_WARNING(log, fmt::format("Query {} is aborted, all its tasks are invalid.", id.query_id.toString())); cancelled = true; error_message = query_it->second->error_message; return true; @@ -149,7 +149,7 @@ std::pair MPPTaskManager::findTunnelWithTimeout(const ::mp void MPPTaskManager::abortMPPQuery(const MPPQueryId & query_id, const String & reason, AbortType abort_type) { - LOG_WARNING(log, fmt::format("Begin to abort query: {}, abort type: {}, reason: {}", query_id.toDebugString(), magic_enum::enum_name(abort_type), reason)); + LOG_WARNING(log, fmt::format("Begin to abort query: {}, abort type: {}, reason: {}", query_id.toString(), magic_enum::enum_name(abort_type), reason)); MPPQueryTaskSetPtr task_set; { /// abort task may take a long time, so first @@ -159,12 +159,12 @@ void MPPTaskManager::abortMPPQuery(const MPPQueryId & query_id, const String & r auto it = mpp_query_map.find(query_id); if (it == mpp_query_map.end()) { - LOG_WARNING(log, fmt::format("{} does not found in task manager, skip abort", query_id.toDebugString())); + LOG_WARNING(log, fmt::format("{} does not found in task manager, skip abort", query_id.toString())); return; } else if (!it->second->isInNormalState()) { - LOG_WARNING(log, fmt::format("{} already in abort process, skip abort", query_id.toDebugString())); + LOG_WARNING(log, fmt::format("{} already in abort process, skip abort", query_id.toString())); return; } it->second->state = MPPQueryTaskSet::Aborting; @@ -178,7 +178,7 @@ void MPPTaskManager::abortMPPQuery(const MPPQueryId & query_id, const String & r it->second->alarms.clear(); if (it->second->task_map.empty()) { - LOG_INFO(log, fmt::format("There is no mpp task for {}, finish abort", query_id.toDebugString())); + LOG_INFO(log, fmt::format("There is no mpp task for {}, finish abort", query_id.toString())); removeMPPQueryTaskSet(query_id, true); cv.notify_all(); return; @@ -189,7 +189,7 @@ void MPPTaskManager::abortMPPQuery(const MPPQueryId & query_id, const String & r } FmtBuffer fmt_buf; - fmt_buf.fmtAppend("Remaining task in query {} are: ", query_id.toDebugString()); + fmt_buf.fmtAppend("Remaining task in query {} are: ", query_id.toString()); for (auto & it : task_set->task_map) fmt_buf.fmtAppend("{} ", it.first.toString()); LOG_WARNING(log, fmt_buf.toString()); @@ -200,11 +200,11 @@ void MPPTaskManager::abortMPPQuery(const MPPQueryId & query_id, const String & r { std::lock_guard lock(mu); auto it = mpp_query_map.find(query_id); - RUNTIME_ASSERT(it != mpp_query_map.end(), log, "MPPTaskQuerySet {} should remaining in MPPTaskManager", query_id.toDebugString()); + RUNTIME_ASSERT(it != mpp_query_map.end(), log, "MPPTaskQuerySet {} should remaining in MPPTaskManager", query_id.toString()); it->second->state = MPPQueryTaskSet::Aborted; cv.notify_all(); } - LOG_WARNING(log, "Finish abort query: " + query_id.toDebugString()); + LOG_WARNING(log, "Finish abort query: " + query_id.toString()); } std::pair MPPTaskManager::registerTask(MPPTaskPtr task) diff --git a/dbms/src/Flash/Mpp/MPPTaskStatistics.cpp b/dbms/src/Flash/Mpp/MPPTaskStatistics.cpp index 1bef6568b05..a18403ba5d5 100644 --- a/dbms/src/Flash/Mpp/MPPTaskStatistics.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskStatistics.cpp @@ -102,9 +102,9 @@ void MPPTaskStatistics::logTracingJson() R"(,"read_wait_index_start_timestamp":{},"read_wait_index_end_timestamp":{})" R"(,"local_input_bytes":{},"remote_input_bytes":{},"output_bytes":{})" R"(,"status":"{}","error_message":"{}","working_time":{},"memory_peak":{}}})", - id.start_ts, + id.query_id.start_ts, id.task_id, - id.query_id.toDebugString(), + id.query_id.toString(), is_root, sender_executor_id, executor_statistics_collector.resToJson(), diff --git a/dbms/src/Flash/Mpp/MinTSOScheduler.cpp b/dbms/src/Flash/Mpp/MinTSOScheduler.cpp index 5df67f703e4..ef89900b062 100644 --- a/dbms/src/Flash/Mpp/MinTSOScheduler.cpp +++ b/dbms/src/Flash/Mpp/MinTSOScheduler.cpp @@ -57,7 +57,7 @@ MinTSOScheduler::MinTSOScheduler(UInt64 soft_limit, UInt64 hard_limit, UInt64 ac { LOG_INFO(log, "thread_hard_limit is {}, thread_soft_limit is {}, and active_set_soft_limit is {} in MinTSOScheduler.", thread_hard_limit, thread_soft_limit, active_set_soft_limit); } - GET_METRIC(tiflash_task_scheduler, type_min_query_ts).Set(min_query_id.query_ts); + GET_METRIC(tiflash_task_scheduler, type_min_tso).Set(min_query_id.query_ts); GET_METRIC(tiflash_task_scheduler, type_thread_soft_limit).Set(thread_soft_limit); GET_METRIC(tiflash_task_scheduler, type_thread_hard_limit).Set(thread_hard_limit); GET_METRIC(tiflash_task_scheduler, type_estimated_thread_usage).Set(estimated_thread_usage); @@ -96,7 +96,7 @@ void MinTSOScheduler::deleteQuery(const MPPQueryId & query_id, MPPTaskManager & return; } - LOG_DEBUG(log, "{} query {} (is min = {}) is deleted from active set {} left {} or waiting set {} left {}.", is_cancelled ? "Cancelled" : "Finished", query_id.toDebugString(), query_id == min_query_id, active_set.find(query_id) != active_set.end(), active_set.size(), waiting_set.find(query_id) != waiting_set.end(), waiting_set.size()); + LOG_DEBUG(log, "{} query {} (is min = {}) is deleted from active set {} left {} or waiting set {} left {}.", is_cancelled ? "Cancelled" : "Finished", query_id.toString(), query_id == min_query_id, active_set.find(query_id) != active_set.end(), active_set.size(), waiting_set.find(query_id) != waiting_set.end(), waiting_set.size()); active_set.erase(query_id); waiting_set.erase(query_id); GET_METRIC(tiflash_task_scheduler, type_waiting_queries_count).Set(waiting_set.size()); @@ -152,7 +152,7 @@ void MinTSOScheduler::scheduleWaitingQueries(MPPTaskManager & task_manager) auto query_task_set = task_manager.getQueryTaskSetWithoutLock(current_query_id); if (nullptr == query_task_set) /// silently solve this rare case { - LOG_ERROR(log, "the waiting query {} is not in the task manager.", current_query_id.toDebugString()); + LOG_ERROR(log, "the waiting query {} is not in the task manager.", current_query_id.toString()); updateMinQueryId(current_query_id, true, "as it is not in the task manager."); active_set.erase(current_query_id); waiting_set.erase(current_query_id); @@ -161,7 +161,7 @@ void MinTSOScheduler::scheduleWaitingQueries(MPPTaskManager & task_manager) continue; } - LOG_DEBUG(log, "query {} (is min = {}) with {} tasks is to be scheduled from waiting set (size = {}).", current_query_id.toDebugString(), current_query_id == min_query_id, query_task_set->waiting_tasks.size(), waiting_set.size()); + LOG_DEBUG(log, "query {} (is min = {}) with {} tasks is to be scheduled from waiting set (size = {}).", current_query_id.toString(), current_query_id == min_query_id, query_task_set->waiting_tasks.size(), waiting_set.size()); /// schedule tasks one by one while (!query_task_set->waiting_tasks.empty()) { @@ -179,7 +179,7 @@ void MinTSOScheduler::scheduleWaitingQueries(MPPTaskManager & task_manager) query_task_set->waiting_tasks.pop(); GET_METRIC(tiflash_task_scheduler, type_waiting_tasks_count).Decrement(); } - LOG_DEBUG(log, "query {} (is min = {}) is scheduled from waiting set (size = {}).", current_query_id.toDebugString(), current_query_id == min_query_id, waiting_set.size()); + LOG_DEBUG(log, "query {} (is min = {}) is scheduled from waiting set (size = {}).", current_query_id.toString(), current_query_id == min_query_id, waiting_set.size()); waiting_set.erase(current_query_id); /// all waiting tasks of this query are fully active GET_METRIC(tiflash_task_scheduler, type_waiting_queries_count).Set(waiting_set.size()); } @@ -212,7 +212,7 @@ bool MinTSOScheduler::scheduleImp(const MPPQueryId & query_id, const MPPQueryTas if (is_query_id_min) /// the min_query_id query should fully run, otherwise throw errors here. { has_error = true; - auto msg = fmt::format("threads are unavailable for the query {} ({} min_query_id {}) {}, need {}, but used {} of the thread hard limit {}, {} active and {} waiting queries.", query_id.toDebugString(), query_id == min_query_id ? "is" : "is newer than", min_query_id.toDebugString(), isWaiting ? "from the waiting set" : "when directly schedule it", needed_threads, estimated_thread_usage, thread_hard_limit, active_set.size(), waiting_set.size()); + auto msg = fmt::format("threads are unavailable for the query {} ({} min_query_id {}) {}, need {}, but used {} of the thread hard limit {}, {} active and {} waiting queries.", query_id.toString(), query_id == min_query_id ? "is" : "is newer than", min_query_id.toString(), isWaiting ? "from the waiting set" : "when directly schedule it", needed_threads, estimated_thread_usage, thread_hard_limit, active_set.size(), waiting_set.size()); LOG_ERROR(log, "{}", msg); GET_METRIC(tiflash_task_scheduler, type_hard_limit_exceeded_count).Increment(); if (isWaiting) @@ -234,7 +234,7 @@ bool MinTSOScheduler::scheduleImp(const MPPQueryId & query_id, const MPPQueryTas GET_METRIC(tiflash_task_scheduler, type_waiting_queries_count).Set(waiting_set.size()); GET_METRIC(tiflash_task_scheduler, type_waiting_tasks_count).Increment(); } - LOG_INFO(log, "threads are unavailable for the query {} or active set is full (size = {}), need {}, but used {} of the thread soft limit {},{} waiting set size = {}", query_id.toDebugString(), active_set.size(), needed_threads, estimated_thread_usage, thread_soft_limit, isWaiting ? "" : " put into", waiting_set.size()); + LOG_INFO(log, "threads are unavailable for the query {} or active set is full (size = {}), need {}, but used {} of the thread soft limit {},{} waiting set size = {}", query_id.toString(), active_set.size(), needed_threads, estimated_thread_usage, thread_soft_limit, isWaiting ? "" : " put into", waiting_set.size()); return false; } } @@ -259,8 +259,8 @@ bool MinTSOScheduler::updateMinQueryId(const MPPQueryId & query_id, const bool r } if (min_query_id != old_min_query_id) /// if min_query_id == MPPTaskId::Max_Query_Id and the query_id is not to be cancelled, the used_threads, active_set.size() and waiting_set.size() must be 0. { - GET_METRIC(tiflash_task_scheduler, type_min_query_ts).Set(min_query_id.query_ts == 0 ? min_query_id.start_ts : min_query_id.query_ts); - LOG_INFO(log, "min_query_id query is updated from {} to {} {}, used threads = {}, {} active and {} waiting queries.", old_min_query_id.toDebugString(), min_query_id.toDebugString(), msg, estimated_thread_usage, active_set.size(), waiting_set.size()); + GET_METRIC(tiflash_task_scheduler, type_min_tso).Set(min_query_id.query_ts == 0 ? min_query_id.start_ts : min_query_id.query_ts); + LOG_INFO(log, "min_query_id query is updated from {} to {} {}, used threads = {}, {} active and {} waiting queries.", old_min_query_id.toString(), min_query_id.toString(), msg, estimated_thread_usage, active_set.size(), waiting_set.size()); } return force_scheduling; } diff --git a/dbms/src/Flash/Mpp/getMPPTaskTracingLog.cpp b/dbms/src/Flash/Mpp/getMPPTaskTracingLog.cpp index df04d25cac8..dce4e69db1f 100644 --- a/dbms/src/Flash/Mpp/getMPPTaskTracingLog.cpp +++ b/dbms/src/Flash/Mpp/getMPPTaskTracingLog.cpp @@ -13,11 +13,12 @@ // limitations under the License. #include +#include namespace DB { LoggerPtr getMPPTaskTracingLog(const MPPTaskId & mpp_task_id) { - return Logger::get(tracing_log_source, mpp_task_id.toString()); + return Logger::get(DB::tracing_log_source, mpp_task_id.toString()); } } // namespace DB diff --git a/dbms/src/Flash/Mpp/getMPPTaskTracingLog.h b/dbms/src/Flash/Mpp/getMPPTaskTracingLog.h index 1491d26dbf3..4da514f5795 100644 --- a/dbms/src/Flash/Mpp/getMPPTaskTracingLog.h +++ b/dbms/src/Flash/Mpp/getMPPTaskTracingLog.h @@ -19,9 +19,6 @@ namespace DB { -/// Tracing logs are filtered by SourceFilterChannel. -inline constexpr auto tracing_log_source = "mpp_task_tracing"; - /// All tracing logs must logged by the logger that got by `getMPPTaskTracingLog`. LoggerPtr getMPPTaskTracingLog(const MPPTaskId & mpp_task_id); } // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/StorageDisaggregated.cpp b/dbms/src/Storages/StorageDisaggregated.cpp index 8a447637b31..652032b8dc5 100644 --- a/dbms/src/Storages/StorageDisaggregated.cpp +++ b/dbms/src/Storages/StorageDisaggregated.cpp @@ -97,8 +97,11 @@ StorageDisaggregated::RequestAndRegionIDs StorageDisaggregated::buildDispatchMPP std::vector region_ids; auto dispatch_req = std::make_shared<::mpp::DispatchTaskRequest>(); ::mpp::TaskMeta * dispatch_req_meta = dispatch_req->mutable_meta(); - dispatch_req_meta->set_start_ts(sender_target_task_start_ts); - dispatch_req_meta->set_task_id(sender_target_task_task_id); + dispatch_req_meta->set_start_ts(sender_target_mpp_task_id.query_id.start_ts); + dispatch_req_meta->set_query_ts(sender_target_mpp_task_id.query_id.query_ts); + dispatch_req_meta->set_local_query_id(sender_target_mpp_task_id.query_id.local_query_id); + dispatch_req_meta->set_server_id(sender_target_mpp_task_id.query_id.server_id); + dispatch_req_meta->set_task_id(sender_target_mpp_task_id.task_id); dispatch_req_meta->set_address(batch_cop_task.store_addr); const auto & settings = context.getSettings(); dispatch_req->set_timeout(60); @@ -164,10 +167,9 @@ StorageDisaggregated::RequestAndRegionIDs StorageDisaggregated::buildDispatchMPP tipb::Executor * executor = sender_dag_req.mutable_root_executor(); executor->set_tp(tipb::ExecType::TypeExchangeSender); // Exec summary of ExchangeSender will be merged into TableScan. - executor->set_executor_id(fmt::format("{}_{}_{}", + executor->set_executor_id(fmt::format("{}_{}", ExecIDPrefixForTiFlashStorageSender, - sender_target_task_start_ts, - sender_target_task_task_id)); + sender_target_mpp_task_id.toString())); tipb::ExchangeSender * sender = executor->mutable_exchange_sender(); sender->set_tp(tipb::ExchangeType::PassThrough); diff --git a/dbms/src/Storages/StorageDisaggregated.h b/dbms/src/Storages/StorageDisaggregated.h index b878ff21695..59971b05012 100644 --- a/dbms/src/Storages/StorageDisaggregated.h +++ b/dbms/src/Storages/StorageDisaggregated.h @@ -29,7 +29,6 @@ namespace DB { - // Naive implementation of StorageDisaggregated, all region data will be transferred by GRPC, // rewrite this when local cache is supported. // Naive StorageDisaggregated will convert TableScan to ExchangeReceiver(executed in tiflash_compute node), @@ -45,8 +44,7 @@ class StorageDisaggregated : public IStorage , context(context_) , table_scan(table_scan_) , log(Logger::get(context_.getDAGContext()->log ? context_.getDAGContext()->log->identifier() : "")) - , sender_target_task_start_ts(context_.getDAGContext()->getMPPTaskMeta().start_ts()) - , sender_target_task_task_id(context_.getDAGContext()->getMPPTaskMeta().task_id()) + , sender_target_mpp_task_id(context_.getDAGContext()->getMPPTaskMeta()) , push_down_filter(push_down_filter_) {} @@ -87,8 +85,7 @@ class StorageDisaggregated : public IStorage Context & context; const TiDBTableScan & table_scan; LoggerPtr log; - uint64_t sender_target_task_start_ts; - int64_t sender_target_task_task_id; + MPPTaskId sender_target_mpp_task_id; const PushDownFilter & push_down_filter; std::shared_ptr exchange_receiver; diff --git a/dbms/src/Storages/Transaction/RegionTable.h b/dbms/src/Storages/Transaction/RegionTable.h index 5e2154823d7..36686b44d90 100644 --- a/dbms/src/Storages/Transaction/RegionTable.h +++ b/dbms/src/Storages/Transaction/RegionTable.h @@ -59,9 +59,15 @@ using RegionScanFilterPtr = std::shared_ptr; using SafeTS = UInt64; enum : SafeTS { - TsoPhysicalShiftBits = 18, InvalidSafeTS = std::numeric_limits::max(), }; + +using TsoShiftBits = UInt64; +enum : TsoShiftBits +{ + TsoPhysicalShiftBits = 18, +}; + class RegionTable : private boost::noncopyable { public: diff --git a/dbms/src/TestUtils/MPPTaskTestUtils.cpp b/dbms/src/TestUtils/MPPTaskTestUtils.cpp index 5f26f41a065..f9a9123057c 100644 --- a/dbms/src/TestUtils/MPPTaskTestUtils.cpp +++ b/dbms/src/TestUtils/MPPTaskTestUtils.cpp @@ -161,7 +161,7 @@ ::testing::AssertionResult MPPTaskTestUtils::assertQueryActive(const MPPQueryId { if (TiFlashTestEnv::getGlobalContext(i).getTMTContext().getMPPTaskManager()->getQueryTaskSet(query_id) == nullptr) { - return ::testing::AssertionFailure() << "Query " << query_id.toDebugString() << "not active" << std::endl; + return ::testing::AssertionFailure() << "Query " << query_id.toString() << "not active" << std::endl; } } return ::testing::AssertionSuccess(); diff --git a/libs/libcommon/include/common/logger_useful.h b/libs/libcommon/include/common/logger_useful.h index 3858258d2ce..7e9498ec853 100644 --- a/libs/libcommon/include/common/logger_useful.h +++ b/libs/libcommon/include/common/logger_useful.h @@ -26,6 +26,12 @@ #define QUERY_PREVIEW_LENGTH 160 #endif +namespace DB +{ +/// Tracing logs are filtered by SourceFilterChannel. +inline constexpr auto tracing_log_source = "mpp_task_tracing"; +} // namespace DB + namespace LogFmtDetails { // https://stackoverflow.com/questions/8487986/file-macro-shows-full-path/54335644#54335644 diff --git a/libs/libdaemon/src/BaseDaemon.cpp b/libs/libdaemon/src/BaseDaemon.cpp index 61620e48735..0e572b13573 100644 --- a/libs/libdaemon/src/BaseDaemon.cpp +++ b/libs/libdaemon/src/BaseDaemon.cpp @@ -48,7 +48,6 @@ #include #include #include -#include #include #include #include diff --git a/metrics/grafana/tiflash_summary.json b/metrics/grafana/tiflash_summary.json index 46bb02adcfb..f3f65fc33c3 100644 --- a/metrics/grafana/tiflash_summary.json +++ b/metrics/grafana/tiflash_summary.json @@ -51,6 +51,7 @@ "editable": true, "gnetId": null, "graphTooltip": 1, + "iteration": 1670904884485, "links": [], "panels": [ @@ -4127,9 +4128,9 @@ "targets": [ { "exemplar": true, - "expr": "max(tiflash_task_scheduler{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", type=\"min_query_ts\"}) by (instance,type)", + "expr": "max(tiflash_task_scheduler{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}) by (instance)", "interval": "", - "legendFormat": "{{instance}}-{{type}}", + "legendFormat": "{{instance}}", "queryType": "randomWalk", "refId": "A" }