Skip to content

Commit

Permalink
refine mpp schedule
Browse files Browse the repository at this point in the history
Signed-off-by: xufei <[email protected]>
  • Loading branch information
windtalker committed Sep 28, 2022
1 parent a981237 commit db032ea
Show file tree
Hide file tree
Showing 10 changed files with 241 additions and 97 deletions.
59 changes: 8 additions & 51 deletions dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,13 @@ extern const char force_no_local_region_for_mpp_task[];
} // namespace FailPoints

MPPTask::MPPTask(const mpp::TaskMeta & meta_, const ContextPtr & context_)
: context(context_)
, meta(meta_)
: meta(meta_)
, id(meta.start_ts(), meta.task_id())
, context(context_)
, manager(context_->getTMTContext().getMPPTaskManager().get())
, schedule_entry(manager, id)
, log(Logger::get("MPPTask", id.toString()))
, mpp_task_statistics(id, meta.address())
, needed_threads(0)
, schedule_state(ScheduleState::WAITING)
{
current_memory_tracker = nullptr;
}
Expand All @@ -70,13 +69,6 @@ MPPTask::~MPPTask()
if (process_list_entry != nullptr && current_memory_tracker != process_list_entry->get().getMemoryTrackerPtr().get())
current_memory_tracker = process_list_entry->get().getMemoryTrackerPtr().get();
abortTunnels("", true);
if (schedule_state == ScheduleState::SCHEDULED)
{
/// the threads of this task are not fully freed now, since the BlockIO and DAGContext are not destructed
/// TODO: finish all threads before here, except the current one.
manager->releaseThreadsFromScheduler(needed_threads);
schedule_state = ScheduleState::COMPLETED;
}
LOG_FMT_DEBUG(log, "finish MPPTask: {}", id.toString());
}

Expand Down Expand Up @@ -365,8 +357,8 @@ void MPPTask::runImpl()
{
LOG_FMT_INFO(log, "task starts preprocessing");
preprocess();
needed_threads = estimateCountOfNewThreads();
LOG_FMT_DEBUG(log, "Estimate new thread count of query: {} including tunnel_threads: {}, receiver_threads: {}", needed_threads, dag_context->tunnel_set->getRemoteTunnelCnt(), new_thread_count_of_exchange_receiver);
schedule_entry.setNeededThreads(estimateCountOfNewThreads());
LOG_FMT_DEBUG(log, "Estimate new thread count of query: {} including tunnel_threads: {}, receiver_threads: {}", schedule_entry.getNeededThreads(), dag_context->tunnel_set->getRemoteTunnelCnt(), new_thread_count_of_exchange_receiver);

scheduleOrWait();

Expand Down Expand Up @@ -511,41 +503,15 @@ bool MPPTask::switchStatus(TaskStatus from, TaskStatus to)

void MPPTask::scheduleOrWait()
{
if (!manager->tryToScheduleTask(shared_from_this()))
if (!manager->tryToScheduleTask(schedule_entry))
{
LOG_FMT_INFO(log, "task waits for schedule");
Stopwatch stopwatch;
double time_cost = 0;
{
std::unique_lock lock(schedule_mu);
schedule_cv.wait(lock, [&] { return schedule_state != ScheduleState::WAITING; });
time_cost = stopwatch.elapsedSeconds();
GET_METRIC(tiflash_task_scheduler_waiting_duration_seconds).Observe(time_cost);

if (schedule_state == ScheduleState::EXCEEDED)
{
throw Exception(fmt::format("{} is failed to schedule because of exceeding the thread hard limit in min-tso scheduler after waiting for {}s.", id.toString(), time_cost));
}
else if (schedule_state == ScheduleState::FAILED)
{
throw Exception(fmt::format("{} is failed to schedule because of being cancelled in min-tso scheduler after waiting for {}s.", id.toString(), time_cost));
}
}
LOG_FMT_INFO(log, "task waits for {} s to schedule and starts to run in parallel.", time_cost);
schedule_entry.waitForSchedule();
}
}

bool MPPTask::scheduleThisTask(ScheduleState state)
{
std::unique_lock lock(schedule_mu);
if (schedule_state == ScheduleState::WAITING)
{
LOG_FMT_INFO(log, "task is {}.", state == ScheduleState::SCHEDULED ? "scheduled" : " failed to schedule");
schedule_state = state;
schedule_cv.notify_one();
return true;
}
return false;
return schedule_entry.schedule(state);
}

int MPPTask::estimateCountOfNewThreads()
Expand All @@ -558,13 +524,4 @@ int MPPTask::estimateCountOfNewThreads()
+ dag_context->tunnel_set->getRemoteTunnelCnt();
}

