Skip to content

Commit

Permalink
Scheduler no throw in destruction and avoid updated min-tso query hang (
Browse files Browse the repository at this point in the history
#4367) (#4399)

close #4366
  • Loading branch information
ti-chi-bot authored Mar 24, 2022
1 parent e9a1ded commit 1dd5d6c
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 43 deletions.
11 changes: 10 additions & 1 deletion dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -426,12 +426,21 @@ void MPPTask::scheduleOrWait()
{
LOG_FMT_INFO(log, "task waits for schedule");
Stopwatch stopwatch;
double time_cost;
double time_cost = 0;
{
std::unique_lock lock(schedule_mu);
schedule_cv.wait(lock, [&] { return schedule_state != ScheduleState::WAITING; });
time_cost = stopwatch.elapsedSeconds();
GET_METRIC(tiflash_task_scheduler_waiting_duration_seconds).Observe(time_cost);

if (schedule_state == ScheduleState::EXCEEDED)
{
throw Exception("{} is failed to schedule because of exceeding the thread hard limit in min-tso scheduler after waiting for {}s.", id.toString(), time_cost);
}
else if (schedule_state == ScheduleState::FAILED)
{
throw Exception("{} is failed to schedule because of being cancelled in min-tso scheduler after waiting for {}s.", id.toString(), time_cost);
}
}
LOG_FMT_INFO(log, "task waits for {} s to schedule and starts to run in parallel.", time_cost);
}
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Mpp/MPPTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>
WAITING,
SCHEDULED,
FAILED,
EXCEEDED,
COMPLETED
};

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Mpp/MPPTaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ void MPPTaskManager::cancelMPPQuery(UInt64 query_id, const String & reason)
return;
it->second->to_be_cancelled = true;
task_set = it->second;
scheduler->deleteCancelledQuery(query_id, *this);
scheduler->deleteQuery(query_id, *this, true);
cv.notify_all();
}
LOG_WARNING(log, fmt::format("Begin cancel query: {}", query_id));
Expand Down Expand Up @@ -155,7 +155,7 @@ void MPPTaskManager::unregisterTask(MPPTask * task)
if (it->second->task_map.empty())
{
/// remove query task map if the task is the last one
scheduler->deleteFinishedQuery(task->id.start_ts);
scheduler->deleteQuery(task->id.start_ts, *this, false);
mpp_query_map.erase(it);
}
return;
Expand Down
59 changes: 28 additions & 31 deletions dbms/src/Flash/Mpp/MinTSOScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,54 +75,43 @@ bool MinTSOScheduler::tryToSchedule(const MPPTaskPtr & task, MPPTaskManager & ta
return scheduleImp(id.start_ts, query_task_set, task, false);
}

/// the cancelled query maybe hang, so trigger scheduling as needed.
void MinTSOScheduler::deleteCancelledQuery(const UInt64 tso, MPPTaskManager & task_manager)
/// After a query finishes, no more threads may be released for a while, so the waiting tasks of the updated min-tso query should be scheduled here.
/// A cancelled query may hang, so trigger scheduling as needed when deleting a cancelled query.
void MinTSOScheduler::deleteQuery(const UInt64 tso, MPPTaskManager & task_manager, const bool is_cancelled)
{
if (isDisabled())
{
return;
}

LOG_FMT_DEBUG(log, "{} query {} (is min = {}) is deleted from active set {} left {} or waiting set {} left {}.", is_cancelled ? "Cancelled" : "Finished", tso, tso == min_tso, active_set.find(tso) != active_set.end(), active_set.size(), waiting_set.find(tso) != waiting_set.end(), waiting_set.size());
active_set.erase(tso);
waiting_set.erase(tso);
GET_METRIC(tiflash_task_scheduler, type_waiting_queries_count).Set(waiting_set.size());
GET_METRIC(tiflash_task_scheduler, type_active_queries_count).Set(active_set.size());

auto query_task_set = task_manager.getQueryTaskSetWithoutLock(tso);
if (query_task_set) /// release all waiting tasks
if (is_cancelled) /// cancelled queries may have waiting tasks, and finished queries haven't.
{
while (!query_task_set->waiting_tasks.empty())
auto query_task_set = task_manager.getQueryTaskSetWithoutLock(tso);
if (query_task_set) /// release all waiting tasks
{
query_task_set->waiting_tasks.front()->scheduleThisTask(MPPTask::ScheduleState::FAILED);
query_task_set->waiting_tasks.pop();
GET_METRIC(tiflash_task_scheduler, type_waiting_tasks_count).Decrement();
while (!query_task_set->waiting_tasks.empty())
{
query_task_set->waiting_tasks.front()->scheduleThisTask(MPPTask::ScheduleState::FAILED);
query_task_set->waiting_tasks.pop();
GET_METRIC(tiflash_task_scheduler, type_waiting_tasks_count).Decrement();
}
}
}

/// NOTE: if the cancelled query hang, it will block the min_tso, possibly resulting in deadlock. so here force scheduling waiting tasks of the updated min_tso query.
if (updateMinTSO(tso, true, "when cancelling it."))
/// NOTE: if the updated min_tso query has waiting tasks, they should be scheduled, especially when the soft-limited threads are almost used up and the active tasks are in a resource deadlock and cannot release threads soon.
if (updateMinTSO(tso, true, is_cancelled ? "when cancelling it" : "as finishing it"))
{
scheduleWaitingQueries(task_manager);
}
}

