Skip to content

Commit

Permalink
fix a bug that ExchangeReceiver can't be canceled (#4441) (#4445)
Browse files Browse the repository at this point in the history
close #4432
  • Loading branch information
ti-chi-bot authored Mar 26, 2022
1 parent f8feb5c commit 81025ae
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 5 deletions.
1 change: 1 addition & 0 deletions dbms/src/DataStreams/TiRemoteBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
virtual void readSuffix() override
{
LOG_FMT_DEBUG(log, "finish read {} rows from remote", total_rows);
remote_reader->close();
}
};

Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Flash/Coprocessor/CoprocessorReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class CoprocessorReader

void cancel() { resp_iter.cancel(); }


static DecodeDetail decodeChunks(
const std::shared_ptr<tipb::SelectResponse> & resp,
std::queue<Block> & block_queue,
Expand Down Expand Up @@ -187,6 +188,8 @@ class CoprocessorReader
collected = false;
}

void close() {}

bool collected = false;
int concurrency_;
};
Expand Down
8 changes: 8 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,14 @@ const std::unordered_map<String, std::shared_ptr<ExchangeReceiver>> & DAGContext
return mpp_exchange_receiver_map;
}

void DAGContext::cancelAllExchangeReceiver()
{
for (auto & it : mpp_exchange_receiver_map)
{
it.second->cancel();
}
}

int DAGContext::getNewThreadCountOfExchangeReceiver() const
{
return new_thread_count_of_exchange_receiver;
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,8 @@ class DAGContext
return sql_mode & f;
}

void cancelAllExchangeReceiver();

void initExchangeReceiverIfMPP(Context & context, size_t max_streams);
const std::unordered_map<String, std::shared_ptr<ExchangeReceiver>> & getMPPExchangeReceiverMap() const;

Expand Down
20 changes: 16 additions & 4 deletions dbms/src/Flash/Mpp/ExchangeReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -310,15 +310,21 @@ ExchangeReceiverBase<RPCContext>::ExchangeReceiverBase(
template <typename RPCContext>
ExchangeReceiverBase<RPCContext>::~ExchangeReceiverBase()
{
setState(ExchangeReceiverState::CLOSED);
msg_channel.finish();
close();
thread_manager->wait();
}

template <typename RPCContext>
void ExchangeReceiverBase<RPCContext>::cancel()
{
setState(ExchangeReceiverState::CANCELED);
setEndState(ExchangeReceiverState::CANCELED);
msg_channel.finish();
}

template <typename RPCContext>
void ExchangeReceiverBase<RPCContext>::close()
{
setEndState(ExchangeReceiverState::CLOSED);
msg_channel.finish();
}

Expand Down Expand Up @@ -599,10 +605,16 @@ ExchangeReceiverResult ExchangeReceiverBase<RPCContext>::nextResult(std::queue<B
}

template <typename RPCContext>
void ExchangeReceiverBase<RPCContext>::setState(ExchangeReceiverState new_state)
bool ExchangeReceiverBase<RPCContext>::setEndState(ExchangeReceiverState new_state)
{
assert(new_state == ExchangeReceiverState::CANCELED || new_state == ExchangeReceiverState::CLOSED);
std::unique_lock lock(mu);
if (state == ExchangeReceiverState::CANCELED || state == ExchangeReceiverState::CLOSED)
{
return false;
}
state = new_state;
return true;
}

template <typename RPCContext>
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Flash/Mpp/ExchangeReceiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ class ExchangeReceiverBase

void cancel();

void close();

const DAGSchema & getOutputSchema() const { return schema; }

ExchangeReceiverResult nextResult(
Expand Down Expand Up @@ -129,7 +131,7 @@ class ExchangeReceiverBase
void readLoop(const Request & req);
void reactor(const std::vector<Request> & async_requests);

void setState(ExchangeReceiverState new_state);
bool setEndState(ExchangeReceiverState new_state);
ExchangeReceiverState getState();

DecodeDetail decodeChunks(
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,8 @@ void MPPTask::runImpl()
else
{
context->getProcessList().sendCancelToQuery(context->getCurrentQueryId(), context->getClientInfo().current_user, true);
if (dag_context)
dag_context->cancelAllExchangeReceiver();
writeErrToAllTunnels(err_msg);
}
LOG_FMT_INFO(log, "task ends, time cost is {} ms.", stopwatch.elapsedMilliseconds());
Expand Down

0 comments on commit 81025ae

Please sign in to comment.