diff --git a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp index def9bd70efd..9d7b99ed0a9 100644 --- a/dbms/src/Flash/Mpp/MPPTask.cpp +++ b/dbms/src/Flash/Mpp/MPPTask.cpp @@ -531,7 +531,7 @@ void MPPTask::scheduleOrWait() } } -void MPPTask::scheduleThisTask(ScheduleState state) +bool MPPTask::scheduleThisTask(ScheduleState state) { std::unique_lock lock(schedule_mu); if (schedule_state == ScheduleState::WAITING) @@ -539,7 +539,9 @@ void MPPTask::scheduleThisTask(ScheduleState state) LOG_FMT_INFO(log, "task is {}.", state == ScheduleState::SCHEDULED ? "scheduled" : " failed to schedule"); schedule_state = state; schedule_cv.notify_one(); + return true; } + return false; } int MPPTask::estimateCountOfNewThreads() diff --git a/dbms/src/Flash/Mpp/MPPTask.h b/dbms/src/Flash/Mpp/MPPTask.h index d4771159df8..dfa9f8a2ea8 100644 --- a/dbms/src/Flash/Mpp/MPPTask.h +++ b/dbms/src/Flash/Mpp/MPPTask.h @@ -76,7 +76,7 @@ class MPPTask : public std::enable_shared_from_this COMPLETED }; - void scheduleThisTask(ScheduleState state); + bool scheduleThisTask(ScheduleState state); bool isScheduled(); diff --git a/dbms/src/Flash/Mpp/MinTSOScheduler.cpp b/dbms/src/Flash/Mpp/MinTSOScheduler.cpp index 967bfcecfa3..be2f3fd6e97 100644 --- a/dbms/src/Flash/Mpp/MinTSOScheduler.cpp +++ b/dbms/src/Flash/Mpp/MinTSOScheduler.cpp @@ -167,7 +167,10 @@ void MinTSOScheduler::scheduleWaitingQueries(MPPTaskManager & task_manager) if (task_it != query_task_set->task_map.end() && task_it->second != nullptr && !scheduleImp(current_query_id, query_task_set, task_it->second, true, has_error)) { if (has_error) + { query_task_set->waiting_tasks.pop(); /// it should be pop from the waiting queue, because the task is scheduled with errors. + GET_METRIC(tiflash_task_scheduler, type_waiting_tasks_count).Decrement(); + } return; } query_task_set->waiting_tasks.pop(); @@ -189,11 +192,13 @@ bool MinTSOScheduler::scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & q { updateMinTSO(tso, false, isWaiting ? "from the waiting set" : "when directly schedule it"); active_set.insert(tso); - estimated_thread_usage += needed_threads; - task->scheduleThisTask(MPPTask::ScheduleState::SCHEDULED); + if (task->scheduleThisTask(MPPTask::ScheduleState::SCHEDULED)) + { + estimated_thread_usage += needed_threads; + GET_METRIC(tiflash_task_scheduler, type_active_tasks_count).Increment(); + } GET_METRIC(tiflash_task_scheduler, type_active_queries_count).Set(active_set.size()); GET_METRIC(tiflash_task_scheduler, type_estimated_thread_usage).Set(estimated_thread_usage); - GET_METRIC(tiflash_task_scheduler, type_active_tasks_count).Increment(); LOG_FMT_INFO(log, "{} is scheduled (active set size = {}) due to available threads {}, after applied for {} threads, used {} of the thread {} limit {}.", task->getId().toString(), active_set.size(), isWaiting ? " from the waiting set" : " directly", needed_threads, estimated_thread_usage, min_tso == tso ? "hard" : "soft", min_tso == tso ? thread_hard_limit : thread_soft_limit); return true; }