Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
SeaRise committed May 8, 2023
1 parent a2887da commit cae66bb
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 11 deletions.
30 changes: 21 additions & 9 deletions dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ bool MPPTask::isRootMPPTask() const
void MPPTask::abortTunnels(const String & message, bool wait_sender_finish)
{
{
std::unique_lock lock(tunnel_and_receiver_mu);
std::unique_lock lock(mtx);
if (unlikely(tunnel_set == nullptr))
return;
}
Expand All @@ -125,7 +125,7 @@ void MPPTask::abortTunnels(const String & message, bool wait_sender_finish)
void MPPTask::abortReceivers()
{
{
std::unique_lock lock(tunnel_and_receiver_mu);
std::unique_lock lock(mtx);
if unlikely (receiver_set == nullptr)
return;
}
Expand Down Expand Up @@ -180,7 +180,7 @@ void MPPTask::registerTunnels(const mpp::DispatchTaskRequest & task_request)
injectFailPointDuringRegisterTunnel(dag_context->isRootMPPTask());
}
{
std::unique_lock lock(tunnel_and_receiver_mu);
std::unique_lock lock(mtx);
if (status != INITIALIZING)
throw Exception(fmt::format("The tunnels can not be registered, because the task is not in initializing state"));
tunnel_set = std::move(tunnel_set_local);
Expand Down Expand Up @@ -222,7 +222,7 @@ void MPPTask::initExchangeReceivers()
return true;
});
{
std::unique_lock lock(tunnel_and_receiver_mu);
std::unique_lock lock(mtx);
if (status != RUNNING)
throw Exception("exchange receiver map can not be initialized, because the task is not in running state");
receiver_set = std::move(receiver_set_local);
Expand All @@ -238,7 +238,7 @@ std::pair<MPPTunnelPtr, String> MPPTask::getTunnel(const ::mpp::EstablishMPPConn
"can't find tunnel ({} + {}) because the task is aborted, error message = {}",
request->sender_meta().task_id(),
request->receiver_meta().task_id(),
err_string);
getErrString());
return {nullptr, err_msg};
}

Expand All @@ -256,6 +256,18 @@ std::pair<MPPTunnelPtr, String> MPPTask::getTunnel(const ::mpp::EstablishMPPConn
return {tunnel_ptr, ""};
}

String MPPTask::getErrString() const
{
std::lock_guard lock(mtx);
return err_string;
}

void MPPTask::setErrString(const String & message)
{
std::lock_guard lock(mtx);
err_string = message;
}

void MPPTask::unregisterTask()
{
auto [result, reason] = manager->unregisterTask(id);
Expand Down Expand Up @@ -349,7 +361,7 @@ void MPPTask::preprocess()
query_executor_holder.set(queryExecute(*context));
LOG_DEBUG(log, "init query executor done");
{
std::unique_lock lock(tunnel_and_receiver_mu);
std::unique_lock lock(mtx);
if (status != RUNNING)
throw Exception("task not in running state, may be cancelled");
for (auto & r : dag_context->getCoprocessorReaders())
Expand Down Expand Up @@ -481,7 +493,7 @@ void MPPTask::runImpl()
}
}
}
mpp_task_statistics.end(status.load(), err_string);
mpp_task_statistics.end(status.load(), getErrString());
mpp_task_statistics.logTracingJson();

LOG_DEBUG(log, "task ends, time cost is {} ms.", stopwatch.elapsedMilliseconds());
Expand Down Expand Up @@ -521,7 +533,7 @@ void MPPTask::abort(const String & message, AbortType abort_type)
}
else if (previous_status == INITIALIZING && switchStatus(INITIALIZING, next_task_status))
{
err_string = message;
setErrString(message);
/// if the task is in initializing state, mpp task can return error to TiDB directly,
/// so just close all tunnels here
abortTunnels("", false);
Expand All @@ -533,7 +545,7 @@ void MPPTask::abort(const String & message, AbortType abort_type)
/// abort the components from top to bottom because if bottom components are aborted
/// first, the top components may see an error caused by the abort, which is not
/// the original error
err_string = message;
setErrString(message);
abortTunnels(message, false);
abortQueryExecutor();
abortReceivers();
Expand Down
10 changes: 8 additions & 2 deletions dbms/src/Flash/Mpp/MPPTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>

void initExchangeReceivers();

String getErrString() const;
void setErrString(const String & message);

private:
// To make sure dag_req is not destroyed before the mpp task ends.
tipb::DAGRequest dag_req;
mpp::TaskMeta meta;
Expand All @@ -130,9 +134,11 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>
QueryExecutorHolder query_executor_holder;

std::atomic<TaskStatus> status{INITIALIZING};
String err_string;

std::mutex tunnel_and_receiver_mu;
/// Used to protect concurrent access to `err_string`, `tunnel_set`, and `receiver_set`.
mutable std::mutex mtx;

String err_string;

MPPTunnelSetPtr tunnel_set;

Expand Down

0 comments on commit cae66bb

Please sign in to comment.