From 81025ae364a6aeec83891e1eb4870915a5ee99ca Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Sat, 26 Mar 2022 12:32:33 +0800 Subject: [PATCH] fix a bug that ExchangeReceiver can't be canceled (#4441) (#4445) close pingcap/tiflash#4432 --- .../DataStreams/TiRemoteBlockInputStream.h | 1 + .../src/Flash/Coprocessor/CoprocessorReader.h | 3 +++ dbms/src/Flash/Coprocessor/DAGContext.cpp | 8 ++++++++ dbms/src/Flash/Coprocessor/DAGContext.h | 2 ++ dbms/src/Flash/Mpp/ExchangeReceiver.cpp | 20 +++++++++++++++---- dbms/src/Flash/Mpp/ExchangeReceiver.h | 4 +++- dbms/src/Flash/Mpp/MPPTask.cpp | 2 ++ 7 files changed, 35 insertions(+), 5 deletions(-) diff --git a/dbms/src/DataStreams/TiRemoteBlockInputStream.h b/dbms/src/DataStreams/TiRemoteBlockInputStream.h index dad7c7c9abc..a3b9f5e2470 100644 --- a/dbms/src/DataStreams/TiRemoteBlockInputStream.h +++ b/dbms/src/DataStreams/TiRemoteBlockInputStream.h @@ -241,6 +241,7 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream virtual void readSuffix() override { LOG_FMT_DEBUG(log, "finish read {} rows from remote", total_rows); + remote_reader->close(); } }; diff --git a/dbms/src/Flash/Coprocessor/CoprocessorReader.h b/dbms/src/Flash/Coprocessor/CoprocessorReader.h index 98a8f399237..8a3eb471e54 100644 --- a/dbms/src/Flash/Coprocessor/CoprocessorReader.h +++ b/dbms/src/Flash/Coprocessor/CoprocessorReader.h @@ -95,6 +95,7 @@ class CoprocessorReader void cancel() { resp_iter.cancel(); } + static DecodeDetail decodeChunks( const std::shared_ptr & resp, std::queue & block_queue, @@ -187,6 +188,8 @@ class CoprocessorReader collected = false; } + void close() {} + bool collected = false; int concurrency_; }; diff --git a/dbms/src/Flash/Coprocessor/DAGContext.cpp b/dbms/src/Flash/Coprocessor/DAGContext.cpp index 70d8d02d12b..bdcf4307a01 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.cpp +++ b/dbms/src/Flash/Coprocessor/DAGContext.cpp @@ -219,6 +219,14 @@ const std::unordered_map> & DAGContext return mpp_exchange_receiver_map; } +void DAGContext::cancelAllExchangeReceiver() +{ + for (auto & it : mpp_exchange_receiver_map) + { + it.second->cancel(); + } +} + int DAGContext::getNewThreadCountOfExchangeReceiver() const { return new_thread_count_of_exchange_receiver; diff --git a/dbms/src/Flash/Coprocessor/DAGContext.h b/dbms/src/Flash/Coprocessor/DAGContext.h index 448e98482d8..f04293729ad 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.h +++ b/dbms/src/Flash/Coprocessor/DAGContext.h @@ -266,6 +266,8 @@ class DAGContext return sql_mode & f; } + void cancelAllExchangeReceiver(); + void initExchangeReceiverIfMPP(Context & context, size_t max_streams); const std::unordered_map> & getMPPExchangeReceiverMap() const; diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp index b7873fd5e62..98e784affa0 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp @@ -310,15 +310,21 @@ ExchangeReceiverBase::ExchangeReceiverBase( template ExchangeReceiverBase::~ExchangeReceiverBase() { - setState(ExchangeReceiverState::CLOSED); - msg_channel.finish(); + close(); thread_manager->wait(); } template void ExchangeReceiverBase::cancel() { - setState(ExchangeReceiverState::CANCELED); + setEndState(ExchangeReceiverState::CANCELED); + msg_channel.finish(); +} + +template +void ExchangeReceiverBase::close() +{ + setEndState(ExchangeReceiverState::CLOSED); msg_channel.finish(); } @@ -599,10 +605,16 @@ ExchangeReceiverResult ExchangeReceiverBase::nextResult(std::queue -void ExchangeReceiverBase::setState(ExchangeReceiverState new_state) +bool ExchangeReceiverBase::setEndState(ExchangeReceiverState new_state) { + assert(new_state == ExchangeReceiverState::CANCELED || new_state == ExchangeReceiverState::CLOSED); std::unique_lock lock(mu); + if (state == ExchangeReceiverState::CANCELED || state == ExchangeReceiverState::CLOSED) + { + return false; + } state = new_state; + return true; } template diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.h b/dbms/src/Flash/Mpp/ExchangeReceiver.h index 37e7a401e7a..d0d4b759410 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.h +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.h @@ -98,6 +98,8 @@ class ExchangeReceiverBase void cancel(); + void close(); + const DAGSchema & getOutputSchema() const { return schema; } ExchangeReceiverResult nextResult( @@ -129,7 +131,7 @@ class ExchangeReceiverBase void readLoop(const Request & req); void reactor(const std::vector & async_requests); - void setState(ExchangeReceiverState new_state); + bool setEndState(ExchangeReceiverState new_state); ExchangeReceiverState getState(); DecodeDetail decodeChunks( diff --git a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp index 394e034536b..424437eb01c 100644 --- a/dbms/src/Flash/Mpp/MPPTask.cpp +++ b/dbms/src/Flash/Mpp/MPPTask.cpp @@ -353,6 +353,8 @@ void MPPTask::runImpl() else { context->getProcessList().sendCancelToQuery(context->getCurrentQueryId(), context->getClientInfo().current_user, true); + if (dag_context) + dag_context->cancelAllExchangeReceiver(); writeErrToAllTunnels(err_msg); } LOG_FMT_INFO(log, "task ends, time cost is {} ms.", stopwatch.elapsedMilliseconds());