From 7f170254c16c54476cabd45d7fda742f43c5e72c Mon Sep 17 00:00:00 2001 From: xufei Date: Mon, 8 Aug 2022 16:32:47 +0800 Subject: [PATCH] This is an automated cherry-pick of #5557 Signed-off-by: ti-chi-bot --- dbms/src/Flash/Mpp/MPPTask.cpp | 4 +++- dbms/src/Flash/Mpp/MPPTask.h | 2 +- dbms/src/Flash/Mpp/MinTSOScheduler.cpp | 14 ++++++++++++-- 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp index def9bd70efd..9d7b99ed0a9 100644 --- a/dbms/src/Flash/Mpp/MPPTask.cpp +++ b/dbms/src/Flash/Mpp/MPPTask.cpp @@ -531,7 +531,7 @@ void MPPTask::scheduleOrWait() } } -void MPPTask::scheduleThisTask(ScheduleState state) +bool MPPTask::scheduleThisTask(ScheduleState state) { std::unique_lock lock(schedule_mu); if (schedule_state == ScheduleState::WAITING) @@ -539,7 +539,9 @@ void MPPTask::scheduleThisTask(ScheduleState state) LOG_FMT_INFO(log, "task is {}.", state == ScheduleState::SCHEDULED ? "scheduled" : " failed to schedule"); schedule_state = state; schedule_cv.notify_one(); + return true; } + return false; } int MPPTask::estimateCountOfNewThreads() diff --git a/dbms/src/Flash/Mpp/MPPTask.h b/dbms/src/Flash/Mpp/MPPTask.h index d4771159df8..dfa9f8a2ea8 100644 --- a/dbms/src/Flash/Mpp/MPPTask.h +++ b/dbms/src/Flash/Mpp/MPPTask.h @@ -76,7 +76,7 @@ class MPPTask : public std::enable_shared_from_this COMPLETED }; - void scheduleThisTask(ScheduleState state); + bool scheduleThisTask(ScheduleState state); bool isScheduled(); diff --git a/dbms/src/Flash/Mpp/MinTSOScheduler.cpp b/dbms/src/Flash/Mpp/MinTSOScheduler.cpp index 967bfcecfa3..bf6b4ee3c8c 100644 --- a/dbms/src/Flash/Mpp/MinTSOScheduler.cpp +++ b/dbms/src/Flash/Mpp/MinTSOScheduler.cpp @@ -167,7 +167,10 @@ void MinTSOScheduler::scheduleWaitingQueries(MPPTaskManager & task_manager) if (task_it != query_task_set->task_map.end() && task_it->second != nullptr && !scheduleImp(current_query_id, query_task_set, task_it->second, true, has_error)) { if (has_error) + { query_task_set->waiting_tasks.pop(); /// it should be pop from the waiting queue, because the task is scheduled with errors. + GET_METRIC(tiflash_task_scheduler, type_waiting_tasks_count).Decrement(); + } return; } query_task_set->waiting_tasks.pop(); @@ -189,12 +192,19 @@ bool MinTSOScheduler::scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & q { updateMinTSO(tso, false, isWaiting ? "from the waiting set" : "when directly schedule it"); active_set.insert(tso); - estimated_thread_usage += needed_threads; - task->scheduleThisTask(MPPTask::ScheduleState::SCHEDULED); + if (task->scheduleThisTask(MPPTask::ScheduleState::SCHEDULED)) + { + estimated_thread_usage += needed_threads; + GET_METRIC(tiflash_task_scheduler, type_active_tasks_count).Increment(); + } GET_METRIC(tiflash_task_scheduler, type_active_queries_count).Set(active_set.size()); GET_METRIC(tiflash_task_scheduler, type_estimated_thread_usage).Set(estimated_thread_usage); +<<<<<<< HEAD GET_METRIC(tiflash_task_scheduler, type_active_tasks_count).Increment(); LOG_FMT_INFO(log, "{} is scheduled (active set size = {}) due to available threads {}, after applied for {} threads, used {} of the thread {} limit {}.", task->getId().toString(), active_set.size(), isWaiting ? " from the waiting set" : " directly", needed_threads, estimated_thread_usage, min_tso == tso ? "hard" : "soft", min_tso == tso ? thread_hard_limit : thread_soft_limit); +======= + LOG_FMT_INFO(log, "{} is scheduled (active set size = {}) due to available threads {}, after applied for {} threads, used {} of the thread {} limit {}.", task->getId().toString(), active_set.size(), isWaiting ? "from the waiting set" : "directly", needed_threads, estimated_thread_usage, min_tso == tso ? "hard" : "soft", min_tso == tso ? thread_hard_limit : thread_soft_limit); +>>>>>>> 89e7df8fe7 (fix bug in `MinTSOScheduler` that `estimated_thread_usage`/`waiting_tasks_count`/`active_tasks_count` are not 0 even if there is no queries running (#5557)) return true; } else