From 2d46e03c8abb1b9b640b49530d72c817c0379802 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Mon, 8 May 2023 12:16:57 +0800 Subject: [PATCH] This is an automated cherry-pick of #7434 Signed-off-by: ti-chi-bot --- dbms/src/Flash/Mpp/MPPTask.cpp | 30 +++++++++++++++++++++--------- dbms/src/Flash/Mpp/MPPTask.h | 14 ++++++++++++-- 2 files changed, 33 insertions(+), 11 deletions(-) diff --git a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp index 14a5a1bc962..ba72f8988f6 100644 --- a/dbms/src/Flash/Mpp/MPPTask.cpp +++ b/dbms/src/Flash/Mpp/MPPTask.cpp @@ -77,7 +77,7 @@ MPPTask::~MPPTask() void MPPTask::abortTunnels(const String & message, bool wait_sender_finish) { { - std::unique_lock lock(tunnel_and_receiver_mu); + std::unique_lock lock(mtx); if (unlikely(tunnel_set == nullptr)) return; } @@ -87,7 +87,7 @@ void MPPTask::abortTunnels(const String & message, bool wait_sender_finish) void MPPTask::abortReceivers() { { - std::unique_lock lock(tunnel_and_receiver_mu); + std::unique_lock lock(mtx); if unlikely (receiver_set == nullptr) return; } @@ -146,7 +146,7 @@ void MPPTask::registerTunnels(const mpp::DispatchTaskRequest & task_request) } } { - std::unique_lock lock(tunnel_and_receiver_mu); + std::unique_lock lock(mtx); if (status != INITIALIZING) throw Exception(fmt::format("The tunnels can not be registered, because the task is not in initializing state")); tunnel_set = std::move(tunnel_set_local); @@ -184,7 +184,7 @@ void MPPTask::initExchangeReceivers() return true; }); { - std::unique_lock lock(tunnel_and_receiver_mu); + std::unique_lock lock(mtx); if (status != RUNNING) throw Exception("exchange receiver map can not be initialized, because the task is not in running state"); receiver_set = std::move(receiver_set_local); @@ -200,7 +200,7 @@ std::pair MPPTask::getTunnel(const ::mpp::EstablishMPPConn "can't find tunnel ({} + {}) because the task is aborted, error message = {}", request->sender_meta().task_id(), request->receiver_meta().task_id(), - err_string); + getErrString()); return {nullptr, err_msg}; } @@ -218,6 +218,18 @@ std::pair MPPTask::getTunnel(const ::mpp::EstablishMPPConn return {tunnel_ptr, ""}; } +String MPPTask::getErrString() const +{ + std::lock_guard lock(mtx); + return err_string; +} + +void MPPTask::setErrString(const String & message) +{ + std::lock_guard lock(mtx); + err_string = message; +} + void MPPTask::unregisterTask() { auto [result, reason] = manager->unregisterTask(id); @@ -328,7 +340,7 @@ void MPPTask::preprocess() query_executor_holder.set(queryExecute(*context)); LOG_DEBUG(log, "init query executor done"); { - std::unique_lock lock(tunnel_and_receiver_mu); + std::unique_lock lock(mtx); if (status != RUNNING) throw Exception("task not in running state, may be cancelled"); for (auto & r : dag_context->getCoprocessorReaders()) @@ -446,7 +458,7 @@ void MPPTask::runImpl() } } } - mpp_task_statistics.end(status.load(), err_string); + mpp_task_statistics.end(status.load(), getErrString()); mpp_task_statistics.logTracingJson(); LOG_DEBUG(log, "task ends, time cost is {} ms.", stopwatch.elapsedMilliseconds()); @@ -486,7 +498,7 @@ void MPPTask::abort(const String & message, AbortType abort_type) } else if (previous_status == INITIALIZING && switchStatus(INITIALIZING, next_task_status)) { - err_string = message; + setErrString(message); /// if the task is in initializing state, mpp task can return error to TiDB directly, /// so just close all tunnels here abortTunnels("", false); @@ -498,7 +510,7 @@ void MPPTask::abort(const String & message, AbortType abort_type) /// abort the components from top to bottom because if bottom components are aborted /// first, the top components may see an error caused by the abort, which is not /// the original error - err_string = message; + setErrString(message); abortTunnels(message, false); abortDataStreams(abort_type); abortReceivers(); diff --git a/dbms/src/Flash/Mpp/MPPTask.h b/dbms/src/Flash/Mpp/MPPTask.h index 08c579031ad..3a86e512c5a 100644 --- a/dbms/src/Flash/Mpp/MPPTask.h +++ b/dbms/src/Flash/Mpp/MPPTask.h @@ -108,6 +108,14 @@ class MPPTask : public std::enable_shared_from_this void initExchangeReceivers(); +<<<<<<< HEAD +======= + String getErrString() const; + void setErrString(const String & message); + +private: + // To make sure dag_req is not destroyed before the mpp task ends. +>>>>>>> 12bda10fd1 (Tsan: fix data race on `ComputeServerRunner.cancelJoinTasks` (#7434)) tipb::DAGRequest dag_req; mpp::TaskMeta meta; MPPTaskId id; @@ -128,9 +136,11 @@ class MPPTask : public std::enable_shared_from_this QueryExecutorHolder query_executor_holder; std::atomic status{INITIALIZING}; - String err_string; - std::mutex tunnel_and_receiver_mu; + /// Used to protect concurrent access to `err_string`, `tunnel_set`, and `receiver_set`. + mutable std::mutex mtx; + + String err_string; MPPTunnelSetPtr tunnel_set;