void MinTSOScheduler::deleteFinishedQuery(const UInt64 tso)
{
if (isDisabled())
{
return;
}

LOG_FMT_DEBUG(log, "query {} (is min = {}) is deleted from active set {} left {} or waiting set {} left {}.", tso, tso == min_tso, active_set.find(tso) != active_set.end(), active_set.size(), waiting_set.find(tso) != waiting_set.end(), waiting_set.size());
/// delete from sets
active_set.erase(tso);
waiting_set.erase(tso);
GET_METRIC(tiflash_task_scheduler, type_waiting_queries_count).Set(waiting_set.size());
GET_METRIC(tiflash_task_scheduler, type_active_queries_count).Set(active_set.size());

updateMinTSO(tso, true, "as deleting it.");
}

/// NOTE: must not throw exceptions, because it is called during destruction.
void MinTSOScheduler::releaseThreadsThenSchedule(const int needed_threads, MPPTaskManager & task_manager)
{
if (isDisabled())
Expand All @@ -132,9 +121,8 @@ void MinTSOScheduler::releaseThreadsThenSchedule(const int needed_threads, MPPTa

if (static_cast<Int64>(estimated_thread_usage) < needed_threads)
{
auto msg = fmt::format("estimated_thread_usage should not be smaller than 0, actually is {}.", static_cast<Int64>(estimated_thread_usage) - needed_threads);
LOG_FMT_ERROR(log, "{}", msg);
throw Exception(msg);
LOG_FMT_FATAL(log, "estimated_thread_usage should not be smaller than 0, actually is {}.", static_cast<Int64>(estimated_thread_usage) - needed_threads);
std::terminate();
}
estimated_thread_usage -= needed_threads;
GET_METRIC(tiflash_task_scheduler, type_estimated_thread_usage).Set(estimated_thread_usage);
Expand Down Expand Up @@ -202,7 +190,16 @@ bool MinTSOScheduler::scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & q
auto msg = fmt::format("threads are unavailable for the query {} ({} min_tso {}) {}, need {}, but used {} of the thread hard limit {}, {} active and {} waiting queries.", tso, tso == min_tso ? "is" : "is newer than", min_tso, isWaiting ? "from the waiting set" : "when directly schedule it", needed_threads, estimated_thread_usage, thread_hard_limit, active_set.size(), waiting_set.size());
LOG_FMT_ERROR(log, "{}", msg);
GET_METRIC(tiflash_task_scheduler, type_hard_limit_exceeded_count).Increment();
throw Exception(msg);
if (isWaiting)
{
/// mark this task as failed to schedule; the task will then throw an exception, and TiDB will finally notify this TiFlash node to cancel all tasks of this tso and update the metrics.
task->scheduleThisTask(MPPTask::ScheduleState::EXCEEDED);
waiting_set.erase(tso); /// avoid the left waiting tasks of this query reaching here many times.
}
else
{
throw Exception(msg);
}
}
if (!isWaiting)
{
Expand Down
13 changes: 4 additions & 9 deletions dbms/src/Flash/Mpp/MinTSOScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace DB
/// scheduling tasks in the set according to the tso order under the soft limit of threads, but allow the min_tso query to preempt threads under the hard limit of threads.
/// The min_tso query avoids the deadlock resulted from threads competition among nodes.
/// schedule tasks under the lock protection of the task manager.
/// NOTE: if this scheduler hangs resulting from some bugs, kill the min_tso query, and the cancelled query surely transfers the min_tso.
/// NOTE: if the updated min-tso query has waiting tasks, they must be scheduled; otherwise the query would hang.
class MinTSOScheduler : private boost::noncopyable
{
public:
Expand All @@ -34,14 +34,9 @@ class MinTSOScheduler : private boost::noncopyable
/// NOTE: call tryToSchedule under the lock protection of MPPTaskManager
bool tryToSchedule(const MPPTaskPtr & task, MPPTaskManager & task_manager);

/// delete this to-be cancelled query from scheduler and update min_tso if needed, so that there aren't cancelled queries in the scheduler.
/// NOTE: call deleteCancelledQuery under the lock protection of MPPTaskManager
void deleteCancelledQuery(const UInt64 tso, MPPTaskManager & task_manager);

/// delete the query in the active set and waiting set
/// NOTE: call deleteFinishedQuery under the lock protection of MPPTaskManager,
/// so this func is called exactly once for a query.
void deleteFinishedQuery(const UInt64 tso);
/// delete this to-be-cancelled/finished query from the scheduler and update min_tso if needed, so that no cancelled/finished queries remain in the scheduler.
/// NOTE: call deleteQuery under the lock protection of MPPTaskManager
void deleteQuery(const UInt64 tso, MPPTaskManager & task_manager, const bool is_cancelled);

/// all scheduled tasks should finally call this function to release threads and schedule new tasks
void releaseThreadsThenSchedule(const int needed_threads, MPPTaskManager & task_manager);
Expand Down
1 change: 1 addition & 0 deletions libs/libcommon/include/common/logger_useful.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,4 @@ std::string toCheckedFmtStr(const S & format, const Ignored &, Args &&... args)
#define LOG_FMT_INFO(logger, ...) LOG_FMT_IMPL(logger, Poco::Message::PRIO_INFORMATION, __VA_ARGS__)
#define LOG_FMT_WARNING(logger, ...) LOG_FMT_IMPL(logger, Poco::Message::PRIO_WARNING, __VA_ARGS__)
#define LOG_FMT_ERROR(logger, ...) LOG_FMT_IMPL(logger, Poco::Message::PRIO_ERROR, __VA_ARGS__)
#define LOG_FMT_FATAL(logger, ...) LOG_FMT_IMPL(logger, Poco::Message::PRIO_FATAL, __VA_ARGS__)

0 comments on commit 1dd5d6c

Please sign in to comment.