int MPPTask::getNeededThreads()
{
if (needed_threads == 0)
{
throw Exception(" the needed_threads of task " + id.toString() + " is not initialized!");
}
return needed_threads;
}

} // namespace DB
35 changes: 11 additions & 24 deletions dbms/src/Flash/Mpp/MPPTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Mpp/MPPReceiverSet.h>
#include <Flash/Mpp/MPPTaskId.h>
#include <Flash/Mpp/MPPTaskScheduleEntry.h>
#include <Flash/Mpp/MPPTaskStatistics.h>
#include <Flash/Mpp/MPPTunnel.h>
#include <Flash/Mpp/MPPTunnelSet.h>
Expand Down Expand Up @@ -72,19 +73,10 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>

void run();

int getNeededThreads();

enum class ScheduleState
{
WAITING,
SCHEDULED,
FAILED,
EXCEEDED,
COMPLETED
};

bool scheduleThisTask(ScheduleState state);

MPPTaskScheduleEntry & getScheduleEntry() { return schedule_entry; }

// tunnel and error_message
std::pair<MPPTunnelPtr, String> getTunnel(const ::mpp::EstablishMPPConnectionRequest * request);

Expand Down Expand Up @@ -119,8 +111,16 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>
void initExchangeReceivers();

tipb::DAGRequest dag_req;
mpp::TaskMeta meta;
MPPTaskId id;

ContextPtr context;

MPPTaskManager * manager;
std::atomic<bool> registered{false};

MPPTaskScheduleEntry schedule_entry;

// `dag_context` holds inputstreams which could hold ref to `context` so it should be destructed
// before `context`.
std::unique_ptr<DAGContext> dag_context;
Expand All @@ -130,10 +130,6 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>
std::atomic<TaskStatus> status{INITIALIZING};
String err_string;

mpp::TaskMeta meta;

MPPTaskId id;

std::mutex tunnel_and_receiver_mu;

MPPTunnelSetPtr tunnel_set;
Expand All @@ -142,21 +138,12 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>

int new_thread_count_of_exchange_receiver = 0;

MPPTaskManager * manager;
std::atomic<bool> registered{false};

const LoggerPtr log;

MPPTaskStatistics mpp_task_statistics;

friend class MPPTaskManager;
friend class MPPHandler;

int needed_threads;

std::mutex schedule_mu;
std::condition_variable schedule_cv;
ScheduleState schedule_state;
};

using MPPTaskPtr = std::shared_ptr<MPPTask>;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Mpp/MPPTaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,10 @@ MPPQueryTaskSetPtr MPPTaskManager::getQueryTaskSet(UInt64 query_id)
return getQueryTaskSetWithoutLock(query_id);
}

bool MPPTaskManager::tryToScheduleTask(const MPPTaskPtr & task)
bool MPPTaskManager::tryToScheduleTask(MPPTaskScheduleEntry & schedule_entry)
{
std::lock_guard lock(mu);
return scheduler->tryToSchedule(task, *this);
return scheduler->tryToSchedule(schedule_entry, *this);
}

void MPPTaskManager::releaseThreadsFromScheduler(const int needed_threads)
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/MPPTaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class MPPTaskManager : private boost::noncopyable

std::pair<bool, String> unregisterTask(const MPPTaskId & id);

bool tryToScheduleTask(const MPPTaskPtr & task);
bool tryToScheduleTask(MPPTaskScheduleEntry & schedule_entry);

void releaseThreadsFromScheduler(int needed_threads);

Expand Down
115 changes: 115 additions & 0 deletions dbms/src/Flash/Mpp/MPPTaskScheduleEntry.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/CPUAffinityManager.h>
#include <Common/FailPoint.h>
#include <Common/ThreadFactory.h>
#include <Common/ThreadManager.h>
#include <Common/TiFlashMetrics.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/SquashingBlockOutputStream.h>
#include <Flash/Coprocessor/DAGCodec.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Flash/Mpp/GRPCReceiverContext.h>
#include <Flash/Mpp/MPPTask.h>
#include <Flash/Mpp/MPPTaskScheduleEntry.h>
#include <Flash/Mpp/MPPTunnelSet.h>
#include <Flash/Mpp/Utils.h>
#include <Flash/Statistics/traverseExecutors.h>
#include <Flash/executeQuery.h>
#include <Interpreters/ProcessList.h>
#include <Storages/Transaction/KVStore.h>
#include <Storages/Transaction/TMTContext.h>
#include <fmt/core.h>

