Skip to content

Commit

Permalink
Remove MPPTaskManager::findTaskWithTimeout (#5647)
Browse files Browse the repository at this point in the history
ref #5095
  • Loading branch information
windtalker authored Aug 18, 2022
1 parent 3240237 commit b6536f6
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 69 deletions.
34 changes: 13 additions & 21 deletions dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,36 +262,28 @@ ::grpc::Status FlashService::establishMPPConnectionSyncOrAsync(::grpc::ServerCon
auto & tmt_context = context->getTMTContext();
auto task_manager = tmt_context.getMPPTaskManager();
std::chrono::seconds timeout(10);
std::string err_msg;
MPPTunnelPtr tunnel = nullptr;
auto [tunnel, err_msg] = task_manager->findTunnelWithTimeout(request, timeout);
if (tunnel == nullptr)
{
MPPTaskPtr sender_task = task_manager->findTaskWithTimeout(request->sender_meta(), timeout, err_msg);
if (sender_task != nullptr)
if (calldata)
{
std::tie(tunnel, err_msg) = sender_task->getTunnel(request);
LOG_ERROR(log, err_msg);
// In Async version, writer::Write() return void.
// So the way to track Write fail and return grpc::StatusCode::UNKNOWN is to catch the exeception.
calldata->writeErr(getPacketWithError(err_msg));
return grpc::Status::OK;
}
if (tunnel == nullptr)
else
{
if (calldata)
LOG_ERROR(log, err_msg);
if (sync_writer->Write(getPacketWithError(err_msg)))
{
LOG_ERROR(log, err_msg);
// In Async version, writer::Write() return void.
// So the way to track Write fail and return grpc::StatusCode::UNKNOWN is to catch the exeception.
calldata->writeErr(getPacketWithError(err_msg));
return grpc::Status::OK;
}
else
{
LOG_ERROR(log, err_msg);
if (sync_writer->Write(getPacketWithError(err_msg)))
{
return grpc::Status::OK;
}
else
{
LOG_FMT_DEBUG(log, "Write error message failed for unknown reason.");
return grpc::Status(grpc::StatusCode::UNKNOWN, "Write error message failed for unknown reason.");
}
LOG_FMT_DEBUG(log, "Write error message failed for unknown reason.");
return grpc::Status(grpc::StatusCode::UNKNOWN, "Write error message failed for unknown reason.");
}
}
}
Expand Down
17 changes: 4 additions & 13 deletions dbms/src/Flash/Mpp/GRPCReceiverContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,23 +154,14 @@ std::tuple<MPPTunnelPtr, grpc::Status> establishMPPConnectionLocal(
const std::shared_ptr<MPPTaskManager> & task_manager)
{
std::chrono::seconds timeout(10);
String err_msg;
MPPTunnelPtr tunnel = nullptr;
auto [tunnel, err_msg] = task_manager->findTunnelWithTimeout(request, timeout);
if (tunnel == nullptr)
{
MPPTaskPtr sender_task = task_manager->findTaskWithTimeout(request->sender_meta(), timeout, err_msg);
if (sender_task != nullptr)
{
std::tie(tunnel, err_msg) = sender_task->getTunnel(request);
}
if (tunnel == nullptr)
{
return std::make_tuple(tunnel, grpc::Status(grpc::StatusCode::INTERNAL, err_msg));
}
return std::make_tuple(tunnel, grpc::Status(grpc::StatusCode::INTERNAL, err_msg));
}
if (!tunnel->isLocal())
{
String err_msg("EstablishMPPConnectionLocal into a remote channel !");
return std::make_tuple(nullptr, grpc::Status(grpc::StatusCode::INTERNAL, err_msg));
return std::make_tuple(nullptr, grpc::Status(grpc::StatusCode::INTERNAL, "EstablishMPPConnectionLocal into a remote channel!"));
}
tunnel->connect(nullptr);
return std::make_tuple(tunnel, grpc::Status::OK);
Expand Down
34 changes: 5 additions & 29 deletions dbms/src/Flash/Mpp/MPPTaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ MPPTaskManager::MPPTaskManager(MPPTaskSchedulerPtr scheduler_)
, log(&Poco::Logger::get("TaskManager"))
{}

MPPTaskPtr MPPTaskManager::findTaskWithTimeout(const mpp::TaskMeta & meta, std::chrono::seconds timeout, std::string & errMsg)
std::pair<MPPTunnelPtr, String> MPPTaskManager::findTunnelWithTimeout(const ::mpp::EstablishMPPConnectionRequest * request, std::chrono::seconds timeout)
{
const auto & meta = request->sender_meta();
MPPTaskId id{meta.start_ts(), meta.task_id()};
std::unordered_map<MPPTaskId, MPPTaskPtr>::iterator it;
bool cancelled = false;
Expand All @@ -59,15 +60,13 @@ MPPTaskPtr MPPTaskManager::findTaskWithTimeout(const mpp::TaskMeta & meta, std::
fiu_do_on(FailPoints::random_task_manager_find_task_failure_failpoint, ret = false;);
if (cancelled)
{
errMsg = fmt::format("Task [{},{}] has been cancelled.", meta.start_ts(), meta.task_id());
return nullptr;
return {nullptr, fmt::format("Task [{},{}] has been cancelled.", meta.start_ts(), meta.task_id())};
}
else if (!ret)
{
errMsg = fmt::format("Can't find task [{},{}] within {} s.", meta.start_ts(), meta.task_id(), timeout.count());
return nullptr;
return {nullptr, fmt::format("Can't find task [{},{}] within {} s.", meta.start_ts(), meta.task_id(), timeout.count())};
}
return it->second;
return it->second->getTunnel(request);
}

class MPPTaskCancelHelper
Expand Down Expand Up @@ -211,29 +210,6 @@ void MPPTaskManager::unregisterTask(MPPTask * task)
LOG_ERROR(log, "The task " + task->id.toString() + " cannot be found and fail to unregister");
}

std::vector<UInt64> MPPTaskManager::getCurrentQueries()
{
std::vector<UInt64> ret;
std::lock_guard lock(mu);
for (auto & it : mpp_query_map)
{
ret.push_back(it.first);
}
return ret;
}

std::vector<MPPTaskPtr> MPPTaskManager::getCurrentTasksForQuery(UInt64 query_id)
{
std::vector<MPPTaskPtr> ret;
std::lock_guard lock(mu);
const auto & it = mpp_query_map.find(query_id);
if (it == mpp_query_map.end() || it->second->to_be_cancelled)
return ret;
for (const auto & task_it : it->second->task_map)
ret.push_back(task_it.second);
return ret;
}

String MPPTaskManager::toString()
{
std::lock_guard lock(mu);
Expand Down
8 changes: 2 additions & 6 deletions dbms/src/Flash/Mpp/MPPTaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,6 @@ class MPPTaskManager : private boost::noncopyable

~MPPTaskManager() = default;

std::vector<UInt64> getCurrentQueries();

std::vector<MPPTaskPtr> getCurrentTasksForQuery(UInt64 query_id);

MPPQueryTaskSetPtr getQueryTaskSetWithoutLock(UInt64 query_id);

bool registerTask(MPPTaskPtr task);
Expand All @@ -77,9 +73,9 @@ class MPPTaskManager : private boost::noncopyable

bool tryToScheduleTask(const MPPTaskPtr & task);

void releaseThreadsFromScheduler(const int needed_threads);
void releaseThreadsFromScheduler(int needed_threads);

MPPTaskPtr findTaskWithTimeout(const mpp::TaskMeta & meta, std::chrono::seconds timeout, std::string & errMsg);
std::pair<MPPTunnelPtr, String> findTunnelWithTimeout(const ::mpp::EstablishMPPConnectionRequest * request, std::chrono::seconds timeout);

void cancelMPPQuery(UInt64 query_id, const String & reason);

Expand Down

0 comments on commit b6536f6

Please sign in to comment.