#include <chrono>
#include <ext/scope_guard.h>
#include <magic_enum.hpp>
#include <map>

namespace DB
{
MPPTaskScheduleEntry::MPPTaskScheduleEntry(MPPTaskManager * manager_, const MPPTaskId & id_)
: manager(manager_)
, id(id_)
, needed_threads(0)
, schedule_state(ScheduleState::WAITING)
, log(Logger::get("MPPTaskScheduleEntry", id.toString()))
{}

MPPTaskScheduleEntry::~MPPTaskScheduleEntry()
{
if (schedule_state == ScheduleState::SCHEDULED)
{
manager->releaseThreadsFromScheduler(needed_threads);
schedule_state = ScheduleState::COMPLETED;
}
}

bool MPPTaskScheduleEntry::schedule(ScheduleState state)
{
std::unique_lock lock(schedule_mu);
if (schedule_state == ScheduleState::WAITING)
{
LOG_FMT_INFO(log, "task is {}.", state == ScheduleState::SCHEDULED ? "scheduled" : " failed to schedule");
schedule_state = state;
schedule_cv.notify_one();
return true;
}
return false;
}

void MPPTaskScheduleEntry::waitForSchedule()
{
LOG_FMT_INFO(log, "task waits for schedule");
Stopwatch stopwatch;
double time_cost = 0;
{
std::unique_lock lock(schedule_mu);
schedule_cv.wait(lock, [&] { return schedule_state != ScheduleState::WAITING; });
time_cost = stopwatch.elapsedSeconds();
GET_METRIC(tiflash_task_scheduler_waiting_duration_seconds).Observe(time_cost);

if (schedule_state == ScheduleState::EXCEEDED)
{
throw Exception(fmt::format("{} is failed to schedule because of exceeding the thread hard limit in min-tso scheduler after waiting for {}s.", id.toString(), time_cost));
}
else if (schedule_state == ScheduleState::FAILED)
{
throw Exception(fmt::format("{} is failed to schedule because of being cancelled in min-tso scheduler after waiting for {}s.", id.toString(), time_cost));
}
}
LOG_FMT_INFO(log, "task waits for {} s to schedule and starts to run in parallel.", time_cost);
}

const MPPTaskId & MPPTaskScheduleEntry::getMPPTaskId() const
{
return id;
}

int MPPTaskScheduleEntry::getNeededThreads() const
{
if (needed_threads == 0)
{
throw Exception(" the needed_threads of task " + id.toString() + " is not initialized!");
}
return needed_threads;
}

void MPPTaskScheduleEntry::setNeededThreads(int needed_threads_)
{
needed_threads = needed_threads_;
}

} // namespace DB
78 changes: 78 additions & 0 deletions dbms/src/Flash/Mpp/MPPTaskScheduleEntry.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Common/Exception.h>
#include <Common/Logger.h>
#include <Common/MemoryTracker.h>
#include <DataStreams/BlockIO.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Mpp/MPPReceiverSet.h>
#include <Flash/Mpp/MPPTaskId.h>
#include <Flash/Mpp/MPPTaskStatistics.h>
#include <Flash/Mpp/MPPTunnel.h>
#include <Flash/Mpp/MPPTunnelSet.h>
#include <Flash/Mpp/TaskStatus.h>
#include <Interpreters/Context.h>
#include <common/logger_useful.h>
#include <common/types.h>
#include <kvproto/mpp.pb.h>

#include <atomic>
#include <boost/noncopyable.hpp>
#include <memory>
#include <unordered_map>

namespace DB
{
class MPPTaskManager;

enum class ScheduleState
{
WAITING,
SCHEDULED,
FAILED,
EXCEEDED,
COMPLETED
};

class MPPTaskScheduleEntry
{
public:
int getNeededThreads() const;
void setNeededThreads(int needed_threads_);

bool schedule(ScheduleState state);
void waitForSchedule();

const MPPTaskId & getMPPTaskId() const;

~MPPTaskScheduleEntry();

MPPTaskScheduleEntry(MPPTaskManager * manager_, const MPPTaskId & id_);

private:
MPPTaskManager * manager;
MPPTaskId id;

int needed_threads;

std::mutex schedule_mu;
std::condition_variable schedule_cv;
ScheduleState schedule_state;
const LoggerPtr log;
};

} // namespace DB
Loading

0 comments on commit db032ea

Please sign in to comment.